Multi-GPU training with estimator, tf.keras and tf.data

By Kashif Rasul, Research Scientist at Zalando Research

Source: TensorFlow official account

Like most AI research departments, we at Zalando Research realize the importance of experimenting with ideas and rapid prototyping. As datasets get larger and larger, it becomes useful to understand how to leverage the shared resources we have to train deep learning models efficiently and quickly.

TensorFlow's estimator API is useful for training models on multiple GPUs in a distributed environment, and this article focuses on that workflow. We first train a custom estimator written with tf.keras on the small Fashion-MNIST dataset, and then introduce a more practical use case at the end of the article.

TL;DR: Essentially, we need to remember that for a tf.keras.Model, we can use the tf.keras.estimator.model_to_estimator method to convert it into a tf.estimator.Estimator object, which can then be trained with the tf.estimator API. Once the conversion is done, we can use the mechanisms the estimator provides to train the model on different hardware configurations.

import os
import time

#!pip install -q -U tensorflow-gpu
import tensorflow as tf

import numpy as np

Import the Fashion-MNIST dataset

We replace MNIST with the Fashion-MNIST dataset, which contains 70,000 grayscale images of Zalando fashion articles. Getting the training and test data is very simple:

(train_images, train_labels), (test_images, test_labels) = 
   tf.keras.datasets.fashion_mnist.load_data()

We want to rescale the pixel values of these images from numbers between 0 and 255 to numbers between 0 and 1, and convert the dataset to the format [B, H, W, C], where B is the number of images in a batch, H and W are height and width, and C is the number of channels in our dataset (1 for grayscale):

TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))

Next, we want to convert the labels from an integer id (for example, 2 or pullover) to a one-hot encoding (for example, 0,0,1,0,0,0,0,0,0,0). To do this, we use the tf.keras.utils.to_categorical function:

# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10

train_labels = tf.keras.utils.to_categorical(train_labels, 
                                            LABEL_DIMENSIONS)

test_labels = tf.keras.utils.to_categorical(test_labels,
                                           LABEL_DIMENSIONS)

# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)

Build a tf.keras model

We will use the Keras functional API to create the neural network. Keras is a high-level API for building and training deep learning models that is modular in design, easy to use, and easy to extend. tf.keras is TensorFlow's implementation of this API, and it supports Eager Execution, tf.data pipelines, and estimators.

In terms of architecture, we will use a ConvNet. Very generally speaking, a ConvNet is a stack of convolutional layers (Conv2D) and pooling layers (MaxPooling2D). Most importantly, a ConvNet takes each training example as a 3D tensor of shape (height, width, channels), which for a grayscale image starts with channels = 1, and returns a 3D tensor.

So after the ConvNet part, we need to flatten the tensor and add dense layers, the last of which returns a vector of size LABEL_DIMENSIONS with tf.nn.softmax activation:

inputs = tf.keras.Input(shape=(28,28,1))  # Returns a placeholder

x = tf.keras.layers.Conv2D(filters=32, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(inputs)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
                                   activation=tf.nn.softmax)(x)
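As a sanity check on the shapes (not part of the original code), we can trace how the spatial dimensions shrink: a 'valid'-padded 3x3 convolution reduces each side by 2, and a 2x2 max-pool with stride 2 roughly halves it:

```python
def conv_out(size, kernel):
    # 'valid' padding, stride 1: output = size - kernel + 1
    return size - kernel + 1

def pool_out(size, pool, stride):
    # output = floor((size - pool) / stride) + 1
    return (size - pool) // stride + 1

s = 28
s = conv_out(s, 3)       # after first Conv2D
s = pool_out(s, 2, 2)    # after first MaxPooling2D
s = conv_out(s, 3)       # after second Conv2D
s = pool_out(s, 2, 2)    # after second MaxPooling2D
s = conv_out(s, 3)       # after third Conv2D
flat_features = s * s * 64  # what Flatten() hands to the dense layer
print(s, flat_features)
```

With the layers above this yields a 3x3x64 tensor, i.e. 576 features going into the first dense layer.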

Now we can define the model, select the optimizer (we choose one from TensorFlow rather than from tf.keras.optimizers) and compile it:

model = tf.keras.Model(inputs=inputs, outputs=predictions)

optimizer = tf.train.AdamOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',
             optimizer=optimizer,
             metrics=['accuracy'])

Create the estimator

We create the estimator from the compiled Keras model by calling the model_to_estimator method. Note that the initial model state of the Keras model is preserved in the created estimator.

What are the advantages of estimators? To start with:

  • You can run estimator-based models on a local host or in a distributed multi-GPU environment without changing your model;
  • Estimators simplify sharing implementations between model developers;
  • Estimators build the graph for you, so, a bit like with Eager Execution, there is no explicit session.

So how do we train our simple tf.keras model on multiple GPUs? We can use the tf.contrib.distribute.MirroredStrategy paradigm, which does in-graph replication with synchronous training. To learn more about this strategy, watch the Distributed TensorFlow Training talk. Note: Distributed TensorFlow link https://www.youtube.com/watch?v=bRMGoPqsn20

Essentially, each worker GPU has a copy of the network and gets a subset of the data on which it computes local gradients, and then waits for all workers to finish in a synchronous manner. The workers then communicate their local gradients to each other via a ring all-reduce operation, which is typically optimized to reduce network bandwidth and increase throughput. Once all gradients have arrived, each worker averages them, updates its parameters, and the next step begins. This is ideal when you have multiple GPUs on a single node connected by a high-speed interconnect.
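The data-parallel update just described can be illustrated with a small NumPy sketch (purely illustrative; the toy loss and shards below are made up, and a real ring all-reduce exchanges gradient chunks peer-to-peer rather than summing in one place):

```python
import numpy as np

def allreduce_mean(local_grads):
    # End result of a ring all-reduce: every worker holds the average
    return sum(local_grads) / len(local_grads)

# Toy loss (w*x - y)^2 with gradient 2*x*(w*x - y); each worker gets a shard
w = 0.5
shards = [(np.array([1.0, 2.0]), np.array([2.0, 4.0])),   # worker 0's (x, y)
          (np.array([3.0, 4.0]), np.array([6.0, 8.0]))]   # worker 1's (x, y)

# Each worker computes a gradient on its own shard of the data
local_grads = [np.mean(2 * x * (w * x - y)) for x, y in shards]

# Synchronous step: average the gradients, then every worker applies
# the same update, keeping all replicas identical
g = allreduce_mean(local_grads)
w -= 0.1 * g
```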

To use this strategy, we first create an estimator with the compiled tf.keras model and then assign it to the MirroredStrategy configuration via RunConfig config. By default, this configuration uses all GPUs, but you can also give it a num_gpus option to use a specific number of GPUs:

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,
                                                 config=config)

Create an estimator input function

To pipe data into the estimator, we need to define a data import function that returns a tf.data dataset of (images, labels) batches. The function below takes numpy arrays and returns the dataset via an ETL process.

Note that at the end we also call the prefetch method, which buffers data for the GPU during training, so that the next batch is ready and waiting for the GPU rather than the GPU waiting for data at each iteration. The GPU may still not be fully utilized; to improve this, we could use a fused version of the transformation operations (such as shuffle_and_repeat) instead of two separate operations. However, I chose the simple version here.

def input_fn(images, labels, epochs, batch_size):

     # Convert the inputs to a Dataset. (E)
    ds = tf.data.Dataset.from_tensor_slices((images, labels))    

    # Shuffle, repeat, and batch the examples. (T)
    SHUFFLE_SIZE = 5000
    ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
    ds = ds.prefetch(2)    

    # Return the dataset. (L)
    return ds
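What prefetch buys us can be sketched in plain Python (an illustration of the idea, not TensorFlow's implementation): a background thread keeps a small bounded buffer of ready batches, so production of batch n+1 overlaps with consumption of batch n:

```python
import queue
import threading

def prefetch(batches, buffer_size=2):
    # A background thread fills a bounded queue; the consumer (the
    # training loop) takes ready items instead of waiting on production.
    q = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the stream

    def producer():
        for batch in batches:
            q.put(batch)
        q.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is done:
            return
        yield item

consumed = list(prefetch(range(5)))  # same elements, produced ahead of use
```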

Train the estimator

First, we define a SessionRunHook class that records the time taken by each iteration of stochastic gradient descent:

class TimeHistory(tf.train.SessionRunHook):
    def begin(self):
       self.times = []    

    def before_run(self, run_context):
       self.iter_time_start = time.time()    

    def after_run(self, run_context, run_values):
       self.times.append(time.time() - self.iter_time_start)

Here comes the highlight! We can call the train function on the estimator, giving it the input_fn we defined (with the batch size and the number of epochs we want to train for) and the TimeHistory instance via the hooks argument:

time_hist = TimeHistory()

BATCH_SIZE = 512
EPOCHS = 5

estimator.train(lambda:input_fn(train_images,
                               train_labels,
                               epochs=EPOCHS,
                               batch_size=BATCH_SIZE),
               hooks=[time_hist])

Performance

Now we can use our timing hook to calculate the total training time and the average number of images trained per second (the average throughput):

total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")

Fashion-MNIST training throughput and total time on K80 GPUs, for different values of NUM_GPUS, showing poor scaling

Evaluate the estimator

To check the performance of our model, we call the evaluate method on the estimator:

estimator.evaluate(lambda:input_fn(test_images, 
                                  test_labels,
                                  epochs=1,
                                  batch_size=BATCH_SIZE))

A retinal OCT (optical coherence tomography) image example

To test the scaling behavior on a larger dataset, we use the retinal OCT images dataset, one of Kaggle's many large datasets. The dataset consists of cross-sectional images of the retinas of living humans, divided into four categories: NORMAL, CNV, DME and DRUSEN:

Representative OCT images, selected from Kermany et al. "Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning"

This dataset contains a total of 84,495 JPEG images, mostly of size 512x496, and can be downloaded via the Kaggle CLI. Note: Kaggle CLI link https://github.com/Kaggle/kaggle-api

#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018

After the download completes, the training and test set image classes are in their respective folders, so we can define the patterns as:

labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')

Next, we write the estimator's input function, which takes any file pattern and returns scaled images and one-hot encoded labels as a tf.data.Dataset. This time we follow the best practices from the Input Pipeline Performance Guide. Note that if prefetch's buffer_size is None, TensorFlow automatically uses an optimal prefetch buffer size. Note: Input Pipeline Performance Guide link https://www.tensorflow.org/performance/datasets_performance

def input_fn(file_pattern, labels,
             image_size=(224, 224),
             shuffle=False,
             batch_size=64,
             num_epochs=None,
             buffer_size=4096,
             prefetch_buffer_size=None):

    table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
    num_classes = len(labels)

    def _map_func(filename):
        label = tf.string_split([filename], delimiter=os.sep).values[-2]
        image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        image = tf.image.resize_images(image, size=image_size)
        return (image, tf.one_hot(table.lookup(label), num_classes))

    dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)

    if num_epochs is not None and shuffle:
        dataset = dataset.apply(
            tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
    elif shuffle:
        dataset = dataset.shuffle(buffer_size)
    elif num_epochs is not None:
        dataset = dataset.repeat(num_epochs)

    dataset = dataset.apply(
        tf.contrib.data.map_and_batch(map_func=_map_func,
                                      batch_size=batch_size,
                                      num_parallel_calls=os.cpu_count()))
    dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)

    return dataset

When training this model, we will use a pre-trained VGG16 and only retrain its last 5 layers:

keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),
                                         include_top=False)

output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
                                  activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input,
                      outputs=prediction)

for layer in keras_vgg16.layers[:-4]:
   layer.trainable = False

Now that everything is ready, we can follow the steps above and train our model in a few minutes using NUM_GPUS GPUs:

model.compile(loss='categorical_crossentropy',
              optimizer=tf.train.AdamOptimizer(),
              metrics=['accuracy'])

NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,
                                                  config=config)
BATCH_SIZE = 64
EPOCHS = 1

estimator.train(input_fn=lambda:input_fn(train_folder,
                                         labels,
                                         shuffle=True,
                                         batch_size=BATCH_SIZE,
                                         buffer_size=2048,
                                         num_epochs=EPOCHS,
                                         prefetch_buffer_size=4),
                hooks=[time_hist])

After training, we can evaluate the accuracy on the test set, which should be around 95% (not bad for an initial baseline):

estimator.evaluate(input_fn=lambda:input_fn(test_folder,
                                           labels, 
                                           shuffle=False,
                                           batch_size=BATCH_SIZE,
                                           buffer_size=1024,
                                           num_epochs=1))

Retinal OCT training throughput and total time on K80 GPUs, for different values of NUM_GPUS, showing linear scaling

Summary

We have shown above how easy it is to train Keras deep learning models on multiple GPUs using the estimator API, how to write an input pipeline following best practices to make good use of our resources (linear scaling), and how to time our training throughput via hooks.

Please note that in the end we mainly care about the test set error. You may have noticed that the test set accuracy decreases as NUM_GPUS increases. One reason may be that MirroredStrategy effectively trains the model with a batch size of BATCH_SIZE*NUM_GPUS, so as we increase the number of GPUs we may need to adjust BATCH_SIZE or the learning rate. Here, for ease of plotting, all hyperparameters except NUM_GPUS were kept fixed, but in practice we would need to tune them.
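One common heuristic for the adjustment just mentioned (an assumption on our part, not something the article prescribes) is the linear scaling rule: keep the per-GPU batch size fixed and scale the learning rate with the number of replicas, since the effective batch grows as BATCH_SIZE*NUM_GPUS:

```python
BASE_BATCH_SIZE = 512     # per-GPU batch size used above for Fashion-MNIST
BASE_LEARNING_RATE = 0.001

def scaled_hyperparams(num_gpus):
    # MirroredStrategy trains on an effective batch of BATCH_SIZE * NUM_GPUS,
    # so the linear scaling rule grows the learning rate proportionally
    effective_batch = BASE_BATCH_SIZE * num_gpus
    learning_rate = BASE_LEARNING_RATE * num_gpus
    return effective_batch, learning_rate

for n in (1, 2, 4):
    print(n, *scaled_hyperparams(n))
```

In practice a warmup period is often combined with this rule for large numbers of replicas.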

The size of the dataset and of the model also affect how well these setups scale. GPUs have poor bandwidth when reading or writing small data, especially older GPUs such as the K80, which can lead to the situation shown in the Fashion-MNIST plot above.

Acknowledgements

Thanks to the TensorFlow team, especially Josh Gordon, and my colleagues at Zalando Research, especially Duncan Blythe, Gokhan Yildirim and Sebastian Heinz, for their help with editing the draft.