From 1b645fcc8bb5568a4a59ea59d235537eb522520d Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 16 Jul 2018 21:51:39 -0700 Subject: [PATCH] Add parameter server blog post. (#2398) * Saving work on parameter server blog post. * Updates * Updates to blog post. * Add notes about tasks and actors. * Updates * Add RLlib paper link * Update intro * Address comments. * More fixes. * Clarify ray.get * Change date * Add @ray.remote clarification. * Update site deployment instructions. * Minor wording --- site/README.md | 2 +- ...parameter-server-in-fifteen-lines.markdown | 336 ++++++++++++++++++ 2 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 site/_posts/2018-07-15-parameter-server-in-fifteen-lines.markdown diff --git a/site/README.md b/site/README.md index b029ac893..d2ec5d4a2 100644 --- a/site/README.md +++ b/site/README.md @@ -27,7 +27,7 @@ To deploy the site, run ``` cd ~ -git clone https://github.com/ray-project/ray-project.github.io.git +git clone git@github.com:ray-project/ray-project.github.io.git cd ray-project.github.io cp -r ~/ray/site/* . ``` diff --git a/site/_posts/2018-07-15-parameter-server-in-fifteen-lines.markdown b/site/_posts/2018-07-15-parameter-server-in-fifteen-lines.markdown new file mode 100644 index 000000000..fdb603c11 --- /dev/null +++ b/site/_posts/2018-07-15-parameter-server-in-fifteen-lines.markdown @@ -0,0 +1,336 @@ +--- +layout: post +title: "Implementing A Parameter Server in 15 Lines of Python with Ray" +excerpt: "This post describes how to implement a parameter server in Ray." +date: 2018-07-15 14:00:00 +--- + +Parameter servers are a core part of many machine learning applications. Their +role is to store the *parameters* of a machine learning model (e.g., the weights +of a neural network) and to *serve* them to clients (clients are often workers +that process data and compute updates to the parameters). + +Parameter servers (like databases) are normally built and shipped as standalone +systems. This post describes how to use [Ray][1] to implement a parameter server +in a few lines of code. + +By turning the parameter server from a "system" into an "application", this +approach makes it orders of magnitude simpler to deploy parameter server +applications. Similarly, by allowing applications and libraries to implement +their own parameter servers, this approach makes the behavior of the parameter +server much more configurable and flexible (since the application can simply +modify the implementation with a few lines of Python). + +**What is Ray?** [Ray][1] is a general-purpose framework for parallel and +distributed Python. Ray provides a unified task-parallel and actor abstraction +and achieves high performance through shared memory, zero-copy serialization, +and distributed scheduling. Ray also includes high-performance libraries +targeting AI applications, for example [hyperparameter tuning][5] and +[reinforcement learning][4]. + +## What is a Parameter Server? + +A parameter server is a key-value store used for training machine learning +models on a cluster. The **values** are the parameters of a machine-learning +model (e.g., a neural network). The **keys** index the model parameters. + +For example, in a movie **recommendation system**, there may be one key per user +and one key per movie. For each user and movie, there are corresponding +user-specific and movie-specific parameters. In a **language-modeling** +application, words may act as keys and their embeddings may be the values. In +its simplest form, a parameter server may implicitly have a single key and allow +all of the parameters to be retrieved and updated at once. We show how such a +parameter server can be implemented as a Ray actor (15 lines) below. + +```python +import numpy as np +import ray + + +@ray.remote +class ParameterServer(object): + def __init__(self, dim): + # Alternatively, params could be a dictionary mapping keys to arrays. + self.params = np.zeros(dim) + + def get_params(self): + return self.params + + def update_params(self, grad): + self.params += grad +``` + +**The `@ray.remote` decorator defines a service.** It takes the +`ParameterServer` class and allows it to be instantiated as a remote service or +actor. + +Here, we assume that the update is a gradient which should be added to the +parameter vector. This is just the simplest possible example, and many different +choices could be made. + +**A parameter server typically exists as a remote process or service** and +interacts with clients through remote procedure calls. To instantiate the +parameter server as a remote actor, we can do the following. + +```python +# We need to start Ray first. +ray.init() + +# Create a parameter server process. +ps = ParameterServer.remote(10) +``` + +**Actor method invocations return futures.** If we want to retrieve the actual +values, we can use a blocking `ray.get` call. For example, + +```python +>>> params_id = ps.get_params.remote() # This returns a future. + +>>> params_id +ObjectID(7268cb8d345ef26632430df6f18cc9690eb6b300) + +>>> ray.get(params_id) # This blocks until the task finishes. +array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) +``` + +Now, suppose we want to start some worker tasks that continuously compute +gradients and update the model parameters. Each worker will run in a loop that +does three things: +1. Get the latest parameters. +2. Compute an update to the parameters. +3. Update the parameters. + +As a Ray remote function (though the worker could also be an actor), this looks +like the following. + +```python +import time + +# Note that the worker function takes a handle to the parameter server as an +# argument, which allows the worker task to invoke methods on the parameter +# server actor. + +@ray.remote +def worker(ps): + for _ in range(100): + # Get the latest parameters. + params_id = ps.get_params.remote() # This method call is non-blocking + # and returns a future. + params = ray.get(params_id) # This is a blocking call which waits for + # the task to finish and gets the results. + + # Compute a gradient update. Here we just make a fake update, but in + # practice this would use a library like TensorFlow and would also take + # in a batch of data. + grad = np.ones(10) + time.sleep(0.2) # This is a fake placeholder for some computation. + + # Update the parameters. + ps.update_params.remote(grad) +``` + +Then we can start several worker tasks as follows. + +```python +# Start 2 workers. +for _ in range(2): + worker.remote(ps) +``` + +Then we can retrieve the parameters from the driver process and see that they +are being updated by the workers. + +```python +>>> ray.get(ps.get_params.remote()) +array([64., 64., 64., 64., 64., 64., 64., 64., 64., 64.]) +>>> ray.get(ps.get_params.remote()) +array([78., 78., 78., 78., 78., 78., 78., 78., 78., 78.]) +``` + +Part of the value that Ray adds here is that *Ray makes it as easy to start up a +remote service or actor as it is to define a Python class*. Handles to the actor +can be passed around to other actors and tasks to allow arbitrary and intuitive +messaging and communication patterns. Current alternatives are much more +involved. For example, [consider how the equivalent runtime service creation and +service handle passing would be done with GRPC][14]. + +## Additional Extensions + +Here we describe some important modifications to the above design. We describe +additional natural extensions in [this paper][3]. + +**Sharding Across Multiple Parameter Servers:** When your parameters are large +and your cluster is large, a single parameter server may not suffice because the +application could be bottlenecked by the network bandwidth into and out of the +machine that the parameter server is on (especially if there are many workers). + +A natural solution in this case is to shard the parameters across multiple +parameter servers. This can be achieved by simply starting up multiple parameter +server actors. An example of how to do this is shown in the code example at the +bottom. + +**Controlling Actor Placement:** The placement of specific actors and tasks on +different machines can be specified by using Ray's support for arbitrary +[resource requirements][2]. For example, if the worker requires a GPU, then its +remote decorator can be declared with `@ray.remote(num_gpus=1)`. Arbitrary +custom resources can be defined as well. + +## Unifying Tasks and Actors + +Ray supports parameter server applications efficiently in large part due to its +unified task-parallel and actor abstraction. + +Popular data processing systems such as [Apache Spark][12] allow stateless tasks +(functions with no side effects) to operate on immutable data. This assumption +simplifies the overall system design and makes it easier for applications to +reason about correctness. + +However, mutable state that is shared between many tasks is a recurring theme in +machine learning applications. That state could be the weights of a neural +network, the state of a third-party simulator, or an encapsulation of an +interaction with the physical world. + +To support these kinds of applications, Ray introduces an actor abstraction. An +actor will execute methods serially (so there are no concurrency issues), and +each method can arbitrarily mutate the actor's internal state. Methods can be +invoked by other actors and tasks (and even by other applications on the same +cluster). + +One thing that makes Ray so powerful is that it *unifies the actor abstraction +with the task-parallel abstraction* inheriting the benefits of both approaches. +Ray uses an underlying dynamic task graph to implement both actors and stateless +tasks in the same framework. As a consequence, these two abstractions are +completely interoperable. Tasks and actors can be created from within other +tasks and actors. Both return futures, which can be passed into other tasks or +actor methods to introduce scheduling and data dependencies. As a result, Ray +applications inherit the best features of both tasks and actors. + +## Under the Hood + +**Dynamic Task Graphs:** Under the hood, remote function invocations and actor +method invocations create tasks that are added to a dynamically growing graph of +tasks. The Ray backend is in charge of scheduling and executing these tasks +across a cluster (or a single multi-core machine). Tasks can be created by the +"driver" application or by other tasks. + +**Data:** Ray efficiently serializes data using the [Apache Arrow][9] data +layout. Objects are shared between workers and actors on the same machine +through [shared memory][10], which avoids the need for copies or +deserialization. This optimization is absolutely critical for achieving good +performance. + +**Scheduling:** Ray uses a distributed scheduling approach. Each machine has its +own scheduler, which manages the workers and actors on that machine. Tasks are +submitted by applications and workers to the scheduler on the same machine. From +there, they can be reassigned to other workers or passed to other local +schedulers. This allows Ray to achieve substantially higher task throughput than +what can be achieved with a centralized scheduler, which is important for +machine learning applications. + +## Conclusion + +A parameter server is normally implemented and shipped as a standalone system. +The thing that makes this approach so powerful is that we're able to implement a +parameter server with a few lines of code as an application. *This approach +makes it much simpler to deploy applications that use parameter servers and to +modify the behavior of the parameter server.* For example, if we want to shard +the parameter server, change the update rule, switch between asynchronous and +synchronous updates, ignore straggler workers, or any number of other +customizations, we can do each of these things with a few extra lines of code. + +This post describes how to use Ray actors to implement a parameter server. +However, actors are a much more general concept and can be useful for many +applications that involve stateful computation. Examples include logging, +streaming, simulation, model serving, graph processing, and many others. + +## Running this Code + +To run the complete application, first install Ray with `pip install ray`. Then +you should be able to run the code below, which implements a sharded parameter +server. + +```python +import numpy as np +import ray +import time + +# Start Ray. +ray.init() + + +@ray.remote +class ParameterServer(object): + def __init__(self, dim): + # Alternatively, params could be a dictionary mapping keys to arrays. + self.params = np.zeros(dim) + + def get_params(self): + return self.params + + def update_params(self, grad): + self.params += grad + + +@ray.remote +def worker(*parameter_servers): + for _ in range(100): + # Get the latest parameters. + parameter_shards = ray.get( + [ps.get_params.remote() for ps in parameter_servers]) + params = np.concatenate(parameter_shards) + + # Compute a gradient update. Here we just make a fake + # update, but in practice this would use a library like + # TensorFlow and would also take in a batch of data. + grad = np.ones(10) + time.sleep(0.2) # This is a fake placeholder for some computation. + grad_shards = np.split(grad, len(parameter_servers)) + + # Send the gradient updates to the parameter servers. + for ps, grad in zip(parameter_servers, grad_shards): + ps.update_params.remote(grad) + + +# Start two parameter servers, each with half of the parameters. +parameter_servers = [ParameterServer.remote(5) for _ in range(2)] + +# Start 2 workers. +workers = [worker.remote(*parameter_servers) for _ in range(2)] + +# Inspect the parameters at regular intervals. +for _ in range(5): + time.sleep(1) + print(ray.get([ps.get_params.remote() for ps in parameter_servers])) +``` + +Note that this example focuses on simplicity and that more can be done to +optimize this code. + +## Read More + +For more information about Ray, take a look at the following links: +1. The Ray [documentation][6] +2. The Ray [API][7] +3. Fast [serialization][9] with Ray and Apache Arrow +4. A [paper][11] describing the Ray system +5. Efficient [hyperparameter][5] tuning with Ray +6. Scalable [reinforcement][4] learning with Ray and [the RLlib paper][13] +7. Speeding up [Pandas][8] with Ray + +Questions should be directed to *ray-dev@googlegroups.com*. + + +[1]: https://github.com/ray-project/ray +[2]: http://ray.readthedocs.io/en/latest/resources.html +[3]: http://www.sysml.cc/doc/206.pdf +[4]: http://ray.readthedocs.io/en/latest/rllib.html +[5]: http://ray.readthedocs.io/en/latest/tune.html +[6]: http://ray.readthedocs.io/en/latest +[7]: http://ray.readthedocs.io/en/latest/api.html +[8]: https://github.com/modin-project/modin +[9]: https://ray-project.github.io/2017/10/15/fast-python-serialization-with-ray-and-arrow.html +[10]: https://ray-project.github.io/2017/08/08/plasma-in-memory-object-store.html +[11]: https://arxiv.org/abs/1712.05889 +[12]: http://spark.apache.org +[13]: https://arxiv.org/abs/1712.09381 +[14]: https://grpc.io/docs/tutorials/basic/python.html#defining-the-service