From af0c1174cd35a65ec3ca3630b999577fd4614578 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 27 Oct 2018 21:25:02 -0700 Subject: [PATCH] [sgd] Merge sharded param server based SGD implementation (#3033) This includes most of the TF code used for the OSDI experiment. Perf sanity check on p3.16xl instances: Overall scaling looks ok, with the multi-node results within 5% of OSDI final numbers. This seems reasonable given that hugepages are not enabled here, and the param server shards are placed randomly. $ RAY_USE_XRAY=1 ./test_sgd.py --gpu --batch-size=64 --num-workers=N \ --devices-per-worker=M --strategy=<simple|ps> \ --warmup --object-store-memory=10000000000 Images per second total gpus total | simple | ps ======================================== 1 | 218 2 (1 worker) | 388 4 (1 worker) | 759 4 (2 workers) | 176 | 623 8 (1 worker) | 985 8 (2 workers) | 349 | 1031 16 (2 nodes, 2 workers) | 600 | 1661 16 (2 nodes, 4 workers) | 468 | 1712 <--- OSDI perf was 1817 --- doc/source/rllib-training.rst | 2 +- python/ray/experimental/sgd/model.py | 26 + python/ray/experimental/sgd/param_server.py | 82 +++ python/ray/experimental/sgd/sgd.py | 498 ++++++------------ python/ray/experimental/sgd/sgd_worker.py | 254 +++++++++ python/ray/experimental/sgd/test_sgd.py | 57 +- .../experimental/sgd/tfbench/test_model.py | 17 +- python/ray/experimental/sgd/util.py | 19 + test/jenkins_tests/run_multi_node_tests.sh | 9 +- 9 files changed, 598 insertions(+), 366 deletions(-) create mode 100644 python/ray/experimental/sgd/model.py create mode 100644 python/ray/experimental/sgd/param_server.py create mode 100644 python/ray/experimental/sgd/sgd_worker.py diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 583a0ccae..7d11bad07 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -78,7 +78,7 @@ In an example below, we train A2C by specifying 8 workers through the config fla Specifying Resources ~~~~~~~~~~~~~~~~~~~~ -You can control the
degree of parallelism used by setting the ``num_workers`` hyperparameter for most agents. Many agents also provide a ``num_gpus`` or ``gpu`` option. In addition, you can allocate a fraction of a GPU by setting ``gpu_fraction: f``. For example, with DQN you can pack five agents onto one GPU by setting ``gpu_fraction: 0.2``. Note that fractional GPU support requires enabling the experimental X-ray backend by setting the environment variable ``RAY_USE_XRAY=1``. +You can control the degree of parallelism used by setting the ``num_workers`` hyperparameter for most agents. Many agents also provide a ``num_gpus`` or ``gpu`` option. In addition, you can allocate a fraction of a GPU by setting ``gpu_fraction: f``. For example, with DQN you can pack five agents onto one GPU by setting ``gpu_fraction: 0.2``. Note that in Ray < 0.6.0 fractional GPU support requires setting the environment variable ``RAY_USE_XRAY=1``. Common Parameters ~~~~~~~~~~~~~~~~~ diff --git a/python/ray/experimental/sgd/model.py b/python/ray/experimental/sgd/model.py new file mode 100644 index 000000000..ac8e0eedf --- /dev/null +++ b/python/ray/experimental/sgd/model.py @@ -0,0 +1,26 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +class Model(object): + """Your class must implement this interface to be used with Ray SGD. + + This supports any form of input pipeline: it is up to you to define it + using TensorFlow. The only requirements are that the loss and optimizer + attributes must be defined. + + For an example implementation, see tfbench/test_model.py + + Attributes: + loss (tf.Tensor): Loss function to minimize. + optimizer (tf.train.Optimizer): Optimizer to use to minimize the loss. + """ + + def get_feed_dict(self): + """Extra values to pass in when computing gradients for the loss. + + Returns: + TensorFlow feed_dict to add to the gradient operation. 
+ """ + return {} diff --git a/python/ray/experimental/sgd/param_server.py b/python/ray/experimental/sgd/param_server.py new file mode 100644 index 000000000..517d419c3 --- /dev/null +++ b/python/ray/experimental/sgd/param_server.py @@ -0,0 +1,82 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging + +import numpy as np + +import ray +from ray.experimental.sgd.util import Timeline, fetch, warmup + +logger = logging.getLogger(__name__) + + +@ray.remote(num_cpus=0) +class ParameterServer(object): + """Helper class for ray.experimental.sgd.DistributedSGD.""" + + def __init__(self, num_workers, tid): + self.num_sgd_workers = num_workers + self.acc_counter = 0 + self.timeline = Timeline(tid) + # TODO(ekl) get this to work again so we get ray events + # self.timeline.patch_ray() + + def initialize(self, shard_shape): + """Resets the gradient buffer to zeros.""" + self.accumulated = np.zeros(shard_shape, dtype=np.float32) + + def prefetch(self, oids): + """Tell plasma to prefetch the given object ids over the network.""" + self.timeline.reset() + self.timeline.start("prefetch") + fetch(oids) + self.timeline.end("prefetch") + + def add_spinwait(self, grad_shard_ids): + """Optimized version of add() that operates on multiple grads.""" + self.timeline.start("add_spinwait") + plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids] + while plasma_ids: + for p in plasma_ids: + if ray.worker.global_worker.plasma_client.contains(p): + self.timeline.start("get_buffers") + grads = ray.worker.global_worker.plasma_client.get(p) + self.accumulated += grads + self.acc_counter += 1 + self.timeline.end("get_buffers") + plasma_ids.remove(p) + break + self.timeline.end("add_spinwait") + + def add(self, grad_shard_id): + """Add the given gradient value to the accumulated gradients.""" + self.timeline.start("add") + self.timeline.start("get_buffers") + oid = 
ray.pyarrow.plasma.ObjectID(grad_shard_id) + grads = ray.worker.global_worker.plasma_client.get(oid) + self.timeline.end("get_buffers") + self.accumulated += grads + self.acc_counter += 1 + self.timeline.end("add") + + def get(self, object_id): + """Put the accumulated gradients to the given object id.""" + self.timeline.start("get") + client = ray.worker.global_worker.plasma_client + assert self.acc_counter == self.num_sgd_workers, self.acc_counter + oid = ray.pyarrow.plasma.ObjectID(object_id) + client.put(self.accumulated.flatten(), object_id=oid) + self.accumulated = np.zeros_like(self.accumulated) + self.acc_counter = 0 + self.timeline.end("get") + + def get_timeline(self): + return self.timeline + + def ip(self): + return ray.services.get_node_ip_address() + + def warmup(self): + warmup() diff --git a/python/ray/experimental/sgd/sgd.py b/python/ray/experimental/sgd/sgd.py index 82c61613a..2ad74710a 100644 --- a/python/ray/experimental/sgd/sgd.py +++ b/python/ray/experimental/sgd/sgd.py @@ -7,336 +7,177 @@ import random import time import numpy as np -import pyarrow.plasma as plasma -import tensorflow as tf import ray -from ray.experimental.sgd.util import Timeline, fetch, run_timeline -from ray.experimental.sgd.modified_allreduce import sum_gradients_all_reduce, \ - unpack_small_tensors +from ray.experimental.sgd.sgd_worker import SGDWorker +from ray.experimental.sgd.param_server import ParameterServer logger = logging.getLogger(__name__) -class SGDWorker(object): +class DistributedSGD(object): + """Experimental distributed SGD implementation in Ray. + + This supports two modes: + 'simple': centralized gradient aggregation + 'ps': sharded parameter-server implementation + + To use this class, you'll have to implement model.py:Model. + + Arguments: + model_creator (func): Function that returns a model given worker and + device indexes as arguments. Each model replica will be created + within its own variable scope. 
+ num_workers (int): Number of Ray actors to use for SGD. + devices_per_worker (int): Number of GPU or CPU devices to use per + worker. One model replica will be created per device. + gpu (bool): Whether to use GPU devices. + strategy (str): Strategy to use for distributed gradient aggregation. + This only applies if num_workers > 1. + grad_shard_bytes (int): Fuse gradient tensors into chunks of at most + this size (if applicable). + all_reduce_alg (str): TensorFlow strategy to use for gradient + synchronization within the same worker (if applicable). + See modified_allreduce.py for options. + + Examples: + >>> # Setup distributed SGD + >>> model_creator = ( + ... lambda worker_idx, device_idx: YourModelClass(...)) + >>> sgd = DistributedSGD( + ... model_creator, num_workers=2, + ... devices_per_worker=4, gpu=True, strategy="ps") + + >>> # To train + >>> for i in range(100): + ... stats = sgd.step(fetch_stats=i % 10 == 0) + + >>> # To access or update model state + >>> sgd.foreach_model(lambda model: ...) + + >>> # To access or update worker state + >>> sgd.foreach_worker(lambda worker: ...) 
+ """ + def __init__(self, - worker_index, model_creator, - all_reduce_alg="simple", - num_devices=1, - use_cpus=False, - max_bytes=60000000, - plasma_op=False): - self.worker_index = worker_index - assert num_devices > 0 + num_workers, + devices_per_worker, + gpu=True, + strategy="ps", + grad_shard_bytes=10000000, + all_reduce_alg="simple"): - # TODO(ekl) support custom session - tf_session_args = { - "device_count": { - "CPU": num_devices - }, - "log_device_placement": False, - "gpu_options": tf.GPUOptions(force_gpu_compatible=True), - "inter_op_parallelism_threads": 128, - } - config_proto = tf.ConfigProto(**tf_session_args) - self.sess = tf.Session(config=config_proto) - self.models = [] - grad_ops = [] + if num_workers == 1 and strategy == "ps": + logger.warning( + "The parameter server strategy does not make sense for single " + "worker operation, falling back to simple mode.") + strategy = "simple" - if use_cpus: - device_tmpl = "/cpu:%d" + if strategy == "ps": + use_plasma_op = True + elif strategy == "simple": + use_plasma_op = False + grad_shard_bytes = 0 # tensor fusion doesn't make sense else: - device_tmpl = "/gpu:%d" - for device_idx in range(num_devices): - device = device_tmpl % device_idx - with tf.device(device): - with tf.variable_scope("device_%d" % device_idx): - model = model_creator(worker_index, device_idx) - self.models.append(model) - model.grads = [ - t - for t in model.optimizer.compute_gradients(model.loss) - if t[0] is not None - ] - grad_ops.append(model.grads) + raise ValueError("strategy must be one of 'ps', 'simple'") + self.strategy = strategy - if num_devices == 1: - assert not max_bytes, "Not supported with 1 GPU" - self.packed_grads_and_vars = grad_ops + self.model_creator = model_creator + if gpu: + requests = {"num_gpus": devices_per_worker} else: - if max_bytes: - self.packed_grads_and_vars, packing_vals = ( - sum_gradients_all_reduce( - "", - grad_ops, - 1, - all_reduce_alg, - 1, - list(range(num_devices)), -
agg_small_grads_max_bytes=max_bytes)) - else: - self.packed_grads_and_vars, _ = (sum_gradients_all_reduce( - "", - grad_ops, - 1, - all_reduce_alg, - 1, - list(range(num_devices)), - agg_small_grads_max_bytes=0)) - self.per_device_grads = [ - list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars - ] - assert (len(self.per_device_grads) == num_devices) - self.num_grads = num_grads = len(self.packed_grads_and_vars[0]) - if max_bytes: - logger.info("Packed grads => {} tensors".format(num_grads)) + requests = {"num_cpus": devices_per_worker} - # Ops for reading grads with the right control deps - nccl_noops = [] - for j in range(num_grads)[::-1]: - deps = nccl_noops + [ - dev_grad[j] for dev_grad in self.per_device_grads + RemoteSGDWorker = ray.remote(**requests)(SGDWorker) + self.workers = [] + logger.info("Creating SGD workers ({} total)".format(num_workers)) + for worker_index in range(num_workers): + self.workers.append( + RemoteSGDWorker.remote( + worker_index, + model_creator, + num_devices=devices_per_worker, + plasma_op=use_plasma_op, + gpu=gpu, + max_bytes=grad_shard_bytes, + all_reduce_alg=all_reduce_alg)) + + logger.info("Waiting for gradient configuration") + shard_shapes = ray.get(self.workers[0].shard_shapes.remote()) + + logger.info("Waiting for actors to start") + ray.get([w.shard_shapes.remote() for w in self.workers]) + + if strategy == "ps": + logger.info("Starting parameter servers ({} shards)".format( + len(shard_shapes))) + self.ps_list = [ + ParameterServer.remote(len(self.workers), i) + for i, s in enumerate(shard_shapes) ] - with tf.control_dependencies(deps): - nccl_noops = [tf.no_op()] - - # You must fetch this otherwise the NCCL allreduce will hang - self.nccl_control_out = tf.group(*nccl_noops) - - round_robin_devices = False - if plasma_op: - store_socket = ( - ray.worker.global_worker.plasma_client.store_socket_name) - if not plasma.tf_plasma_op: - plasma.build_plasma_tensorflow_op() - - # For fetching grads -> plasma - 
self.plasma_in_grads = [] - self.plasma_in_grads_oids = [ - tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids") - for _ in range(num_grads) - ] - ix = 0 - for j in range(num_grads): - grad = self.per_device_grads[ix][j] - if round_robin_devices: - ix += 1 # round robin assignment - ix %= num_devices - with tf.device(self.models[ix].loss.device): - plasma_grad = plasma.tf_plasma_op.tensor_to_plasma( - [grad], - self.plasma_in_grads_oids[j], - plasma_store_socket_name=store_socket, - plasma_manager_socket_name="") - self.plasma_in_grads.append(plasma_grad) - - # For applying grads <- plasma - unpacked_gv = [] - self.plasma_out_grads_oids = [ - tf.placeholder( - shape=[], dtype=tf.string, name="grad_out_oids") - for _ in range(num_grads) - ] - packed_plasma_grads = [] - ix = 0 - for j in range(num_grads): - with tf.device(self.plasma_in_grads[j].device): - with tf.control_dependencies([self.plasma_in_grads[j]]): - grad_ph = plasma.tf_plasma_op.plasma_to_tensor( - self.plasma_out_grads_oids[j], - dtype=tf.float32, - plasma_store_socket_name=store_socket, - plasma_manager_socket_name="") - grad_ph = tf.reshape(grad_ph, - self.packed_grads_and_vars[0][j][0].shape) - logger.debug("Packed tensor {}".format(grad_ph)) - packed_plasma_grads.append(grad_ph) - for i in range(num_devices): - per_device = [] - for j, (g, v) in enumerate(self.packed_grads_and_vars[i]): - grad_ph = packed_plasma_grads[j] - per_device.append((grad_ph, v)) - unpacked_gv.append(per_device) - - if max_bytes: - unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals) - - elif max_bytes: - unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars, - packing_vals) + ray.get([ + ps.initialize.remote(s) + for ps, s in zip(self.ps_list, shard_shapes) + ]) + logger.info("Parameter servers started") else: - unpacked_gv = self.packed_grads_and_vars - - # Same shape as packed_grads_and_vars - assert len(unpacked_gv) == num_devices - assert len(unpacked_gv[0][0]) == 2 - - apply_ops = [] - 
to_apply = unpacked_gv[0] - for ix, m in enumerate(self.models): - apply_ops.append( - m.optimizer.apply_gradients( - [(g, v) - for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix])])) - self.apply_op = tf.group(*apply_ops) - init_op = tf.group(tf.global_variables_initializer(), - tf.local_variables_initializer()) - self.sess.run(init_op) - - def foreach_model(self, fn): - return [fn(m) for m in self.models] + self.ps_list = [] def foreach_worker(self, fn): - return fn(self) + """Apply the given function to each remote worker. - def compute_gradients(self): - start = time.time() - feed_dict = {} - # Aggregate feed dicts for each model on this worker. - for model in self.models: - feed_dict.update(model.get_feed_dict()) - # We only need to fetch the first per_device_grad, since they are - # averaged across all devices by allreduce. - fetches = self.sess.run( - [ - self.models[0].loss, self.per_device_grads[0], - self.nccl_control_out - ], - feed_dict=feed_dict) - logger.debug( - "compute grad interior time {}".format(time.time() - start)) - return fetches + Returns: + List of results from applying the function. + """ + results = ray.get([w.foreach_worker.remote(fn) for w in self.workers]) + return results - def apply_gradients(self, avg_grads): - start = time.time() - result = { - g: avg_grads[i] - for (i, g) in enumerate(self.per_device_grads[0]) - } - self.sess.run(self.apply_op, feed_dict=result) - logger.debug("apply grad interior time {}".format(time.time() - start)) + def foreach_model(self, fn): + """Apply the given function to each model replica in each worker. 
+ - def ps_compute_apply(self, - out_grad_shard_oids, - agg_grad_shard_oids, - tl_name="ps_compute_apply", - write_timeline=False): - feed_dict = { - ph: oid - for (ph, - oid) in zip(self.plasma_in_grads_oids, out_grad_shard_oids) - } - feed_dict.update({ - ph: oid - for (ph, - oid) in zip(self.plasma_out_grads_oids, agg_grad_shard_oids) - }) - fetch(agg_grad_shard_oids) - run_timeline( - self.sess, - [self.plasma_in_grads, self.apply_op, self.nccl_control_out], - feed_dict=feed_dict, - write_timeline=write_timeline) + Returns: + List of results from applying the function. + """ + results = ray.get([w.foreach_model.remote(fn) for w in self.workers]) + out = [] + for r in results: + out.extend(r) + return out - def num_grad_shards(self): - return self.num_grads + def step(self, fetch_stats=False): + """Run a single SGD step. - def shard_shapes(self): - main_gv = self.packed_grads_and_vars[0] - return [g.shape for g, _ in main_gv] + Arguments: + fetch_stats (bool): Whether to return stats from the step. This can + slow down the computation by acting as a global barrier.
+ """ + if self.strategy == "ps": + return _distributed_sgd_step( + self.workers, + self.ps_list, + write_timeline=False, + fetch_stats=fetch_stats) + else: + return _simple_sgd_step(self.workers) - def ip(self): - return ray.services.get_node_ip_address() + def warmup(self): + logger.info("Warming up object store of worker actors") + ray.get([w.warmup.remote() for w in self.workers]) + logger.info("Warmup complete") -class ParameterServer(object): - def __init__(self, num_workers, tid): - self.num_sgd_workers = num_workers - self.acc_counter = 0 - self.timeline = Timeline(tid) - self.timeline.patch_ray() - - def set_tid(self, tid): - self.timeline.tid = tid - - def get_time(self): - return time.time() + self.timeline.offset - - def set_time(self, ref_time): - self.timeline.offset = ref_time - time.time() - - def initialize(self, shard_shape): - self.accumulated = np.zeros(shard_shape, dtype=np.float32) - - def mark(self): - self.timeline.event("mark") - - def prefetch(self, oids): - self.timeline.reset() - self.timeline.start("prefetch") - fetch(oids) - self.timeline.end("prefetch") - - def add_spinwait(self, grad_shard_ids): - self.timeline.start("add_spinwait") - plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids] - while plasma_ids: - for p in plasma_ids: - if ray.worker.global_worker.plasma_client.contains(p): - self.timeline.start("get_buffers") - grads = ray.worker.global_worker.plasma_client.get(p) - self.accumulated += grads - self.acc_counter += 1 - self.timeline.end("get_buffers") - plasma_ids.remove(p) - break - self.timeline.end("add_spinwait") - - def add(self, grad_shard_id): - self.timeline.start("add") - self.timeline.start("get_buffers") - oid = ray.pyarrow.plasma.ObjectID(grad_shard_id) - grads = ray.worker.global_worker.plasma_client.get(oid) - self.timeline.end("get_buffers") - self.accumulated += grads - self.acc_counter += 1 - self.timeline.end("add") - - def get(self, object_id): - self.timeline.start("get") - client = 
ray.worker.global_worker.plasma_client - assert self.acc_counter == self.num_sgd_workers, self.acc_counter - oid = ray.pyarrow.plasma.ObjectID(object_id) - client.put(self.accumulate.flatten(), object_id=oid) - self.accumulated = np.zeros_like(self.accumulated) - self.acc_counter = 0 - self.timeline.end("get") - - def get_timeline(self): - return self.timeline - - def ip(self): - return ray.services.get_node_ip_address() - - def pin(self, cpu_id): - try: - import psutil - p = psutil.Process() - p.cpu_affinity([cpu_id]) - logger.info("Setting CPU Affinity to: {}".format(cpu_id)) - except Exception as e: - logger.error(e) - - -def average_gradients(grads): +def _average_gradients(grads): out = [] for grad_list in zip(*grads): out.append(np.mean(grad_list, axis=0)) return out -def do_sgd_step(actors): +def _simple_sgd_step(actors): + if len(actors) == 1: + return ray.get(actors[0].compute_apply.remote()) + start = time.time() fetches = ray.get([a.compute_gradients.remote() for a in actors]) losses = [f[0] for f in fetches] @@ -347,7 +188,7 @@ def do_sgd_step(actors): assert len(grads) == 1 avg_grad = grads[0] else: - avg_grad = average_gradients(grads) + avg_grad = _average_gradients(grads) logger.debug("grad reduce time {}".format(time.time() - start)) start = time.time() ray.get([a.apply_gradients.remote(avg_grad) for a in actors]) @@ -355,20 +196,24 @@ def do_sgd_step(actors): return np.mean(losses) -def distributed_sgd_step(actors, ps_list, write_timeline): +def _distributed_sgd_step(actors, ps_list, fetch_stats, write_timeline): # Preallocate object ids that actors will write gradient shards to grad_shard_oids_list = [[np.random.bytes(20) for _ in ps_list] for _ in actors] - logger.info("generated grad oids") + logger.info("Generated grad oids") # Preallocate object ids that param servers will write new weights to accum_shard_ids = [np.random.bytes(20) for _ in ps_list] - logger.info("generated accum oids") + logger.info("Generated accum oids") # Kick off the 
fused compute grad / update weights tf run for each actor + losses = [] for actor, grad_shard_oids in zip(actors, grad_shard_oids_list): - actor.ps_compute_apply.remote( - grad_shard_oids, accum_shard_ids, write_timeline=write_timeline) + losses.append( + actor.ps_compute_apply.remote( + grad_shard_oids, + accum_shard_ids, + write_timeline=write_timeline)) logger.info("Launched all ps_compute_applys on all actors") # Issue prefetch ops @@ -401,44 +246,7 @@ def distributed_sgd_step(actors, ps_list, write_timeline): else: # Wait for at least the ps gets to finish ray.get(ps_gets) - - -class DistributedSGD(object): - def __init__(self, - model_creator, - num_workers, - devices_per_worker, - use_cpus=False, - use_plasma_op=False): - self.model_creator = model_creator - if use_cpus: - requests = {"num_cpus": devices_per_worker} - else: - requests = {"num_gpus": devices_per_worker} - RemoteSGDWorker = ray.remote(**requests)(SGDWorker) - self.workers = [] - for worker_index in range(num_workers): - logger.info("Creating worker {}".format(worker_index)) - self.workers.append( - RemoteSGDWorker.remote( - worker_index, - model_creator, - num_devices=devices_per_worker, - plasma_op=use_plasma_op, - use_cpus=use_cpus)) - assert not use_plasma_op, \ - "TODO: when use_plasma_op is true, we must run in PS mode" - - def foreach_worker(self, fn): - results = ray.get([w.foreach_worker.remote(fn) for w in self.workers]) - return results - - def foreach_model(self, fn): - results = ray.get([w.foreach_model.remote(fn) for w in self.workers]) - out = [] - for r in results: - out.extend(r) - return r - - def step(self): - return do_sgd_step(self.workers) + if fetch_stats: + return np.mean(ray.get(losses)) + else: + return None diff --git a/python/ray/experimental/sgd/sgd_worker.py b/python/ray/experimental/sgd/sgd_worker.py new file mode 100644 index 000000000..eb06b09d0 --- /dev/null +++ b/python/ray/experimental/sgd/sgd_worker.py @@ -0,0 +1,254 @@ +from __future__ import 
absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import time + +import pyarrow.plasma as plasma +import tensorflow as tf + +import ray +from ray.experimental.sgd.util import fetch, run_timeline, warmup +from ray.experimental.sgd.modified_allreduce import sum_gradients_all_reduce, \ + unpack_small_tensors + +logger = logging.getLogger(__name__) + + +class SGDWorker(object): + """Helper class for ray.experimental.sgd.DistributedSGD.""" + + def __init__(self, + worker_index, + model_creator, + all_reduce_alg="simple", + num_devices=1, + gpu=False, + max_bytes=10000000, + plasma_op=False): + self.worker_index = worker_index + assert num_devices > 0 + + # TODO(ekl) support custom session + tf_session_args = { + "device_count": { + "CPU": num_devices + }, + "log_device_placement": False, + "gpu_options": tf.GPUOptions(force_gpu_compatible=True), + "inter_op_parallelism_threads": 128, + } + config_proto = tf.ConfigProto(**tf_session_args) + self.sess = tf.Session(config=config_proto) + self.models = [] + grad_ops = [] + + if gpu: + device_tmpl = "/gpu:%d" + else: + device_tmpl = "/cpu:%d" + for device_idx in range(num_devices): + device = device_tmpl % device_idx + with tf.device(device): + with tf.variable_scope("device_%d" % device_idx): + model = model_creator(worker_index, device_idx) + self.models.append(model) + model.grads = [ + t + for t in model.optimizer.compute_gradients(model.loss) + if t[0] is not None + ] + grad_ops.append(model.grads) + + if num_devices == 1: + assert not max_bytes, \ + "grad_shard_bytes > 0 ({}) requires num_devices > 1".format( + max_bytes) + self.packed_grads_and_vars = grad_ops + else: + if max_bytes: + self.packed_grads_and_vars, packing_vals = ( + sum_gradients_all_reduce( + "", + grad_ops, + 1, + all_reduce_alg, + 1, + list(range(num_devices)), + agg_small_grads_max_bytes=max_bytes)) + else: + self.packed_grads_and_vars, _ = (sum_gradients_all_reduce( + "", + grad_ops, + 1, + 
all_reduce_alg, + 1, + list(range(num_devices)), + agg_small_grads_max_bytes=0)) + self.per_device_grads = [ + list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars + ] + assert (len(self.per_device_grads) == num_devices) + self.num_grads = num_grads = len(self.packed_grads_and_vars[0]) + if max_bytes: + logger.info("Packed grads => {} tensors".format(num_grads)) + + # Ops for reading grads with the right control deps + nccl_noops = [] + for j in range(num_grads)[::-1]: + deps = nccl_noops + [ + dev_grad[j] for dev_grad in self.per_device_grads + ] + with tf.control_dependencies(deps): + nccl_noops = [tf.no_op()] + + # You must fetch this otherwise the NCCL allreduce will hang + self.nccl_control_out = tf.group(*nccl_noops) + + if plasma_op: + store_socket = ( + ray.worker.global_worker.plasma_client.store_socket_name) + manager_socket = ( + ray.worker.global_worker.plasma_client.manager_socket_name) + if not plasma.tf_plasma_op: + plasma.build_plasma_tensorflow_op() + + # For fetching grads -> plasma + self.plasma_in_grads = [] + self.plasma_in_grads_oids = [ + tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids") + for _ in range(num_grads) + ] + for j in range(num_grads): + grad = self.per_device_grads[0][j] + with tf.device(self.models[0].loss.device): + plasma_grad = plasma.tf_plasma_op.tensor_to_plasma( + [grad], + self.plasma_in_grads_oids[j], + plasma_store_socket_name=store_socket, + plasma_manager_socket_name=manager_socket) + self.plasma_in_grads.append(plasma_grad) + + # For applying grads <- plasma + unpacked_gv = [] + self.plasma_out_grads_oids = [ + tf.placeholder( + shape=[], dtype=tf.string, name="grad_out_oids") + for _ in range(num_grads) + ] + packed_plasma_grads = [] + for j in range(num_grads): + with tf.device(self.plasma_in_grads[j].device): + with tf.control_dependencies([self.plasma_in_grads[j]]): + grad_ph = plasma.tf_plasma_op.plasma_to_tensor( + self.plasma_out_grads_oids[j], + dtype=tf.float32, + 
plasma_store_socket_name=store_socket, + plasma_manager_socket_name=manager_socket) + grad_ph = tf.reshape(grad_ph, + self.packed_grads_and_vars[0][j][0].shape) + logger.debug("Packed tensor {}".format(grad_ph)) + packed_plasma_grads.append(grad_ph) + for i in range(num_devices): + per_device = [] + for j, (g, v) in enumerate(self.packed_grads_and_vars[i]): + grad_ph = packed_plasma_grads[j] + per_device.append((grad_ph, v)) + unpacked_gv.append(per_device) + + if max_bytes: + unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals) + + elif max_bytes: + unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars, + packing_vals) + else: + unpacked_gv = self.packed_grads_and_vars + + # Same shape as packed_grads_and_vars + assert len(unpacked_gv) == num_devices + assert len(unpacked_gv[0][0]) == 2 + + apply_ops = [] + to_apply = unpacked_gv[0] + for ix, m in enumerate(self.models): + apply_ops.append( + m.optimizer.apply_gradients( + [(g, v) + for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix])])) + self.apply_op = tf.group(*apply_ops) + init_op = tf.group(tf.global_variables_initializer(), + tf.local_variables_initializer()) + self.sess.run(init_op) + + def foreach_model(self, fn): + return [fn(m) for m in self.models] + + def foreach_worker(self, fn): + return fn(self) + + def compute_gradients(self): + start = time.time() + feed_dict = {} + # Aggregate feed dicts for each model on this worker. + for model in self.models: + feed_dict.update(model.get_feed_dict()) + # We only need to fetch the first per_device_grad, since they are + # averaged across all devices by allreduce. 
+ fetches = self.sess.run( + [ + self.models[0].loss, self.per_device_grads[0], + self.nccl_control_out + ], + feed_dict=feed_dict) + logger.debug( + "Compute grad interior time {}".format(time.time() - start)) + return fetches + + def apply_gradients(self, avg_grads): + start = time.time() + result = { + g: avg_grads[i] + for (i, g) in enumerate(self.per_device_grads[0]) + } + self.sess.run(self.apply_op, feed_dict=result) + logger.debug("Apply grad interior time {}".format(time.time() - start)) + + def compute_apply(self): + fetches = run_timeline( + self.sess, + [self.models[0].loss, self.apply_op, self.nccl_control_out], + name="compute_apply") + return fetches[0] + + def ps_compute_apply(self, + out_grad_shard_oids, + agg_grad_shard_oids, + tl_name="ps_compute_apply", + write_timeline=False): + feed_dict = dict(zip(self.plasma_in_grads_oids, out_grad_shard_oids)) + feed_dict.update( + dict(zip(self.plasma_out_grads_oids, agg_grad_shard_oids))) + fetch(agg_grad_shard_oids) + fetches = run_timeline( + self.sess, [ + self.models[0].loss, self.plasma_in_grads, self.apply_op, + self.nccl_control_out + ], + feed_dict=feed_dict, + write_timeline=write_timeline) + return fetches[0] + + def num_grad_shards(self): + return self.num_grads + + def shard_shapes(self): + main_gv = self.packed_grads_and_vars[0] + return [g.shape for g, _ in main_gv] + + def ip(self): + return ray.services.get_node_ip_address() + + def warmup(self): + warmup() diff --git a/python/ray/experimental/sgd/test_sgd.py b/python/ray/experimental/sgd/test_sgd.py index d6369a4e0..79e00b265 100755 --- a/python/ray/experimental/sgd/test_sgd.py +++ b/python/ray/experimental/sgd/test_sgd.py @@ -1,32 +1,67 @@ +#!/usr/bin/env python + from __future__ import absolute_import from __future__ import division from __future__ import print_function import argparse +import time import ray from ray.experimental.sgd.tfbench.test_model import TFBenchModel from ray.experimental.sgd.sgd import DistributedSGD parser = 
argparse.ArgumentParser() +parser.add_argument("--redis-address", default=None, type=str) +parser.add_argument("--num-iters", default=10, type=int) +parser.add_argument("--batch-size", default=1, type=int) +parser.add_argument("--num-workers", default=2, type=int) +parser.add_argument("--grad-shard-bytes", default=10000000, type=int) +parser.add_argument("--devices-per-worker", default=2, type=int) +parser.add_argument("--stats-interval", default=10, type=int) +parser.add_argument("--all-reduce-alg", default="simple", type=str) +parser.add_argument("--object-store-memory", default=None, type=int) parser.add_argument( - "--num-iters", default=100, type=int, help="Number of iterations to run") + "--warmup", action="store_true", help="Warm up object store before start.") +parser.add_argument( + "--strategy", default="ps", type=str, help="One of 'simple' or 'ps'") +parser.add_argument( + "--gpu", action="store_true", help="Use GPUs for optimization") if __name__ == "__main__": - ray.init() - args, _ = parser.parse_known_args() + ray.init( + redis_address=args.redis_address, + object_store_memory=args.object_store_memory) model_creator = ( - lambda worker_idx, device_idx: TFBenchModel(batch=1, use_cpus=True)) + lambda worker_idx, device_idx: TFBenchModel( + batch=args.batch_size, use_cpus=not args.gpu)) sgd = DistributedSGD( model_creator, - num_workers=2, - devices_per_worker=2, - use_cpus=True, - use_plasma_op=False) + num_workers=args.num_workers, + devices_per_worker=args.devices_per_worker, + gpu=args.gpu, + strategy=args.strategy, + grad_shard_bytes=args.grad_shard_bytes, + all_reduce_alg=args.all_reduce_alg) - for _ in range(args.num_iters): - loss = sgd.step() - print("Current loss", loss) + if args.warmup: + sgd.warmup() + + t = [] + + for i in range(args.num_iters): + start = time.time() + fetch_stats = i % args.stats_interval == 0 + print("== Step {} ==".format(i)) + stats = sgd.step(fetch_stats=fetch_stats) + ips = ((args.batch_size * args.num_workers * 
args.devices_per_worker) / + (time.time() - start)) + print("Iteration time", time.time() - start, "Images per second", ips) + t.append(ips) + if fetch_stats: + print("Current loss", stats) + + print("Peak throughput", max(sum(t[i:i + 5]) / 5 for i in range(len(t)))) diff --git a/python/ray/experimental/sgd/tfbench/test_model.py b/python/ray/experimental/sgd/tfbench/test_model.py index 0dd48607e..d866668f8 100644 --- a/python/ray/experimental/sgd/tfbench/test_model.py +++ b/python/ray/experimental/sgd/tfbench/test_model.py @@ -5,13 +5,14 @@ from __future__ import print_function import tensorflow as tf from tfbench import model_config +from ray.experimental.sgd.model import Model class MockDataset(): name = "synthetic" -class TFBenchModel(object): +class TFBenchModel(Model): def __init__(self, batch=64, use_cpus=False): image_shape = [batch, 224, 224, 3] labels_shape = [batch] @@ -25,20 +26,22 @@ class TFBenchModel(object): name='synthetic_images') # Minor hack to avoid H2D copy when using synthetic data - self.inputs = tf.contrib.framework.local_variable( + inputs = tf.contrib.framework.local_variable( images, name='gpu_cached_images') - self.labels = tf.random_uniform( + labels = tf.random_uniform( labels_shape, minval=0, maxval=999, dtype=tf.int32, name='synthetic_labels') - self.model = model_config.get_model_config("resnet101", MockDataset()) - logits, aux = self.model.build_network( - self.inputs, data_format=use_cpus and "NHWC" or "NCHW") + model = model_config.get_model_config("resnet101", MockDataset()) + logits, aux = model.build_network( + inputs, data_format=use_cpus and "NHWC" or "NCHW") loss = tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=self.labels) + logits=logits, labels=labels) + + # Implement model interface self.loss = tf.reduce_mean(loss, name='xentropy-loss') self.optimizer = tf.train.GradientDescentOptimizer(1e-6) diff --git a/python/ray/experimental/sgd/util.py b/python/ray/experimental/sgd/util.py index 
6c4f89719..82bc60a25 100644 --- a/python/ray/experimental/sgd/util.py +++ b/python/ray/experimental/sgd/util.py @@ -4,6 +4,7 @@ from __future__ import print_function import json import logging +import numpy as np import os import time import tensorflow as tf @@ -13,6 +14,24 @@ import ray logger = logging.getLogger(__name__) +def warmup(): + logger.info("Warming up object store") + zeros = np.zeros(int(100e6 / 8), dtype=np.float64) + start = time.time() + for _ in range(10): + ray.put(zeros) + logger.info("Initial latency for 100MB put {}".format( + (time.time() - start) / 10)) + for _ in range(5): + for _ in range(100): + ray.put(zeros) + start = time.time() + for _ in range(10): + ray.put(zeros) + logger.info("Warmed up latency for 100MB put {}".format( + (time.time() - start) / 10)) + + def fetch(oids): local_sched_client = ray.worker.global_worker.local_scheduler_client for o in oids: diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 1f31796a7..398413691 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -320,8 +320,13 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/cartpole_lstm.py --stop=200 --use-prev-action-reward -docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/test_sgd.py --num-iters=2 +docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/experimental/sgd/test_sgd.py --num-iters=2 \ + --batch-size=1 --strategy=simple + +docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/experimental/sgd/test_sgd.py --num-iters=2 \ + --batch-size=1 --strategy=ps docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \