diff --git a/doc/source/dask-on-ray.rst b/doc/source/dask-on-ray.rst
index b5383ac8b..0530fdc4c 100644
--- a/doc/source/dask-on-ray.rst
+++ b/doc/source/dask-on-ray.rst
@@ -1,22 +1,32 @@
-***********
Dask on Ray
-***********
+===========
-Ray offers a scheduler integration for Dask, allowing you to build data
-analyses using the familiar Dask collections (dataframes, arrays) and execute
-the underlying computations on a Ray cluster. Using this Dask scheduler, the
-entire Dask ecosystem can be executed on top of Ray.
+.. _dask-on-ray:
-.. note::
+`Dask `__ is a Python parallel computing library geared towards scaling analytics and
+scientific computing workloads. It provides `big data collections
+`__ that mimic the APIs of
+the familiar `NumPy `__ and `Pandas `__ libraries,
+allowing those abstractions to represent
+larger-than-memory data and/or allowing operations on that data to be run on a multi-machine cluster,
+while also providing automatic data parallelism, smart scheduling,
+and optimized operations. Operations on these collections create a task graph, which is
+executed by a scheduler.
- Note that Ray does not currently support object spilling, and hence cannot
- process datasets larger than cluster memory. This is a planned feature.
+Ray provides a scheduler for Dask (`dask_on_ray`) which allows you to build data
+analyses using Dask's collections and execute
+the underlying tasks on a Ray cluster.
+
+`dask_on_ray` uses Dask's scheduler API, which allows you to
+specify any callable as the scheduler that you would like Dask to use to execute your
+workload. Using the Dask-on-Ray scheduler, the entire Dask ecosystem can be executed on top of Ray.
-=========
Scheduler
-=========
+---------
-The Dask-Ray scheduler can execute any valid Dask graph, and can be used with
+.. _dask-on-ray-scheduler:
+
+The Dask-on-Ray scheduler can execute any valid Dask graph, and can be used with
any Dask `.compute() `__
call.
Here's an example:
@@ -25,53 +35,99 @@ Here's an example:
import ray
from ray.util.dask import ray_dask_get
- import dask.delayed
+ import dask.array as da
+ import dask.dataframe as dd
+ import numpy as np
+ import pandas as pd
import time
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
+ d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))
- @dask.delayed
- def inc(x):
- time.sleep(1)
- return x + 1
-
- @dask.delayed
- def add(x, y):
- time.sleep(3)
- return x + y
-
- x = inc(1)
- y = inc(2)
- z = add(x, y)
# The Dask scheduler submits the underlying task graph to Ray.
- z.compute(scheduler=ray_dask_get)
+ d_arr.mean().compute(scheduler=ray_dask_get)
+
+ # Set the scheduler to ray_dask_get in your config so you don't have to specify it on
+ # each compute call.
+ dask.config.set(scheduler=ray_dask_get)
+
+ df = dd.from_pandas(pd.DataFrame(
+ np.random.randint(0, 100, size=(1024, 2)),
+ columns=["age", "grade"]))
+ df.groupby(["age"]).mean().compute()
+
+
+.. note::
+ For execution on a Ray cluster, you should *not* use the
+ `Dask.distributed `__
+ client; simply use plain Dask and its collections, and pass ``ray_dask_get``
+ to ``.compute()`` calls or set the scheduler in one of the other ways detailed `here `__. Follow the instructions for
+ :ref:`using Ray on a cluster ` to modify the
+ ``ray.init()`` call.
Why use Dask on Ray?
- 1. If you'd like to create data analyses using the familiar NumPy and Pandas
- APIs provided by Dask and execute them on a production-ready distributed
- task execution system like Ray.
- 2. If you'd like to use Dask and Ray libraries in the same application
- without having two different task execution backends.
- 3. To take advantage of Ray-specific features such as the
+1. To take advantage of Ray-specific features such as the
:ref:`cluster launcher ` and
:ref:`shared-memory store `.
+2. If you'd like to use Dask and Ray libraries in the same application without having two different clusters.
+3. If you'd like to create data analyses using the familiar NumPy and Pandas APIs provided by Dask and execute them on a fast, fault-tolerant distributed task execution system geared towards production, like Ray.
-Note that for execution on a Ray cluster, you should *not* use the
-`Dask.distributed `__
-client; simply use plain Dask and its collections, and pass ``ray_dask_get``
-to ``.compute()`` calls. Follow the instructions for
-:ref:`using Ray on a cluster ` to modify the
-``ray.init()`` call.
+Dask-on-Ray is an ongoing project and is not expected to achieve the same performance as using Ray directly. All `Dask abstractions `__ should run seamlessly on top of Ray using this scheduler, so if you find that one of these abstractions doesn't run on Ray, please `open an issue `__.
-Dask-on-Ray is an ongoing project and is not expected to achieve the same performance as using Ray directly.
+Out-of-Core Data Processing
+---------------------------
+
+.. _dask-on-ray-out-of-core:
+
+Processing datasets larger than cluster memory is supported via Ray's :ref:`object spilling `: if
+the in-memory object store is full, objects will be spilled to external storage (local disk by
+default). This feature is available but off by default in Ray 1.2, and is on by default
+in Ray 1.3+. Please see your Ray version's object spilling documentation for steps to enable and/or configure
+object spilling.
+
+Custom optimization for Dask DataFrame shuffling
+------------------------------------------------
+
+.. _dask-on-ray-shuffle-optimization:
+
+Dask on Ray provides a Dask DataFrame optimizer that leverages Ray's ability to
+execute multiple-return tasks in order to speed up shuffling by as much as 4x on Ray.
+Simply set the `dataframe_optimize` configuration option to our optimizer function, similar to how you specify the Dask-on-Ray scheduler:
+
+.. code-block:: python
+
+ import ray
+ from ray.util.dask import ray_dask_get, dataframe_optimize
+ import dask.dataframe as dd
+ import numpy as np
+ import pandas as pd
+ import time
+
+ # Start Ray.
+ # Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
+ ray.init()
+
+ # Set the scheduler to ray_dask_get, and set the Dask DataFrame optimizer to our
+ # custom optimization function, this time using the config setter as a context manager.
+ with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
+ npartitions = 100
+ df = dd.from_pandas(pd.DataFrame(
+ np.random.randint(0, 100, size=(10000, 2)),
+ columns=["age", "grade"]), npartitions=npartitions)
+ # We set max_branch to infinity in order to ensure that the task-based shuffle
+ # happens in a single stage, which is required in order for our optimization to
+ # work.
+ df.set_index(
+ ["age"], shuffle="tasks", max_branch=float("inf")).head(10, npartitions=-1)
-=========
Callbacks
-=========
+---------
+
+.. _dask-on-ray-callbacks:
Dask's `custom callback abstraction `__
is extended with Ray-specific callbacks, allowing the user to hook into the
@@ -208,11 +264,12 @@ execution time exceeds some user-defined threshold:
with cache_callback:
z.compute(scheduler=ray_dask_get)
-Note that the existing Dask scheduler callbacks (``start``, ``start_state``,
-``pretask``, ``posttask``, ``finish``) are also available, which can be used to
-introspect the Dask task to Ray task conversion process, but that ``pretask``
-and ``posttask`` are executed before and after the Ray task is *submitted*, not
-executed, and that ``finish`` is executed after all Ray tasks have been
-*submitted*, not executed.
+.. note::
+ The existing Dask scheduler callbacks (``start``, ``start_state``,
+ ``pretask``, ``posttask``, ``finish``) are also available, which can be used to
+ introspect the Dask task to Ray task conversion process, but note that the ``pretask``
+ and ``posttask`` hooks are executed before and after the Ray task is *submitted*, not
+ executed, and that ``finish`` is executed after all Ray tasks have been
+ *submitted*, not executed.
This callback API is currently unstable and subject to change.
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 277c82e55..e90b52299 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -296,6 +296,16 @@ Papers
raysgd/raysgd_tune.rst
raysgd/raysgd_ref.rst
+.. toctree::
+ :hidden:
+ :maxdepth: -1
+ :caption: Data Processing
+
+ modin/index.rst
+ dask-on-ray.rst
+ mars-on-ray.rst
+ raydp.rst
+
.. toctree::
:hidden:
:maxdepth: -1
@@ -305,10 +315,6 @@ Papers
joblib.rst
iter.rst
xgboost-ray.rst
- modin/index.rst
- dask-on-ray.rst
- mars-on-ray.rst
- raydp.rst
ray-client.rst
.. toctree::
diff --git a/doc/source/memory-management.rst b/doc/source/memory-management.rst
index 8892800a6..f12f7efef 100644
--- a/doc/source/memory-management.rst
+++ b/doc/source/memory-management.rst
@@ -179,6 +179,7 @@ In the output of ``ray memory``, we see that the second object displays as a nor
Object Spilling
---------------
+.. _object-spilling:
Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to the local filesystem.
To configure the directory where objects are placed, use: