[dask-on-ray] Add better Dask-on-Ray example, and detail custom shuffle optimization. (#13950)

* Add better Dask-on-Ray example, and detail custom shuffle optimization.

* Misc. updates and feedback.

* Update doc/source/dask-on-ray.rst

Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>

* Set max_branch to infinity in shuffle optimization example.

* Feedback

* Apply suggestions from code review

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* 80 col width

Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Clark Zinzow
2021-02-10 15:24:09 -07:00
committed by GitHub
parent 05ab75fbe1
commit c5574a33e4
3 changed files with 116 additions and 52 deletions
+105 -48
View File
@@ -1,22 +1,32 @@
***********
Dask on Ray
***********
===========
Ray offers a scheduler integration for Dask, allowing you to build data
analyses using the familiar Dask collections (dataframes, arrays) and execute
the underlying computations on a Ray cluster. Using this Dask scheduler, the
entire Dask ecosystem can be executed on top of Ray.
.. _dask-on-ray:
.. note::
`Dask <https://dask.org/>`__ is a Python parallel computing library geared towards scaling analytics and
scientific computing workloads. It provides `big data collections
<https://docs.dask.org/en/latest/user-interfaces.html>`__ that mimic the APIs of
the familiar `NumPy <https://numpy.org/>`__ and `Pandas <https://pandas.pydata.org/>`__ libraries,
allowing those abstractions to represent
larger-than-memory data and/or allowing operations on that data to be run on a multi-machine cluster,
while also providing automatic data parallelism, smart scheduling,
and optimized operations. Operations on these collections create a task graph, which is
executed by a scheduler.
Note that Ray does not currently support object spilling, and hence cannot
process datasets larger than cluster memory. This is a planned feature.
Ray provides a scheduler for Dask (`dask_on_ray`) which allows you to build data
analyses using Dask's collections and execute
the underlying tasks on a Ray cluster.
`dask_on_ray` uses Dask's scheduler API, which allows you to
specify any callable as the scheduler that you would like Dask to use to execute your
workload. Using the Dask-on-Ray scheduler, the entire Dask ecosystem can be executed on top of Ray.
=========
Scheduler
=========
---------
The Dask-Ray scheduler can execute any valid Dask graph, and can be used with
.. _dask-on-ray-scheduler:
The Dask-on-Ray scheduler can execute any valid Dask graph, and can be used with
any Dask `.compute() <https://docs.dask.org/en/latest/api.html#dask.compute>`__
call.
Here's an example:
@@ -25,53 +35,99 @@ Here's an example:
import ray
from ray.util.dask import ray_dask_get
import dask.delayed
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import time
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))
@dask.delayed
def inc(x):
time.sleep(1)
return x + 1
@dask.delayed
def add(x, y):
time.sleep(3)
return x + y
x = inc(1)
y = inc(2)
z = add(x, y)
# The Dask scheduler submits the underlying task graph to Ray.
z.compute(scheduler=ray_dask_get)
d_arr.mean().compute(scheduler=ray_dask_get)
# Set the scheduler to ray_dask_get in your config so you don't have to specify it on
# each compute call.
dask.config.set(scheduler=ray_dask_get)
df = dd.from_pandas(pd.DataFrame(
np.random.randint(0, 100, size=(1024, 2)),
columns=["age", "grade"]))
df.groupby(["age"]).mean().compute()
.. note::
For execution on a Ray cluster, you should *not* use the
`Dask.distributed <https://distributed.dask.org/en/latest/quickstart.html>`__
client; simply use plain Dask and its collections, and pass ``ray_dask_get``
to ``.compute()`` calls or set the scheduler in one of the other ways detailed `here <https://docs.dask.org/en/latest/scheduling.html#configuration>`__. Follow the instructions for
:ref:`using Ray on a cluster <using-ray-on-a-cluster>` to modify the
``ray.init()`` call.
Why use Dask on Ray?
1. If you'd like to create data analyses using the familiar NumPy and Pandas
APIs provided by Dask and execute them on a production-ready distributed
task execution system like Ray.
2. If you'd like to use Dask and Ray libraries in the same application
without having two different task execution backends.
3. To take advantage of Ray-specific features such as the
1. To take advantage of Ray-specific features such as the
:ref:`cluster launcher <ref-automatic-cluster>` and
:ref:`shared-memory store <memory>`.
2. If you'd like to use Dask and Ray libraries in the same application without having two different clusters.
3. If you'd like to create data analyses using the familiar NumPy and Pandas APIs provided by Dask and execute them on a fast, fault-tolerant distributed task execution system geared towards production, like Ray.
Note that for execution on a Ray cluster, you should *not* use the
`Dask.distributed <https://distributed.dask.org/en/latest/quickstart.html>`__
client; simply use plain Dask and its collections, and pass ``ray_dask_get``
to ``.compute()`` calls. Follow the instructions for
:ref:`using Ray on a cluster <using-ray-on-a-cluster>` to modify the
``ray.init()`` call.
Dask-on-Ray is an ongoing project and is not expected to achieve the same performance as using Ray directly. All `Dask abstractions <https://docs.dask.org/en/latest/user-interfaces.html>`__ should run seamlessly on top of Ray using this scheduler, so if you find that one of these abstractions doesn't run on Ray, please `open an issue <https://github.com/ray-project/ray/issues/new/choose>`__.
Dask-on-Ray is an ongoing project and is not expected to achieve the same performance as using Ray directly.
Out-of-Core Data Processing
---------------------------
.. _dask-on-ray-out-of-core:
Processing datasets larger than cluster memory is supported via Ray's :ref:`object spilling <object-spilling>`: if
the in-memory object store is full, objects will be spilled to external storage (local disk by
default). This feature is available but off by default in Ray 1.2, and is on by default
in Ray 1.3+. Please see your Ray version's object spilling documentation for steps to enable and/or configure
object spilling.
Custom optimization for Dask DataFrame shuffling
------------------------------------------------
.. _dask-on-ray-shuffle-optimization:
Dask on Ray provides a Dask DataFrame optimizer that leverages Ray's ability to
execute multiple-return tasks in order to speed up shuffling by as much as 4x on Ray.
Simply set the `dataframe_optimize` configuration option to our optimizer function, similar to how you specify the Dask-on-Ray scheduler:
.. code-block:: python
import ray
from ray.util.dask import ray_dask_get, dataframe_optimize
import dask.dataframe as dd
import numpy as np
import pandas as pd
import time
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Set the scheduler to ray_dask_get, and set the Dask DataFrame optimizer to our
# custom optimization function, this time using the config setter as a context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
npartitions = 100
df = dd.from_pandas(pd.DataFrame(
np.random.randint(0, 100, size=(10000, 2)),
columns=["age", "grade"]), npartitions=npartitions)
# We set max_branch to infinity in order to ensure that the task-based shuffle
# happens in a single stage, which is required in order for our optimization to
# work.
df.set_index(
["age"], shuffle="tasks", max_branch=float("inf")).head(10, npartitions=-1)
=========
Callbacks
=========
---------
.. _dask-on-ray-callbacks:
Dask's `custom callback abstraction <https://docs.dask.org/en/latest/diagnostics-local.html#custom-callbacks>`__
is extended with Ray-specific callbacks, allowing the user to hook into the
@@ -208,11 +264,12 @@ execution time exceeds some user-defined threshold:
with cache_callback:
z.compute(scheduler=ray_dask_get)
Note that the existing Dask scheduler callbacks (``start``, ``start_state``,
``pretask``, ``posttask``, ``finish``) are also available, which can be used to
introspect the Dask task to Ray task conversion process, but that ``pretask``
and ``posttask`` are executed before and after the Ray task is *submitted*, not
executed, and that ``finish`` is executed after all Ray tasks have been
*submitted*, not executed.
.. note::
The existing Dask scheduler callbacks (``start``, ``start_state``,
``pretask``, ``posttask``, ``finish``) are also available, which can be used to
introspect the Dask task to Ray task conversion process, but note that the ``pretask``
and ``posttask`` hooks are executed before and after the Ray task is *submitted*, not
executed, and that ``finish`` is executed after all Ray tasks have been
*submitted*, not executed.
This callback API is currently unstable and subject to change.
+10 -4
View File
@@ -296,6 +296,16 @@ Papers
raysgd/raysgd_tune.rst
raysgd/raysgd_ref.rst
.. toctree::
:hidden:
:maxdepth: -1
:caption: Data Processing
modin/index.rst
dask-on-ray.rst
mars-on-ray.rst
raydp.rst
.. toctree::
:hidden:
:maxdepth: -1
@@ -305,10 +315,6 @@ Papers
joblib.rst
iter.rst
xgboost-ray.rst
modin/index.rst
dask-on-ray.rst
mars-on-ray.rst
raydp.rst
ray-client.rst
.. toctree::
+1
View File
@@ -179,6 +179,7 @@ In the output of ``ray memory``, we see that the second object displays as a nor
Object Spilling
---------------
.. _object-spilling:
Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to the local filesystem.
To configure the directory where objects are placed, use: