[sgd] Merge sharded param server based SGD implementation (#3033)

This includes most of the TF code used for the OSDI experiment. Perf sanity check on p3.16xl instances: Overall scaling looks ok, with the multi-node results within 5% of OSDI final numbers. This seems reasonable given that hugepages are not enabled here, and the param server shards are placed randomly.

$ RAY_USE_XRAY=1 ./test_sgd.py --gpu --batch-size=64 --num-workers=N \
  --devices-per-worker=M --strategy=<simple|ps> \
  --warmup --object-store-memory=10000000000

Images per second total
gpus total              | simple | ps
========================================
1                       | 218
2 (1 worker)            | 388
4 (1 worker)            | 759
4 (2 workers)           | 176    | 623
8 (1 worker)            | 985
8 (2 workers)           | 349    | 1031
16 (2 nodes, 2 workers) | 600    | 1661
16 (2 nodes, 4 workers) | 468    | 1712   <--- OSDI perf was 1817
This commit is contained in:
Eric Liang
2018-10-27 21:25:02 -07:00
committed by GitHub
parent befbf78048
commit af0c1174cd
9 changed files with 598 additions and 366 deletions
+26
View File
@@ -0,0 +1,26 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
class Model(object):
"""Your class must implement this interface to be used with Ray SGD.
This supports any form of input pipeline: it is up to you to define it
using TensorFlow. The only requirements are that the loss and optimizer
attributes must be defined.
For an example implementation, see tfbench/test_model.py
Attributes:
loss (tf.Tensor): Loss function to minimize.
optimizer (tf.train.Optimizer): Optimizer to use to minimize the loss.
"""
def get_feed_dict(self):
"""Extra values to pass in when computing gradients for the loss.
Returns:
TensorFlow feed_dict to add to the gradient operation.
"""
return {}
@@ -0,0 +1,82 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
import ray
from ray.experimental.sgd.util import Timeline, fetch, warmup
logger = logging.getLogger(__name__)
@ray.remote(num_cpus=0)
class ParameterServer(object):
"""Helper class for ray.experimental.sgd.DistributedSGD."""
def __init__(self, num_workers, tid):
self.num_sgd_workers = num_workers
self.acc_counter = 0
self.timeline = Timeline(tid)
# TODO(ekl) get this to work again so we get ray events
# self.timeline.patch_ray()
def initialize(self, shard_shape):
"""Resets the gradient buffer to zeros."""
self.accumulated = np.zeros(shard_shape, dtype=np.float32)
def prefetch(self, oids):
"""Tell plasma to prefetch the given object ids over the network."""
self.timeline.reset()
self.timeline.start("prefetch")
fetch(oids)
self.timeline.end("prefetch")
def add_spinwait(self, grad_shard_ids):
"""Optimized version of add() that operates on multiple grads."""
self.timeline.start("add_spinwait")
plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids]
while plasma_ids:
for p in plasma_ids:
if ray.worker.global_worker.plasma_client.contains(p):
self.timeline.start("get_buffers")
grads = ray.worker.global_worker.plasma_client.get(p)
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("get_buffers")
plasma_ids.remove(p)
break
self.timeline.end("add_spinwait")
def add(self, grad_shard_id):
"""Add the given gradient value to the accumulated gradients."""
self.timeline.start("add")
self.timeline.start("get_buffers")
oid = ray.pyarrow.plasma.ObjectID(grad_shard_id)
grads = ray.worker.global_worker.plasma_client.get(oid)
self.timeline.end("get_buffers")
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("add")
def get(self, object_id):
"""Put the accumulated gradients to the given object id."""
self.timeline.start("get")
client = ray.worker.global_worker.plasma_client
assert self.acc_counter == self.num_sgd_workers, self.acc_counter
oid = ray.pyarrow.plasma.ObjectID(object_id)
client.put(self.accumulated.flatten(), object_id=oid)
self.accumulated = np.zeros_like(self.accumulated)
self.acc_counter = 0
self.timeline.end("get")
def get_timeline(self):
return self.timeline
def ip(self):
return ray.services.get_node_ip_address()
def warmup(self):
warmup()
+153 -345
View File
@@ -7,336 +7,177 @@ import random
import time
import numpy as np
import pyarrow.plasma as plasma
import tensorflow as tf
import ray
from ray.experimental.sgd.util import Timeline, fetch, run_timeline
from ray.experimental.sgd.modified_allreduce import sum_gradients_all_reduce, \
unpack_small_tensors
from ray.experimental.sgd.sgd_worker import SGDWorker
from ray.experimental.sgd.param_server import ParameterServer
logger = logging.getLogger(__name__)
class SGDWorker(object):
class DistributedSGD(object):
"""Experimental distributed SGD implementation in Ray.
This supports two modes:
'simple': centralized gradient aggregation
'ps': sharded parameter-server implementation
To use this class, you'll have to implement model.py:Model.
Arguments:
model_creator (func): Function that returns a model given worker and
device indexes as arguments. Each model replica will be created
within its own variable scope.
num_workers (int): Number of Ray actors to use for SGD.
devices_per_worker (int): Number of GPU or CPU devices to use per
worker. One model replica will be created per device.
gpu (bool): Whether to use GPU devices.
strategy (str): Strategy to use for distributed gradient aggregation.
This only applies if num_workers > 1.
grad_shard_bytes (int): Fuse gradient tensors into chunks of at most
this size (if applicable).
all_reduce_alg (str): TensorFlow strategy to use for gradient
synchronization within the same worker (if applicable).
See modified_allreduce.py for options.
Examples:
>>> # Setup distributed SGD
>>> model_creator = (
... lambda worker_idx, device_idx: YourModelClass(...))
>>> sgd = DistributedSGD(
... model_creator, num_workers=2,
... devices_per_worker=4, gpu=True, strategy="ps")
>>> # To train
>>> for i in range(100):
... stats = sgd.step(fetch_stats=i % 10 == 0)
>>> # To access or update model state
>>> sgd.foreach_model(lambda model: ...)
>>> # To access or update worker state
>>> sgd.foreach_worker(lambda worker: ...)
"""
def __init__(self,
worker_index,
model_creator,
all_reduce_alg="simple",
num_devices=1,
use_cpus=False,
max_bytes=60000000,
plasma_op=False):
self.worker_index = worker_index
assert num_devices > 0
num_workers,
devices_per_worker,
gpu=True,
strategy="ps",
grad_shard_bytes=10000000,
all_reduce_alg="simple"):
# TODO(ekl) support custom session
tf_session_args = {
"device_count": {
"CPU": num_devices
},
"log_device_placement": False,
"gpu_options": tf.GPUOptions(force_gpu_compatible=True),
"inter_op_parallelism_threads": 128,
}
config_proto = tf.ConfigProto(**tf_session_args)
self.sess = tf.Session(config=config_proto)
self.models = []
grad_ops = []
if num_workers == 1 and strategy == "ps":
logger.warn(
"The parameter server strategy does not make sense for single "
"worker operation, falling back to simple mode.")
strategy = "simple"
if use_cpus:
device_tmpl = "/cpu:%d"
if strategy == "ps":
use_plasma_op = True
elif strategy == "simple":
use_plasma_op = False
grad_shard_bytes = 0 # tensor fusion doesn't make sense
else:
device_tmpl = "/gpu:%d"
for device_idx in range(num_devices):
device = device_tmpl % device_idx
with tf.device(device):
with tf.variable_scope("device_%d" % device_idx):
model = model_creator(worker_index, device_idx)
self.models.append(model)
model.grads = [
t
for t in model.optimizer.compute_gradients(model.loss)
if t[0] is not None
]
grad_ops.append(model.grads)
raise ValueError("strategy must be one of 'ps', 'simple'")
self.strategy = strategy
if num_devices == 1:
assert not max_bytes, "Not supported with 1 GPU"
self.packed_grads_and_vars = grad_ops
self.model_creator = model_creator
if gpu:
requests = {"num_gpus": devices_per_worker}
else:
if max_bytes:
self.packed_grads_and_vars, packing_vals = (
sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=max_bytes))
else:
self.packed_grads_and_vars, _ = (sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=0))
self.per_device_grads = [
list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars
]
assert (len(self.per_device_grads) == num_devices)
self.num_grads = num_grads = len(self.packed_grads_and_vars[0])
if max_bytes:
logger.info("Packed grads => {} tensors".format(num_grads))
requests = {"num_cpus": devices_per_worker}
# Ops for reading grads with the right control deps
nccl_noops = []
for j in range(num_grads)[::-1]:
deps = nccl_noops + [
dev_grad[j] for dev_grad in self.per_device_grads
RemoteSGDWorker = ray.remote(**requests)(SGDWorker)
self.workers = []
logger.info("Creating SGD workers ({} total)".format(num_workers))
for worker_index in range(num_workers):
self.workers.append(
RemoteSGDWorker.remote(
worker_index,
model_creator,
num_devices=devices_per_worker,
plasma_op=use_plasma_op,
gpu=gpu,
max_bytes=grad_shard_bytes,
all_reduce_alg=all_reduce_alg))
logger.info("Waiting for gradient configuration")
shard_shapes = ray.get(self.workers[0].shard_shapes.remote())
logger.info("Waiting for actors to start")
ray.get([w.shard_shapes.remote() for w in self.workers])
if strategy == "ps":
logger.info("Starting parameter servers ({} shards)".format(
len(shard_shapes)))
self.ps_list = [
ParameterServer.remote(len(self.workers), i)
for i, s in enumerate(shard_shapes)
]
with tf.control_dependencies(deps):
nccl_noops = [tf.no_op()]
# You must fetch this otherwise the NCCL allreduce will hang
self.nccl_control_out = tf.group(*nccl_noops)
round_robin_devices = False
if plasma_op:
store_socket = (
ray.worker.global_worker.plasma_client.store_socket_name)
if not plasma.tf_plasma_op:
plasma.build_plasma_tensorflow_op()
# For fetching grads -> plasma
self.plasma_in_grads = []
self.plasma_in_grads_oids = [
tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids")
for _ in range(num_grads)
]
ix = 0
for j in range(num_grads):
grad = self.per_device_grads[ix][j]
if round_robin_devices:
ix += 1 # round robin assignment
ix %= num_devices
with tf.device(self.models[ix].loss.device):
plasma_grad = plasma.tf_plasma_op.tensor_to_plasma(
[grad],
self.plasma_in_grads_oids[j],
plasma_store_socket_name=store_socket,
plasma_manager_socket_name="")
self.plasma_in_grads.append(plasma_grad)
# For applying grads <- plasma
unpacked_gv = []
self.plasma_out_grads_oids = [
tf.placeholder(
shape=[], dtype=tf.string, name="grad_out_oids")
for _ in range(num_grads)
]
packed_plasma_grads = []
ix = 0
for j in range(num_grads):
with tf.device(self.plasma_in_grads[j].device):
with tf.control_dependencies([self.plasma_in_grads[j]]):
grad_ph = plasma.tf_plasma_op.plasma_to_tensor(
self.plasma_out_grads_oids[j],
dtype=tf.float32,
plasma_store_socket_name=store_socket,
plasma_manager_socket_name="")
grad_ph = tf.reshape(grad_ph,
self.packed_grads_and_vars[0][j][0].shape)
logger.debug("Packed tensor {}".format(grad_ph))
packed_plasma_grads.append(grad_ph)
for i in range(num_devices):
per_device = []
for j, (g, v) in enumerate(self.packed_grads_and_vars[i]):
grad_ph = packed_plasma_grads[j]
per_device.append((grad_ph, v))
unpacked_gv.append(per_device)
if max_bytes:
unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals)
elif max_bytes:
unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars,
packing_vals)
ray.get([
ps.initialize.remote(s)
for ps, s in zip(self.ps_list, shard_shapes)
])
logger.info("Parameter servers started")
else:
unpacked_gv = self.packed_grads_and_vars
# Same shape as packed_grads_and_vars
assert len(unpacked_gv) == num_devices
assert len(unpacked_gv[0][0]) == 2
apply_ops = []
to_apply = unpacked_gv[0]
for ix, m in enumerate(self.models):
apply_ops.append(
m.optimizer.apply_gradients(
[(g, v)
for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix])]))
self.apply_op = tf.group(*apply_ops)
init_op = tf.group(tf.global_variables_initializer(),
tf.local_variables_initializer())
self.sess.run(init_op)
def foreach_model(self, fn):
return [fn(m) for m in self.models]
self.ps_list = []
def foreach_worker(self, fn):
return fn(self)
"""Apply the given function to each remote worker.
def compute_gradients(self):
start = time.time()
feed_dict = {}
# Aggregate feed dicts for each model on this worker.
for model in self.models:
feed_dict.update(model.get_feed_dict())
# We only need to fetch the first per_device_grad, since they are
# averaged across all devices by allreduce.
fetches = self.sess.run(
[
self.models[0].loss, self.per_device_grads[0],
self.nccl_control_out
],
feed_dict=feed_dict)
logger.debug(
"compute grad interior time {}".format(time.time() - start))
return fetches
Returns:
List of results from applying the function.
"""
results = ray.get([w.foreach_worker.remote(fn) for w in self.workers])
return results
def apply_gradients(self, avg_grads):
start = time.time()
result = {
g: avg_grads[i]
for (i, g) in enumerate(self.per_device_grads[0])
}
self.sess.run(self.apply_op, feed_dict=result)
logger.debug("apply grad interior time {}".format(time.time() - start))
def foreach_model(self, fn):
"""Apply the given function to each model replica in each worker.
def ps_compute_apply(self,
out_grad_shard_oids,
agg_grad_shard_oids,
tl_name="ps_compute_apply",
write_timeline=False):
feed_dict = {
ph: oid
for (ph,
oid) in zip(self.plasma_in_grads_oids, out_grad_shard_oids)
}
feed_dict.update({
ph: oid
for (ph,
oid) in zip(self.plasma_out_grads_oids, agg_grad_shard_oids)
})
fetch(agg_grad_shard_oids)
run_timeline(
self.sess,
[self.plasma_in_grads, self.apply_op, self.nccl_control_out],
feed_dict=feed_dict,
write_timeline=write_timeline)
Returns:
List of results from applying the function.
"""
results = ray.get([w.foreach_model.remote(fn) for w in self.workers])
out = []
for r in results:
out.extend(r)
return r
def num_grad_shards(self):
return self.num_grads
def step(self, fetch_stats=False):
"""Run a single SGD step.
def shard_shapes(self):
main_gv = self.packed_grads_and_vars[0]
return [g.shape for g, _ in main_gv]
Arguments:
fetch_stats (bool): Whether to return stats from the step. This can
slow down the computation by acting as a global barrier.
"""
if self.strategy == "ps":
return _distributed_sgd_step(
self.workers,
self.ps_list,
write_timeline=False,
fetch_stats=fetch_stats)
else:
return _simple_sgd_step(self.workers)
def ip(self):
return ray.services.get_node_ip_address()
def warmup(self):
logger.info("Warming up object store of worker actors")
ray.get([w.warmup.remote() for w in self.workers])
logger.info("Warmup complete")
class ParameterServer(object):
def __init__(self, num_workers, tid):
self.num_sgd_workers = num_workers
self.acc_counter = 0
self.timeline = Timeline(tid)
self.timeline.patch_ray()
def set_tid(self, tid):
self.timeline.tid = tid
def get_time(self):
return time.time() + self.timeline.offset
def set_time(self, ref_time):
self.timeline.offset = ref_time - time.time()
def initialize(self, shard_shape):
self.accumulated = np.zeros(shard_shape, dtype=np.float32)
def mark(self):
self.timeline.event("mark")
def prefetch(self, oids):
self.timeline.reset()
self.timeline.start("prefetch")
fetch(oids)
self.timeline.end("prefetch")
def add_spinwait(self, grad_shard_ids):
self.timeline.start("add_spinwait")
plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids]
while plasma_ids:
for p in plasma_ids:
if ray.worker.global_worker.plasma_client.contains(p):
self.timeline.start("get_buffers")
grads = ray.worker.global_worker.plasma_client.get(p)
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("get_buffers")
plasma_ids.remove(p)
break
self.timeline.end("add_spinwait")
def add(self, grad_shard_id):
self.timeline.start("add")
self.timeline.start("get_buffers")
oid = ray.pyarrow.plasma.ObjectID(grad_shard_id)
grads = ray.worker.global_worker.plasma_client.get(oid)
self.timeline.end("get_buffers")
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("add")
def get(self, object_id):
self.timeline.start("get")
client = ray.worker.global_worker.plasma_client
assert self.acc_counter == self.num_sgd_workers, self.acc_counter
oid = ray.pyarrow.plasma.ObjectID(object_id)
client.put(self.accumulate.flatten(), object_id=oid)
self.accumulated = np.zeros_like(self.accumulated)
self.acc_counter = 0
self.timeline.end("get")
def get_timeline(self):
return self.timeline
def ip(self):
return ray.services.get_node_ip_address()
def pin(self, cpu_id):
try:
import psutil
p = psutil.Process()
p.cpu_affinity([cpu_id])
logger.info("Setting CPU Affinity to: {}".format(cpu_id))
except Exception as e:
logger.error(e)
def average_gradients(grads):
def _average_gradients(grads):
out = []
for grad_list in zip(*grads):
out.append(np.mean(grad_list, axis=0))
return out
def do_sgd_step(actors):
def _simple_sgd_step(actors):
if len(actors) == 1:
return ray.get(actors[0].compute_apply.remote())
start = time.time()
fetches = ray.get([a.compute_gradients.remote() for a in actors])
losses = [f[0] for f in fetches]
@@ -347,7 +188,7 @@ def do_sgd_step(actors):
assert len(grads) == 1
avg_grad = grads[0]
else:
avg_grad = average_gradients(grads)
avg_grad = _average_gradients(grads)
logger.debug("grad reduce time {}".format(time.time() - start))
start = time.time()
ray.get([a.apply_gradients.remote(avg_grad) for a in actors])
@@ -355,20 +196,24 @@ def do_sgd_step(actors):
return np.mean(losses)
def distributed_sgd_step(actors, ps_list, write_timeline):
def _distributed_sgd_step(actors, ps_list, fetch_stats, write_timeline):
# Preallocate object ids that actors will write gradient shards to
grad_shard_oids_list = [[np.random.bytes(20) for _ in ps_list]
for _ in actors]
logger.info("generated grad oids")
logger.info("Generated grad oids")
# Preallocate object ids that param servers will write new weights to
accum_shard_ids = [np.random.bytes(20) for _ in ps_list]
logger.info("generated accum oids")
logger.info("Generated accum oids")
# Kick off the fused compute grad / update weights tf run for each actor
losses = []
for actor, grad_shard_oids in zip(actors, grad_shard_oids_list):
actor.ps_compute_apply.remote(
grad_shard_oids, accum_shard_ids, write_timeline=write_timeline)
losses.append(
actor.ps_compute_apply.remote(
grad_shard_oids,
accum_shard_ids,
write_timeline=write_timeline))
logger.info("Launched all ps_compute_applys on all actors")
# Issue prefetch ops
@@ -401,44 +246,7 @@ def distributed_sgd_step(actors, ps_list, write_timeline):
else:
# Wait for at least the ps gets to finish
ray.get(ps_gets)
class DistributedSGD(object):
def __init__(self,
model_creator,
num_workers,
devices_per_worker,
use_cpus=False,
use_plasma_op=False):
self.model_creator = model_creator
if use_cpus:
requests = {"num_cpus": devices_per_worker}
else:
requests = {"num_gpus": devices_per_worker}
RemoteSGDWorker = ray.remote(**requests)(SGDWorker)
self.workers = []
for worker_index in range(num_workers):
logger.info("Creating worker {}".format(worker_index))
self.workers.append(
RemoteSGDWorker.remote(
worker_index,
model_creator,
num_devices=devices_per_worker,
plasma_op=use_plasma_op,
use_cpus=use_cpus))
assert not use_plasma_op, \
"TODO: when use_plasma_op is true, we must run in PS mode"
def foreach_worker(self, fn):
results = ray.get([w.foreach_worker.remote(fn) for w in self.workers])
return results
def foreach_model(self, fn):
results = ray.get([w.foreach_model.remote(fn) for w in self.workers])
out = []
for r in results:
out.extend(r)
return r
def step(self):
return do_sgd_step(self.workers)
if fetch_stats:
return np.mean(ray.get(losses))
else:
return None
+254
View File
@@ -0,0 +1,254 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import time
import pyarrow.plasma as plasma
import tensorflow as tf
import ray
from ray.experimental.sgd.util import fetch, run_timeline, warmup
from ray.experimental.sgd.modified_allreduce import sum_gradients_all_reduce, \
unpack_small_tensors
logger = logging.getLogger(__name__)
class SGDWorker(object):
"""Helper class for ray.experimental.sgd.DistributedSGD."""
def __init__(self,
worker_index,
model_creator,
all_reduce_alg="simple",
num_devices=1,
gpu=False,
max_bytes=10000000,
plasma_op=False):
self.worker_index = worker_index
assert num_devices > 0
# TODO(ekl) support custom session
tf_session_args = {
"device_count": {
"CPU": num_devices
},
"log_device_placement": False,
"gpu_options": tf.GPUOptions(force_gpu_compatible=True),
"inter_op_parallelism_threads": 128,
}
config_proto = tf.ConfigProto(**tf_session_args)
self.sess = tf.Session(config=config_proto)
self.models = []
grad_ops = []
if gpu:
device_tmpl = "/gpu:%d"
else:
device_tmpl = "/cpu:%d"
for device_idx in range(num_devices):
device = device_tmpl % device_idx
with tf.device(device):
with tf.variable_scope("device_%d" % device_idx):
model = model_creator(worker_index, device_idx)
self.models.append(model)
model.grads = [
t
for t in model.optimizer.compute_gradients(model.loss)
if t[0] is not None
]
grad_ops.append(model.grads)
if num_devices == 1:
assert not max_bytes, \
"grad_shard_bytes > 0 ({}) requires num_devices > 1".format(
max_bytes)
self.packed_grads_and_vars = grad_ops
else:
if max_bytes:
self.packed_grads_and_vars, packing_vals = (
sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=max_bytes))
else:
self.packed_grads_and_vars, _ = (sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=0))
self.per_device_grads = [
list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars
]
assert (len(self.per_device_grads) == num_devices)
self.num_grads = num_grads = len(self.packed_grads_and_vars[0])
if max_bytes:
logger.info("Packed grads => {} tensors".format(num_grads))
# Ops for reading grads with the right control deps
nccl_noops = []
for j in range(num_grads)[::-1]:
deps = nccl_noops + [
dev_grad[j] for dev_grad in self.per_device_grads
]
with tf.control_dependencies(deps):
nccl_noops = [tf.no_op()]
# You must fetch this otherwise the NCCL allreduce will hang
self.nccl_control_out = tf.group(*nccl_noops)
if plasma_op:
store_socket = (
ray.worker.global_worker.plasma_client.store_socket_name)
manager_socket = (
ray.worker.global_worker.plasma_client.manager_socket_name)
if not plasma.tf_plasma_op:
plasma.build_plasma_tensorflow_op()
# For fetching grads -> plasma
self.plasma_in_grads = []
self.plasma_in_grads_oids = [
tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids")
for _ in range(num_grads)
]
for j in range(num_grads):
grad = self.per_device_grads[0][j]
with tf.device(self.models[0].loss.device):
plasma_grad = plasma.tf_plasma_op.tensor_to_plasma(
[grad],
self.plasma_in_grads_oids[j],
plasma_store_socket_name=store_socket,
plasma_manager_socket_name=manager_socket)
self.plasma_in_grads.append(plasma_grad)
# For applying grads <- plasma
unpacked_gv = []
self.plasma_out_grads_oids = [
tf.placeholder(
shape=[], dtype=tf.string, name="grad_out_oids")
for _ in range(num_grads)
]
packed_plasma_grads = []
for j in range(num_grads):
with tf.device(self.plasma_in_grads[j].device):
with tf.control_dependencies([self.plasma_in_grads[j]]):
grad_ph = plasma.tf_plasma_op.plasma_to_tensor(
self.plasma_out_grads_oids[j],
dtype=tf.float32,
plasma_store_socket_name=store_socket,
plasma_manager_socket_name=manager_socket)
grad_ph = tf.reshape(grad_ph,
self.packed_grads_and_vars[0][j][0].shape)
logger.debug("Packed tensor {}".format(grad_ph))
packed_plasma_grads.append(grad_ph)
for i in range(num_devices):
per_device = []
for j, (g, v) in enumerate(self.packed_grads_and_vars[i]):
grad_ph = packed_plasma_grads[j]
per_device.append((grad_ph, v))
unpacked_gv.append(per_device)
if max_bytes:
unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals)
elif max_bytes:
unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars,
packing_vals)
else:
unpacked_gv = self.packed_grads_and_vars
# Same shape as packed_grads_and_vars
assert len(unpacked_gv) == num_devices
assert len(unpacked_gv[0][0]) == 2
apply_ops = []
to_apply = unpacked_gv[0]
for ix, m in enumerate(self.models):
apply_ops.append(
m.optimizer.apply_gradients(
[(g, v)
for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix])]))
self.apply_op = tf.group(*apply_ops)
init_op = tf.group(tf.global_variables_initializer(),
tf.local_variables_initializer())
self.sess.run(init_op)
def foreach_model(self, fn):
return [fn(m) for m in self.models]
def foreach_worker(self, fn):
return fn(self)
def compute_gradients(self):
start = time.time()
feed_dict = {}
# Aggregate feed dicts for each model on this worker.
for model in self.models:
feed_dict.update(model.get_feed_dict())
# We only need to fetch the first per_device_grad, since they are
# averaged across all devices by allreduce.
fetches = self.sess.run(
[
self.models[0].loss, self.per_device_grads[0],
self.nccl_control_out
],
feed_dict=feed_dict)
logger.debug(
"Compute grad interior time {}".format(time.time() - start))
return fetches
def apply_gradients(self, avg_grads):
start = time.time()
result = {
g: avg_grads[i]
for (i, g) in enumerate(self.per_device_grads[0])
}
self.sess.run(self.apply_op, feed_dict=result)
logger.debug("Apply grad interior time {}".format(time.time() - start))
def compute_apply(self):
fetches = run_timeline(
self.sess,
[self.models[0].loss, self.apply_op, self.nccl_control_out],
name="compute_apply")
return fetches[0]
def ps_compute_apply(self,
out_grad_shard_oids,
agg_grad_shard_oids,
tl_name="ps_compute_apply",
write_timeline=False):
feed_dict = dict(zip(self.plasma_in_grads_oids, out_grad_shard_oids))
feed_dict.update(
dict(zip(self.plasma_out_grads_oids, agg_grad_shard_oids)))
fetch(agg_grad_shard_oids)
fetches = run_timeline(
self.sess, [
self.models[0].loss, self.plasma_in_grads, self.apply_op,
self.nccl_control_out
],
feed_dict=feed_dict,
write_timeline=write_timeline)
return fetches[0]
def num_grad_shards(self):
return self.num_grads
def shard_shapes(self):
main_gv = self.packed_grads_and_vars[0]
return [g.shape for g, _ in main_gv]
def ip(self):
return ray.services.get_node_ip_address()
def warmup(self):
warmup()
+46 -11
View File
@@ -1,32 +1,67 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import time
import ray
from ray.experimental.sgd.tfbench.test_model import TFBenchModel
from ray.experimental.sgd.sgd import DistributedSGD
parser = argparse.ArgumentParser()
parser.add_argument("--redis-address", default=None, type=str)
parser.add_argument("--num-iters", default=10, type=int)
parser.add_argument("--batch-size", default=1, type=int)
parser.add_argument("--num-workers", default=2, type=int)
parser.add_argument("--grad-shard-bytes", default=10000000, type=int)
parser.add_argument("--devices-per-worker", default=2, type=int)
parser.add_argument("--stats-interval", default=10, type=int)
parser.add_argument("--all-reduce-alg", default="simple", type=str)
parser.add_argument("--object-store-memory", default=None, type=int)
parser.add_argument(
"--num-iters", default=100, type=int, help="Number of iterations to run")
"--warmup", action="store_true", help="Warm up object store before start.")
parser.add_argument(
"--strategy", default="ps", type=str, help="One of 'simple' or 'ps'")
parser.add_argument(
"--gpu", action="store_true", help="Use GPUs for optimization")
if __name__ == "__main__":
ray.init()
args, _ = parser.parse_known_args()
ray.init(
redis_address=args.redis_address,
object_store_memory=args.object_store_memory)
model_creator = (
lambda worker_idx, device_idx: TFBenchModel(batch=1, use_cpus=True))
lambda worker_idx, device_idx: TFBenchModel(
batch=args.batch_size, use_cpus=not args.gpu))
sgd = DistributedSGD(
model_creator,
num_workers=2,
devices_per_worker=2,
use_cpus=True,
use_plasma_op=False)
num_workers=args.num_workers,
devices_per_worker=args.devices_per_worker,
gpu=args.gpu,
strategy=args.strategy,
grad_shard_bytes=args.grad_shard_bytes,
all_reduce_alg=args.all_reduce_alg)
for _ in range(args.num_iters):
loss = sgd.step()
print("Current loss", loss)
if args.warmup:
sgd.warmup()
t = []
for i in range(args.num_iters):
start = time.time()
fetch_stats = i % args.stats_interval == 0
print("== Step {} ==".format(i))
stats = sgd.step(fetch_stats=fetch_stats)
ips = ((args.batch_size * args.num_workers * args.devices_per_worker) /
(time.time() - start))
print("Iteration time", time.time() - start, "Images per second", ips)
t.append(ips)
if fetch_stats:
print("Current loss", stats)
print("Peak throughput", max(sum(t[i:i + 5]) / 5 for i in range(len(t))))
@@ -5,13 +5,14 @@ from __future__ import print_function
import tensorflow as tf
from tfbench import model_config
from ray.experimental.sgd.model import Model
class MockDataset():
name = "synthetic"
class TFBenchModel(object):
class TFBenchModel(Model):
def __init__(self, batch=64, use_cpus=False):
image_shape = [batch, 224, 224, 3]
labels_shape = [batch]
@@ -25,20 +26,22 @@ class TFBenchModel(object):
name='synthetic_images')
# Minor hack to avoid H2D copy when using synthetic data
self.inputs = tf.contrib.framework.local_variable(
inputs = tf.contrib.framework.local_variable(
images, name='gpu_cached_images')
self.labels = tf.random_uniform(
labels = tf.random_uniform(
labels_shape,
minval=0,
maxval=999,
dtype=tf.int32,
name='synthetic_labels')
self.model = model_config.get_model_config("resnet101", MockDataset())
logits, aux = self.model.build_network(
self.inputs, data_format=use_cpus and "NHWC" or "NCHW")
model = model_config.get_model_config("resnet101", MockDataset())
logits, aux = model.build_network(
inputs, data_format=use_cpus and "NHWC" or "NCHW")
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=logits, labels=self.labels)
logits=logits, labels=labels)
# Implement model interface
self.loss = tf.reduce_mean(loss, name='xentropy-loss')
self.optimizer = tf.train.GradientDescentOptimizer(1e-6)
+19
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import json
import logging
import numpy as np
import os
import time
import tensorflow as tf
@@ -13,6 +14,24 @@ import ray
logger = logging.getLogger(__name__)
def warmup():
logger.info("Warming up object store")
zeros = np.zeros(int(100e6 / 8), dtype=np.float64)
start = time.time()
for _ in range(10):
ray.put(zeros)
logger.info("Initial latency for 100MB put {}".format(
(time.time() - start) / 10))
for _ in range(5):
for _ in range(100):
ray.put(zeros)
start = time.time()
for _ in range(10):
ray.put(zeros)
logger.info("Warmed up latency for 100MB put {}".format(
(time.time() - start) / 10))
def fetch(oids):
local_sched_client = ray.worker.global_worker.local_scheduler_client
for o in oids: