Environment setup: see dsnote "Setup Tensorflow GPU on CentOS 7".
Multiple GPUs on single host
The corresponding strategy for this scenario is
tf.distribute.MirroredStrategy
.
The demo codes:
import tensorflow as tf
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='mse', optimizer='sgd')
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
import numpy as np
inputs, targets = np.ones((100,1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Another demo (verified on 50 server, 2020.6.4):
import tensorflow as tf
mnist = tf.keras.datasets.mnist
(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
model.evaluate(x_test, y_test)
Multiple GPUs on multiple hosts
MultiWorkerMirroredStrategy
Single node demo
The following codes are for verification on each worker of the cluster:
# fix issue #24496
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
import tensorflow as tf
import numpy as np
import os
import json
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Multi-workers on the same host
不论多个 worker 是不是在一台主机上,每个 worker 对应一个模型训练脚本, 这些脚本除了 TF_CONFIG.task.index 不同,其他完全一样。
依次执行这些脚本,先启动的会等待后启动的,直到最后一个 worker 启动后开始训练。
下面的脚本复制一份并将 TF_CONFIG.task.index 改为 1 后, 在 50 (TF 2.1.0) 和 220 (TF 1.14.0) 上运行成功:
# fix issue #24496 of TF 2.x
# unnecessary for tf 1.x
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
import tensorflow as tf
import numpy as np
import os
import json
# put this before model definitions to avoid issue #34568
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
num_workers = 4
per_worker_batch_size = 64
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)
with strategy.scope():
multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
# according to issue #35100, the followoing warning is superfluous:
# Error occurred when finalizing GeneratorDataset iterator: Cancelled: Operation was cancelled
Multi-workers on different hosts
注意不同主机必须在同一个子网里,下面是 220 和 221 服务器 avatar 的 ~/dist_tf/extip.py 文件:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["172.15.106.220:12345", "172.15.106.221:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
221 上只修改 task.index:
...
'task': {'type': 'worker', 'index': 1}
...
在 220 和 221 上分别运行:
/opt/app/anaconda3/bin/python extip.py
.
Fix TF bug Could not create cudnn handle: CUDNN_STATUS_INTERNAL_ERROR
:
see tf issue #24496.
Multi-workers on different hosts for TF 2.x
运行脚本与上节相同,但 49 上的配置为:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["192.168.2.49:12345", "192.168.2.50:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
50 的配置为:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["192.168.2.49:12345", "192.168.2.50:23456"]
},
'task': {'type': 'worker', 'index': 1}
})
分别在两台主机上启动脚本,由于 tensorfow Keras API 的 bug issue 33531, 在计算第一个 epoch 时卡死,日志中有下面的错误:
Error Found an unshardable source dataset: name "TensorSliceDataset/_2"
社区目前还没有 fix 这个 bug, 基于 Keras API 的多主机多节点的分布式训练目前不可用。
Deprecated:
import tensorflow as tf
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
# or specify the implementation of collective ops: NCCL (the other one is gRPC)
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(tf.distribute.experimental.CollectiveCommunication.NCCL)
ParameterServerStrategy
In this setup, some nodes(machines) are designated as workers and some as parameter servers. Each variable of the model is placed on one parameter server. Computation is replicated across all GPUs of all workers.
This is suitable for the scenario where you have some machines without GPUs, but on the other machines GPUs is installed.
根据 Distributed training with TensorFlow, TF.Keras 在 2.3 之后支持 ParameterServerStrategy, 目前的 Keras 分布式教程 Multi-worker training with Keras 只使用 MultiWorkerMirroredStrategy, 所以等后续 Keras 支持 ParameterServerStrategy 后再验证。
Ref:
-
Chapter 19 of Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow, 2nd edition.