Consolidate and clean up documentation (#5645)

Eric Liang
2019-09-07 11:50:18 -07:00
committed by GitHub
parent 732336fc4f
commit 1455a19c85
17 changed files with 534 additions and 706 deletions
+1 -1
@@ -133,7 +133,7 @@ If TensorBoard is installed, automatically visualize all trial results:
RLlib Quick Start
-----------------
.. image:: https://github.com/ray-project/ray/raw/master/doc/source/images/rllib-stack.png
.. image:: https://github.com/ray-project/ray/raw/master/doc/source/images/rllib-wide.jpg
`RLlib`_ is an open-source library for reinforcement learning built on top of Ray that offers both high scalability and a unified API for a variety of applications.
+2 -2
@@ -1,5 +1,5 @@
How-to: Using Actors
====================
Using Actors
============
An actor is essentially a stateful worker (or a service). When a new actor is
instantiated, a new worker is created, and methods of the actor are scheduled on
+62
@@ -143,3 +143,65 @@ Notes
* Several limitations come from Cython's own `unsupported <https://github.com/cython/cython/wiki/Unsupported>`_ Python features.
* We currently do not support compiling and distributing Cython code to ``ray`` clusters. In other words, Cython developers are responsible for compiling and distributing any Cython code to their cluster (much as would be the case for users who need Python packages like ``scipy``).
* For most simple use cases, developers need not worry about Python 2 or 3, but users who do need to care can have a look at the ``language_level`` Cython compiler directive (see `here <http://cython.readthedocs.io/en/latest/src/reference/compilation.html>`_).
Inspecting Cluster State
------------------------
Applications written on top of Ray will often want to have some information
or diagnostics about the cluster. Some common questions include:
1. How many nodes are in my autoscaling cluster?
2. What resources are currently available in my cluster, both used and total?
3. What are the objects currently in my cluster?
For this, you can use the global state API.
Node Information
~~~~~~~~~~~~~~~~
To get information about the current nodes in your cluster, you can use ``ray.nodes()``:
.. autofunction:: ray.nodes
:noindex:
.. code-block:: python
import ray
ray.init()
print(ray.nodes())
"""
[{'ClientID': 'a9e430719685f3862ed7ba411259d4138f8afb1e',
'IsInsertion': True,
'NodeManagerAddress': '192.168.19.108',
'NodeManagerPort': 37428,
'ObjectManagerPort': 43415,
'ObjectStoreSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/raylet',
'Resources': {'CPU': 4.0},
'alive': True}]
"""
The above information includes:
- ``ClientID``: A unique identifier for the raylet.
- ``alive``: Whether the node is still alive.
- ``NodeManagerAddress``: Private IP address of the node that the raylet is on.
- ``Resources``: The total resource capacity of the node.
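As a rough sketch of how this output might be consumed, the helper below picks out the addresses of live nodes from a list shaped like the one returned by ``ray.nodes()``. The function name ``alive_node_addresses`` and the second sample entry are made up for illustration; only the dictionary keys come from the output above.

```python
def alive_node_addresses(nodes):
    """Return the NodeManagerAddress of every node marked alive."""
    return [node["NodeManagerAddress"] for node in nodes if node.get("alive")]


# Sample data shaped like the ray.nodes() output above.
# The second entry is hypothetical and only exists to show the filtering.
sample_nodes = [
    {"ClientID": "a9e430719685f3862ed7ba411259d4138f8afb1e",
     "NodeManagerAddress": "192.168.19.108",
     "Resources": {"CPU": 4.0},
     "alive": True},
    {"ClientID": "hypothetical-dead-node",
     "NodeManagerAddress": "192.168.19.109",
     "Resources": {"CPU": 4.0},
     "alive": False},
]

live_addresses = alive_node_addresses(sample_nodes)
print(live_addresses)
```

In a running session the same helper could be called as ``alive_node_addresses(ray.nodes())``, which would also answer the "how many nodes" question via ``len()``.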
Resource Information
~~~~~~~~~~~~~~~~~~~~
To get information about the current total resource capacity of your cluster, you can use ``ray.cluster_resources()``.
.. autofunction:: ray.cluster_resources
:noindex:
To get information about the current available resource capacity of your cluster, you can use ``ray.available_resources()``.
.. autofunction:: ray.available_resources
:noindex:
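The two calls can be combined to estimate how much of each resource is in use. The sketch below works on plain dictionaries so it runs without a cluster; ``used_resources`` is an illustrative name, and treating a resource that is missing from the available dictionary as fully in use is an assumption, not documented behavior.

```python
def used_resources(total, available):
    """Per-resource usage: total capacity minus what is currently free.

    Assumption: a resource absent from `available` has nothing free.
    """
    return {name: capacity - available.get(name, 0.0)
            for name, capacity in total.items()}


# Hypothetical values standing in for ray.cluster_resources() and
# ray.available_resources().
total = {"CPU": 4.0, "GPU": 1.0}
available = {"CPU": 1.0}

usage = used_resources(total, available)
print(usage)
```

With Ray initialized, the inputs would come from ``ray.cluster_resources()`` and ``ray.available_resources()``.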
+2 -2
@@ -1,5 +1,5 @@
Built-in Autoscaling
=====================
Automatic Cluster Setup
=======================
This document provides instructions for launching a Ray cluster either privately, on AWS, or on GCP.
Binary file not shown (size after change: 30 KiB).
+3 -6
@@ -192,13 +192,11 @@ Getting Involved
walkthrough.rst
actors.rst
using-ray-with-gpus.rst
user-profiling.rst
inspect.rst
object-store.rst
configure.rst
serialization.rst
memory-management.rst
advanced.rst
configure.rst
troubleshooting.rst
advanced.rst
package-ref.rst
.. toctree::
@@ -272,6 +270,5 @@ Getting Involved
development.rst
profiling.rst
internals-overview.rst
fault-tolerance.rst
contrib.rst
-82
@@ -1,82 +0,0 @@
How-To: Inspect Cluster State
=============================
Applications written on top of Ray will often want to have some information
or diagnostics about the cluster. Some common questions include:
1. How many nodes are in my autoscaling cluster?
2. What resources are currently available in my cluster, both used and total?
3. What are the objects currently in my cluster?
For this, you can use the global state API.
Context: Ray Processes
----------------------
For context, when using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object
stores. Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- Multiple **Redis servers** maintain much of the system's state. For example, they keep
track of which objects live on which machines and of the task specifications
(but not data). They can also be queried directly for debugging purposes.
Node Information
----------------
To get information about the current nodes in your cluster, you can use ``ray.nodes()``:
.. autofunction:: ray.nodes
:noindex:
.. code-block:: python
import ray
ray.init()
print(ray.nodes())
"""
[{'ClientID': 'a9e430719685f3862ed7ba411259d4138f8afb1e',
'IsInsertion': True,
'NodeManagerAddress': '192.168.19.108',
'NodeManagerPort': 37428,
'ObjectManagerPort': 43415,
'ObjectStoreSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/raylet',
'Resources': {'CPU': 4.0},
'alive': True}]
"""
The above information includes:
- ``ClientID``: A unique identifier for the raylet.
- ``alive``: Whether the node is still alive.
- ``NodeManagerAddress``: Private IP address of the node that the raylet is on.
- ``Resources``: The total resource capacity of the node.
Resource Information
--------------------
To get information about the current total resource capacity of your cluster, you can use ``ray.cluster_resources()``.
.. autofunction:: ray.cluster_resources
:noindex:
To get information about the current available resource capacity of your cluster, you can use ``ray.available_resources()``.
.. autofunction:: ray.available_resources
:noindex:
-173
@@ -1,173 +0,0 @@
An Overview of the Internals
============================
This document gives an overview of the internal architecture of Ray.
Connecting to Ray
-----------------
There are two ways that a Ray script can be initiated. It can either be run in a
standalone fashion or it can connect to an existing Ray cluster.
Running Ray standalone
~~~~~~~~~~~~~~~~~~~~~~
Ray can be used standalone by calling ``ray.init()`` within a script. When the
call to ``ray.init()`` happens, all of the relevant processes are started.
These include a raylet, an object store and manager, a Redis server,
and a number of worker processes.
When the script exits, these processes will be killed.
**Note:** This approach is limited to a single machine.
Connecting to an existing Ray cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To connect to an existing Ray cluster, pass the address of the Redis server as
the ``address=`` keyword argument to ``ray.init``. In this case, no new
processes will be started when ``ray.init`` is called, and the existing
processes will continue running when the script exits. All processes except
workers that correspond to actors are shared between different driver
processes.
Ray Processes
-------------
When using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object
stores. Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications
(but not data). It can also be queried directly for debugging purposes.
Defining a remote function
--------------------------
A central component of this system is the **centralized control plane**. This is
implemented using one or more Redis servers. `Redis`_ is an in-memory key-value
store.
.. _`Redis`: https://github.com/antirez/redis
We use the centralized control plane in two ways. First, as a persistent store of
the system's control state. Second, as a message bus for communication between
processes (using Redis's publish-subscribe functionality).
Now, consider a remote function definition as below.
.. code-block:: python
    @ray.remote
    def f(x):
        return x + 1
When the remote function is defined as above, the function is immediately
pickled, assigned a unique ID, and stored in a Redis server.
Each worker process has a separate thread running in the background that
listens for the addition of remote functions to the centralized control state.
When a new remote function is added, the thread fetches the pickled remote
function, unpickles it, and can then execute that function.
Calling a remote function
-------------------------
When a driver or worker invokes a remote function, a number of things happen.
- First, a task object is created. The task object includes the following.
- The ID of the function being called.
- The IDs or values of the arguments to the function. Python primitives like
integers or short strings will be pickled and included as part of the task
object. Larger or more complex objects will be put into the object store
with an internal call to ``ray.put``, and the resulting IDs are included in
the task object. Object IDs that are passed directly as arguments are also
included in the task object.
- The ID of the task. This is generated uniquely from the above content.
- The IDs for the return values of the task. These are generated uniquely
from the above content.
- The task object is then sent to the raylet on the same node as the driver
or worker.
- The raylet makes a decision to either schedule the task locally or to
pass the task on to another raylet.
- If all of the task's object dependencies are present in the local object
store and there are enough CPU and GPU resources available to execute the
task, then the raylet will assign the task to one of its available workers.
- If those conditions are not met, the task will be forwarded to another
raylet. This is done by peer-to-peer connection between raylets.
The task table can be inspected as follows.
.. autofunction:: ray.tasks
:noindex:
- Once a task has been scheduled to a raylet, the raylet queues
the task for execution. A task is assigned to a worker when enough resources
become available and the object dependencies are available locally,
in first-in, first-out order.
- When the task has been assigned to a worker, the worker executes the task and
puts the task's return values into the object store. The object store will
then update the **object table**, which is part of the centralized control
state, to reflect the fact that it contains the newly created objects. The
object table can be viewed as follows.
.. autofunction:: ray.objects
:noindex:
- When the task's return values are placed into the object store, they are first
serialized into a contiguous blob of bytes using the `Apache Arrow`_ data
layout, which is helpful for efficiently sharing data between processes using
shared memory.
.. _`Apache Arrow`: https://arrow.apache.org/
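The placement rule described in the steps above can be summarized in a few lines. This is a toy model under stated assumptions, not the raylet's actual code: ``place_task`` and its arguments are hypothetical names, and resources are represented as plain dictionaries.

```python
def place_task(task_deps, task_demand, local_objects, free_resources):
    """Return "local" if the task can run on this node, else "forward".

    task_deps:      object IDs the task needs as inputs
    task_demand:    resources the task requires, e.g. {"CPU": 1}
    local_objects:  set of object IDs present in the local object store
    free_resources: resources currently free on this node
    """
    deps_ready = all(dep in local_objects for dep in task_deps)
    fits = all(free_resources.get(name, 0) >= amount
               for name, amount in task_demand.items())
    return "local" if deps_ready and fits else "forward"


# Dependencies present and enough CPU: run locally.
decision_a = place_task(["x"], {"CPU": 1}, {"x"}, {"CPU": 2})
# Missing dependency: forward to another raylet.
decision_b = place_task(["y"], {"CPU": 1}, {"x"}, {"CPU": 2})
# Not enough GPU: forward to another raylet.
decision_c = place_task(["x"], {"GPU": 1}, {"x"}, {"CPU": 2})
print(decision_a, decision_b, decision_c)
```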
Notes and limitations
~~~~~~~~~~~~~~~~~~~~~
- When an object store on a particular node fills up, it will begin evicting
objects in a least-recently-used manner. If an object that is needed later is
evicted, then the call to ``ray.get`` for that object will initiate the
reconstruction of the object. The raylet will attempt to reconstruct
the object by replaying its task lineage.
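To make the least-recently-used policy concrete, here is a small stand-in built on ``collections.OrderedDict``. It is a sketch only: ``ToyObjectStore`` is a hypothetical class, capacity is counted in objects rather than bytes, and a ``KeyError`` stands in for the point where reconstruction would be triggered.

```python
from collections import OrderedDict


class ToyObjectStore:
    """Toy LRU store; capacity is a number of objects, not bytes."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._objects = OrderedDict()

    def put(self, object_id, value):
        """Store a value and return the IDs evicted to make room."""
        self._objects[object_id] = value
        self._objects.move_to_end(object_id)
        evicted = []
        while len(self._objects) > self.capacity:
            old_id, _ = self._objects.popitem(last=False)  # oldest first
            evicted.append(old_id)
        return evicted

    def get(self, object_id):
        """Return a value and mark it as most recently used."""
        value = self._objects[object_id]  # KeyError models an evicted object
        self._objects.move_to_end(object_id)
        return value


store = ToyObjectStore(capacity=2)
store.put("a", 1)
store.put("b", 2)
store.get("a")                 # "a" becomes most recently used
evicted = store.put("c", 3)    # store is full, so "b" is evicted
print(evicted)
```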
Getting an object ID
--------------------
Several things happen when a driver or worker calls ``ray.get`` on an object ID.
.. code-block:: python
ray.get(x_id)
- The driver or worker goes to the object store on the same node and requests
the relevant object. Each object store consists of two components, a
shared-memory key-value store of immutable objects, and a manager to
coordinate the transfer of objects between nodes.
- If the object is not present in the object store, the manager checks the
object table to see which other object stores, if any, have the object. It
then requests the object directly from one of those object stores, via its
manager. If the object doesn't exist anywhere, then the centralized control
state will notify the requesting manager when the object is created. If the
object doesn't exist anywhere because it has been evicted from all object
stores, the worker will also request reconstruction of the object from the
raylet. These checks repeat periodically until the object is
available in the local object store, whether through reconstruction or
through object transfer.
- Once the object is available in the local object store, the driver or worker
will map the relevant region of memory into its own address space (to avoid
copying the object), and will deserialize the bytes into a Python object.
Note that any numpy arrays that are part of the object will not be copied.
+22
@@ -5,8 +5,30 @@ Training APIs
-------------
* `Command-line <rllib-training.html>`__
* `Configuration <rllib-training.html#configuration>`__
- `Specifying Parameters <rllib-training.html#specifying-parameters>`__
- `Specifying Resources <rllib-training.html#specifying-resources>`__
- `Common Parameters <rllib-training.html#common-parameters>`__
- `Tuned Examples <rllib-training.html#tuned-examples>`__
* `Python API <rllib-training.html#python-api>`__
- `Custom Training Workflows <rllib-training.html#custom-training-workflows>`__
- `Accessing Policy State <rllib-training.html#accessing-policy-state>`__
- `Accessing Model State <rllib-training.html#accessing-model-state>`__
- `Global Coordination <rllib-training.html#global-coordination>`__
- `Callbacks and Custom Metrics <rllib-training.html#callbacks-and-custom-metrics>`__
- `Rewriting Trajectories <rllib-training.html#rewriting-trajectories>`__
- `Curriculum Learning <rllib-training.html#curriculum-learning>`__
* `Debugging <rllib-training.html#debugging>`__
- `Gym Monitor <rllib-training.html#gym-monitor>`__
- `Eager Mode <rllib-training.html#eager-mode>`__
- `Episode Traces <rllib-training.html#episode-traces>`__
- `Log Verbosity <rllib-training.html#log-verbosity>`__
- `Stack Traces <rllib-training.html#stack-traces>`__
* `REST API <rllib-training.html#rest-api>`__
Environments
+63 -6
@@ -209,13 +209,69 @@ Similar to accessing policy state, you may want to get a reference to the underl
.. code-block:: python
# Get a reference to the model through the policy
>>> from ray.rllib.agents.dqn import DQNTrainer
>>> trainer = DQNTrainer(env="CartPole-v0")
>>> trainer.get_policy().model
>>> model = trainer.get_policy().model
<ray.rllib.models.catalog.FullyConnectedNetwork_as_DistributionalQModel ...>
>>> trainer.get_policy().model.variables()
# List of all model variables
>>> model.variables()
[<tf.Variable 'default_policy/fc_1/kernel:0' shape=(4, 256) dtype=float32>, ...]
# Run a forward pass to get logits, can run with policy.get_session()
>>> model.from_batch({"obs": np.array([[0.1, 0.2, 0.3, 0.4]])})
(<tf.Tensor 'model_3/fc_out/Tanh:0' shape=(1, 256) dtype=float32>, [])
# Access the base Keras models (all default models have a base)
>>> model.base_model.summary()
Model: "model"
_______________________________________________________________________
Layer (type) Output Shape Param # Connected to
=======================================================================
observations (InputLayer) [(None, 4)] 0
_______________________________________________________________________
fc_1 (Dense) (None, 256) 1280 observations[0][0]
_______________________________________________________________________
fc_out (Dense) (None, 256) 65792 fc_1[0][0]
_______________________________________________________________________
value_out (Dense) (None, 1) 257 fc_1[0][0]
=======================================================================
Total params: 67,329
Trainable params: 67,329
Non-trainable params: 0
______________________________________________________________________________
# Access the Q value model (specific to DQN)
>>> model.q_value_head.summary()
Model: "model_1"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
model_out (InputLayer) [(None, 256)] 0
_________________________________________________________________
lambda (Lambda) [(None, 2), (None, 2, 1), 66306
=================================================================
Total params: 66,306
Trainable params: 66,306
Non-trainable params: 0
_________________________________________________________________
# Access the state value model (specific to DQN)
>>> model.state_value_head.summary()
Model: "model_2"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
model_out (InputLayer) [(None, 256)] 0
_________________________________________________________________
lambda_1 (Lambda) (None, 1) 66049
=================================================================
Total params: 66,049
Trainable params: 66,049
Non-trainable params: 0
_________________________________________________________________
This is especially useful when used with `custom model classes <rllib-models.html>`__.
Global Coordination
@@ -306,11 +362,12 @@ Rewriting Trajectories
~~~~~~~~~~~~~~~~~~~~~~
Note that in the ``on_postprocess_batch`` callback you have full access to the trajectory batch (``post_batch``) and other training state. This can be used to rewrite the trajectory, which has a number of uses including:
* Backdating rewards to previous time steps (e.g., based on values in ``info``).
* Adding model-based curiosity bonuses to rewards (you can train the model with a `custom model supervised loss <rllib-models.html#supervised-model-losses>`__).
Example: Curriculum Learning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Curriculum Learning
~~~~~~~~~~~~~~~~~~~
Let's look at two ways to use the above APIs to implement `curriculum learning <https://bair.berkeley.edu/blog/2017/12/20/reverse-curriculum/>`__. In curriculum learning, the agent task is adjusted over time to improve the learning process. Suppose that we have an environment class with a ``set_phase()`` method that we can call to adjust the task difficulty over time:
@@ -401,8 +458,8 @@ The ``"monitor": true`` config can be used to save Gym episode videos to the res
openaigym.video.0.31403.video000000.meta.json
openaigym.video.0.31403.video000000.mp4
TensorFlow Eager
~~~~~~~~~~~~~~~~
Eager Mode
~~~~~~~~~~
Policies built with ``build_tf_policy`` can also be run in eager mode by setting the ``"eager": True`` config option or using ``rllib train --eager``. This will tell RLlib to execute the model forward pass, action distribution, loss, and stats functions in eager mode.
@@ -1,65 +1,17 @@
Object Store and Serialization
==============================
Serialization
=============
Since Ray processes do not share memory space, data transferred between workers and nodes will need to be **serialized** and **deserialized**. Ray uses the `Plasma object store <https://arrow.apache.org/docs/python/plasma.html>`_ to efficiently transfer objects across different processes and different nodes.
Since Ray processes do not share memory space, data transferred between workers and nodes will need to be **serialized** and **deserialized**. Ray uses the `Plasma object store <https://arrow.apache.org/docs/python/plasma.html>`_ to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization).
Object Store (Plasma)
---------------------
Plasma Object Store
-------------------
Plasma is an in-memory object store that is being developed as part of `Apache Arrow`_. Ray uses Plasma to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are **immutable** and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node.
Ray will place objects in the object store in the following situations:
1. Calling ``ray.put``
.. code:: python
y = 2
# This places `2` into the object store.
object_id = ray.put(y)
2. The return values of a remote function.
.. code:: python
@ray.remote
def remote_function():
return 1
# This places `1` into the object store.
object_id = remote_function.remote()
3. Arguments to remote functions (except for simple arguments like ints or floats).
.. code:: python
@ray.remote
def remote_function(y):
# Note that inside the remote function, the actual argument is provided.
return len(y)
argument = np.random.rand(100, 100)
# This implicitly places `argument` into the object store.
remote_function.remote(argument)
Each node has its own object store. When data is put into the object store, it is not automatically broadcast to other nodes. Data remains local to the writer until requested by another task or actor on another node.
.. tip:: In certain cases, it may be necessary to either write `your own serialization protocol <object-store.html#custom-serialization>`_ or use Actors to hold objects and transfer object state (e.g., weight matrices) among Ray workers.
Advanced: Huge Pages
~~~~~~~~~~~~~~~~~~~~
On Linux, it is possible to increase the write throughput of the Plasma object store by using huge pages. See the `Configuration page <configure.html#using-the-object-store-with-huge-pages>`_ for information on how to use huge pages in Ray.
.. _`Apache Arrow`: https://arrow.apache.org/
Serialization Overview
----------------------
Overview
--------
Objects that are serialized for transfer among Ray processes go through three stages:
@@ -73,57 +25,14 @@ Objects that are serialized for transfer among Ray processes go through three st
**3. Cloudpickle**: Ray falls back to ``cloudpickle`` as a final attempt for serialization. This may be slow.
Custom Serialization
~~~~~~~~~~~~~~~~~~~~
If none of these options work, we recommend registering a custom serializer.
.. autofunction:: ray.register_custom_serializer
:noindex:
Below is an example of using ``ray.register_custom_serializer``:
.. code-block:: python
    import ray

    ray.init()

    class Foo(object):
        def __init__(self, value):
            self.value = value

    def custom_serializer(obj):
        return obj.value

    def custom_deserializer(value):
        # Foo requires a value, so pass it to the constructor directly.
        return Foo(value)

    ray.register_custom_serializer(
        Foo, serializer=custom_serializer, deserializer=custom_deserializer)

    object_id = ray.put(Foo(100))
    assert ray.get(object_id).value == 100
If you find cases where Ray serialization doesn't work or does something unexpected, please `let us know`_ so we can fix it.
.. _`let us know`: https://github.com/ray-project/ray/issues
Serialization: Numpy Arrays
---------------------------
Numpy Arrays
------------
Ray optimizes for numpy arrays by using the `Apache Arrow`_ data format.
The numpy array is stored as a read-only object, and all Ray workers on the same node can read the numpy array in the object store without copying (zero-copy reads). Each numpy array object in the worker process holds a pointer to the relevant array held in shared memory. Any writes to the read-only object will require the user to first copy it into the local process memory.
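The copy-before-write rule can be illustrated without Ray. The sketch below uses a standard-library ``memoryview`` over immutable bytes as a stand-in for a read-only buffer in shared memory; it is an analogy only, not how Ray or Plasma expose numpy arrays.

```python
# Immutable bytes stand in for an object held in shared memory.
shared = bytes([1, 2, 3, 4])

# A memoryview reads the same buffer without copying it.
view = memoryview(shared)

try:
    view[0] = 99            # in-place writes to a read-only view are rejected
    write_failed = False
except TypeError:
    write_failed = True

# Copy into process-local memory first, then modify the copy.
local = bytearray(view)
local[0] = 99

print(view.readonly, write_failed, local[0], shared[0])
```

The original buffer is untouched after the write to the local copy, which is the behavior the paragraph above describes for arrays read from the object store.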
There are some advantages to this form of serialization:
- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.
.. tip:: You can often avoid serialization issues by using only native types (e.g., numpy arrays or lists/dicts of numpy arrays and other primitive types), or by using Actors to hold objects that cannot be serialized.
Serialization notes and limitations
-----------------------------------
@@ -159,4 +68,52 @@ Serialization notes and limitations
This object exceeds the maximum recursion depth. It may contain itself recursively.
- Whenever possible, use numpy arrays for maximum performance.
- Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.
Last resort: Custom Serialization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If none of these options work, you can try registering a custom serializer.
.. autofunction:: ray.register_custom_serializer
:noindex:
Below is an example of using ``ray.register_custom_serializer``:
.. code-block:: python
    import ray

    ray.init()

    class Foo(object):
        def __init__(self, value):
            self.value = value

    def custom_serializer(obj):
        return obj.value

    def custom_deserializer(value):
        # Foo requires a value, so pass it to the constructor directly.
        return Foo(value)

    ray.register_custom_serializer(
        Foo, serializer=custom_serializer, deserializer=custom_deserializer)

    object_id = ray.put(Foo(100))
    assert ray.get(object_id).value == 100
If you find cases where Ray serialization doesn't work or does something unexpected, please `let us know`_ so we can fix it.
.. _`let us know`: https://github.com/ray-project/ray/issues
Advanced: Huge Pages
~~~~~~~~~~~~~~~~~~~~
On Linux, it is possible to increase the write throughput of the Plasma object store by using huge pages. See the `Configuration page <configure.html#using-the-object-store-with-huge-pages>`_ for information on how to use huge pages in Ray.
.. _`Apache Arrow`: https://arrow.apache.org/
+263 -76
@@ -1,5 +1,230 @@
Troubleshooting and FAQs
========================
Debugging and Profiling
=======================
Observing Ray Work
------------------
You can run ``ray stack`` to dump the stack traces of all Ray workers on
the current node. This requires ``py-spy`` to be installed. See the `Troubleshooting page <troubleshooting.html>`_ for more details.
Visualizing Tasks in the Ray Timeline
-------------------------------------
The most important tool is the timeline visualization tool. To visualize tasks
in the Ray timeline, you can dump the timeline as a JSON file by running ``ray
timeline`` from the command line or by using the following command.
.. code-block:: python
ray.timeline(filename="/tmp/timeline.json")
Then open `chrome://tracing`_ in the Chrome web browser, and load
``timeline.json``.
.. _`chrome://tracing`: chrome://tracing
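The dumped file can also be inspected programmatically. The sketch below assumes the dump follows the Chrome Trace Event format, with complete events marked ``"ph": "X"`` and durations in a ``"dur"`` field; whether every Ray version emits exactly these fields is an assumption, and ``total_duration_by_name`` is an illustrative helper.

```python
from collections import defaultdict


def total_duration_by_name(events):
    """Sum the 'dur' field of complete ('X') trace events, grouped by name."""
    totals = defaultdict(float)
    for event in events:
        if event.get("ph") == "X" and "dur" in event:
            totals[event["name"]] += event["dur"]
    return dict(totals)


# Hypothetical events in Chrome Trace Event format (times in microseconds).
sample_events = [
    {"name": "f", "ph": "X", "ts": 0, "dur": 500},
    {"name": "f", "ph": "X", "ts": 600, "dur": 250},
    {"name": "g", "ph": "X", "ts": 100, "dur": 100},
    {"name": "marker", "ph": "i", "ts": 50},  # instant event, no duration
]

totals = total_duration_by_name(sample_events)
print(totals)
```

For a real dump, the event list would come from ``json.load`` on the file written by ``ray.timeline``.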
Profiling Using Python's CProfile
---------------------------------
A second way to profile the performance of your Ray application is to
use Python's native cProfile `profiling module`_. Rather than tracking your
application code line by line, cProfile reports the total runtime of each
profiled function, along with the number of calls and the execution time of
every function call made within the profiled code.
.. _`profiling module`: https://docs.python.org/3/library/profile.html#module-cProfile
Unlike ``line_profiler``, this detailed list of profiled function calls
**includes** internal function calls and function calls made within Ray!
Like ``line_profiler``, however, cProfile can be enabled with minimal
changes to your application code (provided that each section of the code you
want to profile is defined as its own function). To use cProfile, add an import
statement, then replace calls to the loop functions as follows:
.. code-block:: python
import cProfile # Added import statement
    def ex1():
        list1 = []
        for i in range(5):
            list1.append(ray.get(func.remote()))

    def main():
        ray.init()
        cProfile.run('ex1()')  # Modified call to ex1
        cProfile.run('ex2()')
        cProfile.run('ex3()')

    if __name__ == "__main__":
        main()
Now, when you execute your Python script, a cProfile list of profiled function
calls will be printed to the terminal for each call made to ``cProfile.run()``.
The very top of cProfile's output gives the total execution time for
``'ex1()'``:
.. code-block:: bash
601 function calls (595 primitive calls) in 2.509 seconds
Following is a snippet of profiled function calls for ``'ex1()'``. Most of
these calls are quick and take around 0.000 seconds, so the functions of
interest are the ones with non-zero execution times:
.. code-block:: bash
ncalls tottime percall cumtime percall filename:lineno(function)
...
1 0.000 0.000 2.509 2.509 your_script_here.py:31(ex1)
5 0.000 0.000 0.001 0.000 remote_function.py:103(remote)
5 0.000 0.000 0.001 0.000 remote_function.py:107(_submit)
...
10 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
5 0.000 0.000 2.508 0.502 worker.py:2535(get)
5 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
10 0.000 0.000 2.507 0.251 worker.py:374(retrieve_and_deserialize)
5 0.000 0.000 2.508 0.502 worker.py:424(get_object)
5 0.000 0.000 0.000 0.000 worker.py:514(submit_task)
...
The five separate calls to Ray's ``get``, each taking the full 0.502 seconds,
show up at ``worker.py:2535(get)``. Meanwhile, calling the remote function
itself at ``remote_function.py:103(remote)`` takes only 0.001 seconds over
five calls, so it is not the source of the slow performance of ``ex1()``.
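When the full listing is too long to scan by eye, the standard-library ``pstats`` module can sort and truncate it. The sketch below profiles a stand-in workload that only sleeps (``slow_step`` and ``workload`` are hypothetical and do not use Ray) and prints the five entries with the highest cumulative time.

```python
import cProfile
import io
import pstats
import time


def slow_step():
    # Stand-in for a blocking call such as ray.get().
    time.sleep(0.01)


def workload():
    for _ in range(3):
        slow_step()


profiler = cProfile.Profile()
profiler.enable()
workload()
profiler.disable()

# Sort by cumulative time and keep only the top five entries.
buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
stats.sort_stats("cumulative").print_stats(5)

report = buffer.getvalue()
print(report)
```

Sorting by cumulative time tends to surface blocking calls first, which is where the time went in ``ex1()`` as well.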
Profiling Ray Actors with cProfile
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Considering that the detailed output of cProfile can be quite different depending
on what Ray functionalities we use, let us see what cProfile's output might look
like if our example involved Actors (for an introduction to Ray actors, see our
`Actor documentation here`_).
.. _`Actor documentation here`: http://ray.readthedocs.io/en/latest/actors.html
Now, instead of looping over five calls to a remote function like in ``ex1``,
let's create a new example and loop over five calls to a remote function
**inside an actor**. Our actor's remote function again just sleeps for 0.5
seconds:
.. code-block:: python
# Our actor
    @ray.remote
    class Sleeper(object):
        def __init__(self):
            self.sleepValue = 0.5

        # Equivalent to func(), but defined within an actor
        def actor_func(self):
            time.sleep(self.sleepValue)
Recalling the suboptimality of ``ex1``, let's first see what happens if we
attempt to perform all five ``actor_func()`` calls within a single actor:
.. code-block:: python
    def ex4():
        # This is suboptimal in Ray, and should only be used for the sake of this example
        actor_example = Sleeper.remote()

        five_results = []
        for i in range(5):
            five_results.append(actor_example.actor_func.remote())

        # Wait until the end to call ray.get()
        ray.get(five_results)
We enable cProfile on this example as follows:
.. code-block:: python
    def main():
        ray.init()
        cProfile.run('ex4()')

    if __name__ == "__main__":
        main()
Running our new Actor example, cProfile's abbreviated output is as follows:
.. code-block:: bash
12519 function calls (11956 primitive calls) in 2.525 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
...
1 0.000 0.000 0.015 0.015 actor.py:546(remote)
1 0.000 0.000 0.015 0.015 actor.py:560(_submit)
1 0.000 0.000 0.000 0.000 actor.py:697(__init__)
...
1 0.000 0.000 2.525 2.525 your_script_here.py:63(ex4)
...
9 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
1 0.000 0.000 2.509 2.509 worker.py:2535(get)
9 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
4 0.000 0.000 2.508 0.627 worker.py:374(retrieve_and_deserialize)
1 0.000 0.000 2.509 2.509 worker.py:424(get_object)
8 0.000 0.000 0.001 0.000 worker.py:514(submit_task)
...
It turns out that the entire example still took 2.5 seconds to execute, which is
the time for five calls to ``actor_func()`` to run serially. Recall that in ``ex1``
this behavior occurred because we called ``ray.get()`` before all five remote
function tasks had been submitted. Here, however, cProfile's output line
``worker.py:2535(get)`` confirms that ``ray.get()`` was called only once, at
the end, and took 2.509 seconds. What happened?
It turns out that Ray cannot parallelize this example because we have
initialized only a single ``Sleeper`` actor. Because each actor is a single,
stateful worker, all five calls are submitted to and run on that one worker,
one after another.
To better parallelize the actors in ``ex4``, we can take advantage of the fact
that each call to ``actor_func()`` is independent, and instead
create five ``Sleeper`` actors. That way, we create five workers
that can run in parallel, instead of a single worker that
can only handle one call to ``actor_func()`` at a time.
.. code-block:: python

  def ex4():
      # Modified to create five separate Sleepers
      five_actors = [Sleeper.remote() for i in range(5)]

      # Each call to actor_func now goes to a different Sleeper
      five_results = []
      for actor_example in five_actors:
          five_results.append(actor_example.actor_func.remote())

      ray.get(five_results)
In total, our example now takes only about 1.5 seconds to run:
.. code-block:: bash
1378 function calls (1363 primitive calls) in 1.567 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
...
5 0.000 0.000 0.002 0.000 actor.py:546(remote)
5 0.000 0.000 0.002 0.000 actor.py:560(_submit)
5 0.000 0.000 0.000 0.000 actor.py:697(__init__)
...
1 0.000 0.000 1.566 1.566 your_script_here.py:71(ex4)
...
21 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
1 0.000 0.000 1.564 1.564 worker.py:2535(get)
25 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
3 0.000 0.000 1.564 0.521 worker.py:374(retrieve_and_deserialize)
1 0.000 0.000 1.564 1.564 worker.py:424(get_object)
20 0.001 0.000 0.001 0.000 worker.py:514(submit_task)
...
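The difference between one actor and five actors can be imitated without Ray. The sketch below is only an analogy: a thread pool with one worker plays the role of a single ``Sleeper`` actor, and a pool with five workers plays the role of five actors. ``sleeper_call`` and ``timed_run`` are hypothetical names, and threads are not Ray workers, so the absolute timings will differ from the cProfile output above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleeper_call(duration=0.1):
    # Stand-in for Sleeper.actor_func(): it just sleeps.
    time.sleep(duration)
    return duration


def timed_run(num_workers, num_calls=5):
    """Submit num_calls sleeps to a pool of num_workers and time the batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(sleeper_call) for _ in range(num_calls)]
        # Collect results only after everything has been submitted.
        for future in futures:
            future.result()
    return time.perf_counter() - start


one_worker = timed_run(num_workers=1)    # like a single Sleeper actor
five_workers = timed_run(num_workers=5)  # like five Sleeper actors
print(f"one worker: {one_worker:.2f}s, five workers: {five_workers:.2f}s")
```

With one worker the five sleeps run back to back, even though all of them were submitted before any result was collected. With five workers they overlap, which is the same effect as creating five ``Sleeper`` actors.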
This document discusses some common problems that people run into when using Ray
as well as some known problems. If you encounter other problems, please
@@ -7,6 +232,42 @@ as well as some known problems. If you encounter other problems, please
.. _`let us know`: https://github.com/ray-project/ray/issues
Crashes
-------
If Ray crashed, you may wonder what happened. Currently, this can occur for some
of the following reasons.
- **Stressful workloads:** Workloads that create a very large number of tasks in a short
amount of time can sometimes interfere with the heartbeat mechanism that we
use to check that processes are still alive. On the head node in the cluster,
you can check the files ``/tmp/ray/session_*/logs/monitor*``. They will
indicate which processes Ray has marked as dead (due to a lack of heartbeats).
However, it is currently possible for a process to get marked as dead without
actually having died.
- **Starting many actors:** Workloads that start a large number of actors all at
once may exhibit problems when the processes (or libraries that they use)
contend for resources. Similarly, a script that starts many actors over the
lifetime of the application will eventually cause the system to run out of
file descriptors. This is addressable, but currently we do not garbage collect
actor processes until the script finishes.
- **Running out of file descriptors:** As a workaround, you may be able to
increase the maximum number of file descriptors with a command like
``ulimit -n 65536``. If that fails, double check that the hard limit is
sufficiently large by running ``ulimit -Hn``. If it is too small, you can
increase the hard limit as follows (these instructions work on EC2).
* Increase the hard ulimit for open file descriptors system-wide by running
the following.
.. code-block:: bash
sudo bash -c "echo $USER hard nofile 65536 >> /etc/security/limits.conf"
* Log out and log back in.
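The limits reported by ``ulimit -n`` and ``ulimit -Hn`` can also be read from inside a Python process, which is handy for logging them when a driver script starts. This is a minimal sketch that uses the standard library's ``resource`` module (available on Unix-like systems only). ``describe_fd_limits`` and ``format_limit`` are hypothetical helper names.

```python
import resource


def describe_fd_limits():
    """Return the soft and hard limits on open file descriptors."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft, hard


def format_limit(value):
    # RLIM_INFINITY means that no limit is enforced.
    return "unlimited" if value == resource.RLIM_INFINITY else str(value)


soft_limit, hard_limit = describe_fd_limits()
print("soft limit (ulimit -n): ", format_limit(soft_limit))
print("hard limit (ulimit -Hn):", format_limit(hard_limit))
```

The soft limit is the one that a process actually hits. It can be raised up to the hard limit without root privileges, which is why the hard limit is worth checking when ``ulimit -n 65536`` fails.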
No Speedup
----------
@@ -56,80 +317,6 @@ submitting a minimal code example that demonstrates the problem.
.. _`Github issue`: https://github.com/ray-project/ray/issues
Crashes
-------
If Ray crashed, you may wonder what happened. Currently, this can occur for some
of the following reasons.
- **Stressful workloads:** Workloads that create a very large number of tasks in a short
amount of time can sometimes interfere with the heartbeat mechanism that we
use to check that processes are still alive. On the head node in the cluster,
you can check the files ``/tmp/ray/session_*/logs/monitor*``. They will
indicate which processes Ray has marked as dead (due to a lack of heartbeats).
However, it is currently possible for a process to get marked as dead without
actually having died.
- **Starting many actors:** Workloads that start a large number of actors all at
once may exhibit problems when the processes (or libraries that they use)
contend for resources. Similarly, a script that starts many actors over the
lifetime of the application will eventually cause the system to run out of
file descriptors. This is addressable, but currently we do not garbage collect
actor processes until the script finishes.
- **Running out of file descriptors:** As a workaround, you may be able to
increase the maximum number of file descriptors with a command like
``ulimit -n 65536``. If that fails, double check that the hard limit is
sufficiently large by running ``ulimit -Hn``. If it is too small, you can
increase the hard limit as follows (these instructions work on EC2).
* Increase the hard ulimit for open file descriptors system-wide by running
the following.
.. code-block:: bash
sudo bash -c "echo $USER hard nofile 65536 >> /etc/security/limits.conf"
* Log out and log back in.
Hanging
-------
.. tip::
You can run ``ray stack`` to dump the stack traces of all Ray workers on
the current node. This requires py-spy to be installed.
If a workload is hanging and not progressing, the problem may be one of the
following.
- **Reconstructing an object created with put:** When an object that is needed
has been evicted or lost, Ray will attempt to rerun the task that created the
object. However, there are some cases that currently are not handled. For
example, if the object was created by a call to ``ray.put`` on the driver
process, then the argument that was passed into ``ray.put`` is no longer
available and so the call to ``ray.put`` cannot be rerun (without rerunning
the driver).
- **Reconstructing an object created by actor task:** Ray currently does not
reconstruct objects created by actor methods.
Serialization Problems
----------------------
Ray's serialization is currently imperfect. If you encounter an object that
Ray does not serialize/deserialize correctly, please let us know. For example,
you may want to bring it up on `this thread`_.
- `Objects with multiple references to the same object`_.
- `Subtypes of lists, dictionaries, or tuples`_.
.. _`this thread`: https://github.com/ray-project/ray/issues/557
.. _`Objects with multiple references to the same object`: https://github.com/ray-project/ray/issues/319
.. _`Subtypes of lists, dictionaries, or tuples`: https://github.com/ray-project/ray/issues/512
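To make the first category concrete, the sketch below checks whether a serialize/deserialize round trip keeps two references pointing at the same object. It is an illustration only: ``preserves_shared_references`` is a hypothetical helper, and ``pickle`` and ``json`` from the standard library are used as stand-ins, not Ray's own serializer.

```python
import json
import pickle


def preserves_shared_references(serialize, deserialize):
    """Check whether a round trip keeps two references pointing at one object."""
    shared = [1, 2, 3]
    container = {"first": shared, "second": shared}
    restored = deserialize(serialize(container))
    # After a faithful round trip, both keys still refer to one list object.
    return restored["first"] is restored["second"]


pickle_ok = preserves_shared_references(pickle.dumps, pickle.loads)
json_ok = preserves_shared_references(json.dumps, json.loads)
print("pickle keeps shared references:", pickle_ok)
print("json keeps shared references:  ", json_ok)
```

A check of this shape, applied to the object that misbehaves, makes a useful minimal example to attach when reporting a serialization problem.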
Outdated Function Definitions
-----------------------------
-231
View File
@@ -1,231 +0,0 @@
How-to: Profile Ray Programs
============================
Profiling the performance of your code can be very helpful to determine
performance bottlenecks or to find out where your code may not be parallelized
properly.
Visualizing Tasks in the Ray Timeline
-------------------------------------
The most important tool is the timeline visualization tool. To visualize tasks
in the Ray timeline, you can dump the timeline as a JSON file by running ``ray
timeline`` from the command line or by using the following command.
.. code-block:: python
ray.timeline(filename="/tmp/timeline.json")
Then open `chrome://tracing`_ in the Chrome web browser, and load
``timeline.json``.
.. _`chrome://tracing`: chrome://tracing
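The dumped file can also be summarized in a script instead of a browser. The sketch below assumes that the file is a JSON list in the Chrome Trace Event format, in which complete events have ``"ph": "X"`` and a ``"dur"`` field in microseconds. The sample events are hand-written, not real Ray output, and ``total_duration_by_name`` is a hypothetical helper.

```python
import json
from collections import defaultdict


def total_duration_by_name(events):
    """Sum the 'dur' field (microseconds) of complete events, grouped by name."""
    totals = defaultdict(float)
    for event in events:
        # In the Chrome Trace Event format, ph == "X" marks a complete event.
        if event.get("ph") == "X":
            totals[event.get("name", "<unnamed>")] += event.get("dur", 0)
    return dict(totals)


# Hand-written sample events, not real Ray output.
sample_json = """
[
  {"name": "task_a", "ph": "X", "ts": 0, "dur": 500000},
  {"name": "task_a", "ph": "X", "ts": 500000, "dur": 500000},
  {"name": "task_b", "ph": "X", "ts": 0, "dur": 250000}
]
"""
totals = total_duration_by_name(json.loads(sample_json))
for name, microseconds in sorted(totals.items()):
    print(f"{name}: {microseconds / 1e6:.3f} s")
```

For a real dump, replace the sample string with the contents of the file written by ``ray.timeline``, for example by passing the result of ``json.load`` on ``/tmp/timeline.json``.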
Observing Ray Work
------------------
You can run ``ray stack`` to dump the stack traces of all Ray workers on
the current node. This requires ``py-spy`` to be installed. See the `Troubleshooting page <troubleshooting.html>`_ for more details.
Profiling Using Python's CProfile
---------------------------------
A second way to profile the performance of your Ray application is to
use Python's native cProfile `profiling module`_. Rather than tracking
your application code line by line, cProfile can give the total runtime
of each loop function, as well as list the number of calls made and
execution time of all function calls made within the profiled code.
.. _`profiling module`: https://docs.python.org/3/library/profile.html#module-cProfile
Unlike ``line_profiler`` above, this detailed list of profiled function calls
**includes** internal function calls and function calls made within Ray!
However, similar to ``line_profiler``, cProfile can be enabled with minimal
changes to your application code (given that each section of the code you want
to profile is defined as its own function). To use cProfile, add an import
statement, then replace calls to the loop functions as follows:
.. code-block:: python

  import cProfile  # Added import statement

  def ex1():
      list1 = []
      for i in range(5):
          list1.append(ray.get(func.remote()))

  def main():
      ray.init()
      cProfile.run('ex1()')  # Modified call to ex1
      cProfile.run('ex2()')
      cProfile.run('ex3()')

  if __name__ == "__main__":
      main()
Now, when you execute your Python script, a cProfile list of profiled function
calls is printed to the terminal for each call made to ``cProfile.run()``.
The very top of cProfile's output gives the total execution time for
``'ex1()'``:
.. code-block:: bash
601 function calls (595 primitive calls) in 2.509 seconds
Following is a snippet of profiled function calls for ``'ex1()'``. Most of
these calls are quick and take around 0.000 seconds, so the functions of
interest are the ones with non-zero execution times:
.. code-block:: bash
ncalls tottime percall cumtime percall filename:lineno(function)
...
1 0.000 0.000 2.509 2.509 your_script_here.py:31(ex1)
5 0.000 0.000 0.001 0.000 remote_function.py:103(remote)
5 0.000 0.000 0.001 0.000 remote_function.py:107(_submit)
...
10 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
5 0.000 0.000 2.508 0.502 worker.py:2535(get)
5 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
10 0.000 0.000 2.507 0.251 worker.py:374(retrieve_and_deserialize)
5 0.000 0.000 2.508 0.502 worker.py:424(get_object)
5 0.000 0.000 0.000 0.000 worker.py:514(submit_task)
...
The five separate calls to Ray's ``get``, each taking about 0.502 seconds,
are visible at ``worker.py:2535(get)``. Meanwhile, calling the
remote function itself at ``remote_function.py:103(remote)`` takes only 0.001
seconds over all five calls, and thus is not the source of the slow performance of
``ex1()``.
Profiling Ray Actors with cProfile
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Considering that the detailed output of cProfile can be quite different depending
on what Ray functionalities we use, let us see what cProfile's output might look
like if our example involved Actors (for an introduction to Ray actors, see our
`Actor documentation here`_).
.. _`Actor documentation here`: http://ray.readthedocs.io/en/latest/actors.html
Now, instead of looping over five calls to a remote function like in ``ex1``,
let's create a new example and loop over five calls to a remote function
**inside an actor**. Our actor's remote function again just sleeps for 0.5
seconds:
.. code-block:: python

  import time

  import ray

  # Our actor
  @ray.remote
  class Sleeper(object):
      def __init__(self):
          self.sleepValue = 0.5

      # Equivalent to func(), but defined within an actor
      def actor_func(self):
          time.sleep(self.sleepValue)
Recalling the suboptimality of ``ex1``, let's first see what happens if we
attempt to perform all five ``actor_func()`` calls within a single actor:
.. code-block:: python

  def ex4():
      # This is suboptimal in Ray, and should only be used for the sake of this example
      actor_example = Sleeper.remote()

      five_results = []
      for i in range(5):
          five_results.append(actor_example.actor_func.remote())

      # Wait until the end to call ray.get()
      ray.get(five_results)
We enable cProfile on this example as follows:
.. code-block:: python

  def main():
      ray.init()
      cProfile.run('ex4()')

  if __name__ == "__main__":
      main()
Running our new Actor example, cProfile's abbreviated output is as follows:
.. code-block:: bash
12519 function calls (11956 primitive calls) in 2.525 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
...
1 0.000 0.000 0.015 0.015 actor.py:546(remote)
1 0.000 0.000 0.015 0.015 actor.py:560(_submit)
1 0.000 0.000 0.000 0.000 actor.py:697(__init__)
...
1 0.000 0.000 2.525 2.525 your_script_here.py:63(ex4)
...
9 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
1 0.000 0.000 2.509 2.509 worker.py:2535(get)
9 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
4 0.000 0.000 2.508 0.627 worker.py:374(retrieve_and_deserialize)
1 0.000 0.000 2.509 2.509 worker.py:424(get_object)
8 0.000 0.000 0.001 0.000 worker.py:514(submit_task)
...
It turns out that the entire example still took 2.5 seconds to execute, which is
the time for five calls to ``actor_func()`` to run serially. Recall that in ``ex1``
this behavior occurred because we called ``ray.get()`` before all five remote
function tasks had been submitted. Here, however, cProfile's output line
``worker.py:2535(get)`` confirms that ``ray.get()`` was called only once, at
the end, and took 2.509 seconds. What happened?
It turns out that Ray cannot parallelize this example because we have
initialized only a single ``Sleeper`` actor. Because each actor is a single,
stateful worker, all five calls are submitted to and run on that one worker,
one after another.
To better parallelize the actors in ``ex4``, we can take advantage of the fact
that each call to ``actor_func()`` is independent, and instead
create five ``Sleeper`` actors. That way, we create five workers
that can run in parallel, instead of a single worker that
can only handle one call to ``actor_func()`` at a time.
.. code-block:: python

  def ex4():
      # Modified to create five separate Sleepers
      five_actors = [Sleeper.remote() for i in range(5)]

      # Each call to actor_func now goes to a different Sleeper
      five_results = []
      for actor_example in five_actors:
          five_results.append(actor_example.actor_func.remote())

      ray.get(five_results)
In total, our example now takes only about 1.5 seconds to run:
.. code-block:: bash
1378 function calls (1363 primitive calls) in 1.567 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
...
5 0.000 0.000 0.002 0.000 actor.py:546(remote)
5 0.000 0.000 0.002 0.000 actor.py:560(_submit)
5 0.000 0.000 0.000 0.000 actor.py:697(__init__)
...
1 0.000 0.000 1.566 1.566 your_script_here.py:71(ex4)
...
21 0.000 0.000 0.000 0.000 worker.py:2459(__init__)
1 0.000 0.000 1.564 1.564 worker.py:2535(get)
25 0.000 0.000 0.000 0.000 worker.py:2695(get_global_worker)
3 0.000 0.000 1.564 0.521 worker.py:374(retrieve_and_deserialize)
1 0.000 0.000 1.564 1.564 worker.py:424(get_object)
20 0.001 0.000 0.001 0.000 worker.py:514(submit_task)
...
+5 -18
View File
@@ -1,5 +1,5 @@
How-to: Using Ray with GPUs
===========================
GPU Support
===========
GPUs are critical for many machine learning applications. Ray enables remote
functions and actors to specify their GPU requirements in the ``ray.remote``
@@ -8,22 +8,9 @@ decorator.
Starting Ray with GPUs
----------------------
In order for remote functions and actors to use GPUs, Ray must know how many
GPUs are available. If you are starting Ray on a single machine, you can specify
the number of GPUs as follows.
.. code-block:: python
ray.init(num_gpus=4)
If you don't pass in the ``num_gpus`` argument, Ray will automatically detect the number of GPUs available.
If you are starting Ray with the ``ray start`` command, you can indicate the
number of GPUs on the machine with the ``--num-gpus`` argument.
.. code-block:: bash
ray start --head --num-gpus=4
Ray will automatically detect the number of GPUs available on a machine.
If you need to, you can override this by specifying ``ray.init(num_gpus=N)`` or
``ray start --num-gpus=N``.
**Note:** There is nothing preventing you from passing in a larger value of
``num_gpus`` than the true number of GPUs on the machine. In this case, Ray will
+22 -6
View File
@@ -95,7 +95,7 @@ Note the following behaviors:
- The second task will not be executed until the first task has finished
executing because the second task depends on the output of the first task.
- If the two tasks are scheduled on different machines, the output of the
first task (the value corresponding to ``x1_id``) will be sent over the
first task (the value corresponding to ``y1_id``) will be sent over the
network to the machine where the second task is scheduled.
Oftentimes, you may want to specify a task's resource requirements (for example
@@ -114,7 +114,7 @@ to execute the task. Ray can also handle arbitrary custom resources.
* If you do not specify any resources in the ``@ray.remote`` decorator, the
default is 1 CPU resource and no other resources.
* If specifying CPUs, Ray does not enforce isolation (i.e., your task is
expected to honor its request.)
expected to honor its request).
* If specifying GPUs, Ray does provide isolation in the form of visible devices
(setting the environment variable ``CUDA_VISIBLE_DEVICES``), but it is the
task's responsibility to actually use the GPUs (e.g., through a deep
@@ -159,7 +159,7 @@ Further, remote function can return multiple object IDs.
Objects in Ray
--------------
In Ray, we can create and compute on objects. We refer to these objects as **remote objects**, and we use **object IDs** to refer to them. Remote objects are stored in **object stores**, and there is one object store per node in the cluster. In the cluster setting, we may not actually know which machine each object lives on.
In Ray, we can create and compute on objects. We refer to these objects as **remote objects**, and we use **object IDs** to refer to them. Remote objects are stored in `shared-memory <https://en.wikipedia.org/wiki/Shared_memory>`__ **object stores**, and there is one object store per node in the cluster. In the cluster setting, we may not actually know which machine each object lives on.
An **object ID** is essentially a unique ID that can be used to refer to a
remote object. If you're familiar with futures, our object IDs are conceptually
@@ -178,7 +178,6 @@ Object IDs can be created in multiple ways.
.. autofunction:: ray.put
:noindex:
.. important::
Remote objects are immutable. That is, their values cannot be changed after
@@ -190,8 +189,10 @@ Fetching Results
----------------
The command ``ray.get(x_id)`` takes an object ID and creates a Python object
from the corresponding remote object. For some objects like arrays, we can use
shared memory and avoid copying the object.
from the corresponding remote object. First, if the current node's object store
does not contain the object, the object is downloaded. Then, if the object is a `numpy array <https://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html>`__
or a collection of numpy arrays, the ``get`` call is zero-copy and returns arrays backed by shared object store memory.
Otherwise, we deserialize the object data into a Python object.
.. code-block:: python
@@ -214,6 +215,21 @@ works as follows.
.. autofunction:: ray.wait
:noindex:
Object Eviction
---------------
When the object store gets full, objects will be evicted to make room for new objects.
This happens in approximate LRU (least recently used) order. To prevent objects from
being evicted, you can call ``ray.get`` and store their values instead. Numpy array
objects cannot be evicted while they are mapped in any Python process. You can also
configure `memory limits <memory-management.html>`__ to control object store usage by
actors.
.. note::
Objects created with ``ray.put`` are pinned in memory while a Python reference
to the object ID returned by the put exists. This only applies to the specific
ID returned by put, not IDs in general or copies of that ID.
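To illustrate what least-recently-used eviction means, here is a toy store built on ``collections.OrderedDict``. It is a sketch under simplifying assumptions, not Ray's implementation: ``ToyObjectStore`` is a hypothetical class, capacity is counted in objects rather than bytes, eviction is exact LRU rather than approximate, and pinning is not modeled.

```python
from collections import OrderedDict


class ToyObjectStore:
    """Toy exact-LRU store; Ray's object store is only approximately LRU."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._objects = OrderedDict()

    def put(self, object_id, value):
        """Store a value and return the IDs evicted to make room for it."""
        self._objects[object_id] = value
        self._objects.move_to_end(object_id)
        evicted = []
        while len(self._objects) > self.capacity:
            # popitem(last=False) removes the least recently used entry.
            old_id, _ = self._objects.popitem(last=False)
            evicted.append(old_id)
        return evicted

    def get(self, object_id):
        """Return a stored value and mark it as most recently used."""
        value = self._objects[object_id]  # Raises KeyError if evicted.
        self._objects.move_to_end(object_id)
        return value


store = ToyObjectStore(capacity=2)
store.put("a", 1)
store.put("b", 2)
store.get("a")               # "a" becomes the most recently used object.
evicted = store.put("c", 3)  # The store is full, so "b" is evicted.
print(evicted)
```

In the toy model, reading ``"a"`` shortly before the store fills up is what protects it from eviction, so ``"b"`` is removed instead.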
Remote Classes (Actors)
-----------------------
+19 -1
View File
@@ -147,7 +147,8 @@ class ModelV2(object):
restored["obs"] = restore_original_dimensions(
input_dict["obs"], self.obs_space, self.framework)
restored["obs_flat"] = input_dict["obs"]
res = self.forward(restored, state or [], seq_lens)
with self.context():
res = self.forward(restored, state or [], seq_lens)
if ((not isinstance(res, list) and not isinstance(res, tuple))
or len(res) != 2):
raise ValueError(
@@ -195,3 +196,20 @@ class ModelV2(object):
def last_output(self):
"""Returns the last output returned from calling the model."""
return self._last_output
    def context(self):
        """Returns a contextmanager for the current forward pass."""
        return NullContextManager()


class NullContextManager(object):
    """No-op context manager"""

    def __init__(self):
        pass

    def __enter__(self):
        pass

    def __exit__(self, *args):
        pass
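The ``context()`` hook added above follows a common pattern: the base class wraps every forward pass in a context manager that does nothing by default, and subclasses override the hook to enter whatever scope they need. The sketch below shows the pattern in isolation. ``BaseModel`` and ``TracingModel`` are hypothetical classes, not part of RLlib, and ``contextlib.nullcontext`` (Python 3.7+) is used in place of a hand-written no-op context manager.

```python
import contextlib


class BaseModel:
    """Hypothetical stand-in for a model base class with a context() hook."""

    def context(self):
        # Default: a no-op context manager from the standard library.
        return contextlib.nullcontext()

    def forward(self, x):
        return x * 2

    def __call__(self, x):
        # Every forward pass runs inside whatever context() returns.
        with self.context():
            return self.forward(x)


class TracingModel(BaseModel):
    """Hypothetical subclass that records when a forward pass starts and ends."""

    def __init__(self):
        self.events = []

    @contextlib.contextmanager
    def _record(self):
        self.events.append("enter")
        try:
            yield
        finally:
            self.events.append("exit")

    def context(self):
        return self._record()


model = TracingModel()
result = model(21)
print(result, model.events)
```

The caller never needs to know which context is in use. That appears to be the motivation for the hook in the diff, where the TensorFlow subclass returns its own graph's ``as_default()`` context.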
+11
View File
@@ -43,6 +43,17 @@ class TFModelV2(ModelV2):
name,
framework="tf")
        self.var_list = []
        if tf.executing_eagerly():
            self.graph = None
        else:
            self.graph = tf.get_default_graph()

    def context(self):
        """Returns a contextmanager for the current TF graph."""
        if self.graph:
            return self.graph.as_default()
        else:
            return ModelV2.context(self)
def forward(self, input_dict, state, seq_lens):
"""Call the model with the given input tensors and state.