diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index 6cec0b448..c4d92868e 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -19,6 +19,7 @@ sphinx-click sphinx-copybutton sphinx-gallery sphinx-jsonschema +sphinx-tabs sphinx-version-warning sphinx_rtd_theme tabulate diff --git a/doc/source/conf.py b/doc/source/conf.py index 273515fd8..37812c604 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -64,6 +64,7 @@ extensions = [ 'sphinx.ext.viewcode', 'sphinx.ext.napoleon', 'sphinx_click.ext', + 'sphinx_tabs.tabs', 'sphinx-jsonschema', 'sphinx_gallery.gen_gallery', 'sphinx_copybutton', diff --git a/doc/source/serve/tutorials/batch.rst b/doc/source/serve/tutorials/batch.rst index 0ab76f99a..152d6d192 100644 --- a/doc/source/serve/tutorials/batch.rst +++ b/doc/source/serve/tutorials/batch.rst @@ -111,9 +111,9 @@ Let's deploy the new version to the same endpoint. Don't forget to set To query the backend via Python API, we can use ``serve.get_handle`` to receive a handle to the corresponding "endpoint". To enqueue a query, you can call ``handle.remote(argument_name=argument_value)``. This call returns immediately -with a :ref:`Ray ObjectRef`. You can call `ray.get` to retrieve +with a :ref:`Ray ObjectRef`. You can call `ray.get` to retrieve the result. .. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py :start-after: __doc_query_handle_begin__ - :end-before: __doc_query_handle_end__ \ No newline at end of file + :end-before: __doc_query_handle_end__ diff --git a/doc/source/walkthrough.rst b/doc/source/walkthrough.rst index 30e2fc105..fe9281a8c 100644 --- a/doc/source/walkthrough.rst +++ b/doc/source/walkthrough.rst @@ -4,12 +4,14 @@ Ray Core Walkthrough This walkthrough will overview the core concepts of Ray: 1. Starting Ray -2. Using remote functions (tasks) [``ray.remote``] -3. Fetching results (object refs) [``ray.put``, ``ray.get``, ``ray.wait``] -4. Using remote classes (actors) [``ray.remote``] +2. Using remote functions (tasks) +3. Fetching results (object refs) +4. Using remote classes (actors) With Ray, your code will work on a single machine and can be easily scaled to large cluster. +Java demo code in this documentation can be found `here `__. + Installation ------------ @@ -18,17 +20,32 @@ To run this walkthrough, install Ray with ``pip install -U ray``. For the latest Starting Ray ------------ -You can start Ray on a single machine by adding this to your python script. +You can start Ray on a single machine by adding this to your code. -.. code-block:: python +.. tabs:: + .. code-tab:: python - import ray + import ray - # Start Ray. If you're connecting to an existing cluster, you would use - # ray.init(address=) instead. - ray.init() + # Start Ray. If you're connecting to an existing cluster, you would use + # ray.init(address=) instead. + ray.init() - ... + ... + + .. code-tab:: java + + import io.ray.Ray; + + public class MyRayApp { + + public static void main(String[] args) { + // Start Ray runtime. If you're connecting to an existing cluster, you can set + // the `-Dray.redis.address=` java system property. + Ray.init() + ... + } + } Ray will then be able to utilize all cores of your machine. Find out how to configure the number of cores Ray will use at :ref:`configuring-ray`. @@ -39,97 +56,157 @@ To start a multi-node Ray cluster, see the :ref:`cluster setup page `__ page for specific documentation on how to use ``ray.remote``. - .. code:: python + .. group-tab:: Java - # These happen in parallel. - for _ in range(4): - remote_function.remote() + .. code:: java -The invocations are executed in parallel because the call to ``remote_function.remote()`` doesn't block. -All computation is performed in the background, driven by Ray's internal event loop. + public class MyRayApp { + // A regular Java static method. + public static int myFunction() { + return 1; + } + } -See the `ray.remote package reference `__ page for specific documentation on how to use ``ray.remote``. + // Invoke the above method as a Ray remote function. + // This will immediately return an object ref (a future) and then create + // a task that will be executed on a worker process. + ObjectRef res = Ray.task(MyRayApp::myFunction).remote(); -.. _ray-object-ids: + // The result can be retrieved with ``ObjectRef::get``. + Assert.assertTrue(res.get() == 1); -**Object refs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**. For example, take this function: + public static int slowFunction() throws InterruptedException { + TimeUnit.SECONDS.sleep(10); + return 1; + } -.. code:: python + // Invocations of Ray remote functions happen in parallel. + // All computation is performed in the background, driven by Ray's internal event loop. + for(int i = 0; i < 4; i++) { + // This doesn't block. + Ray.task(MyRayApp::slowFunction).remote(); + } + +.. _ray-object-refs: + +Passing object refs to remote functions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Object refs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular object**. For example, take this function: + +.. tabs:: + .. code-tab:: python @ray.remote - def remote_chain_function(value): + def function_with_an_argument(value): return value + 1 - y1_id = remote_function.remote() - assert ray.get(y1_id) == 1 + obj_ref1 = my_function.remote() + assert ray.get(obj_ref1) == 1 - chained_id = remote_chain_function.remote(y1_id) - assert ray.get(chained_id) == 2 + # You can pass an object ref as an argument to another Ray remote function. + obj_ref2 = function_with_an_argument.remote(obj_ref1) + assert ray.get(obj_ref2) == 2 + .. code-tab:: java + + public static int functionWithAnArgument(int value) { + return value + 1; + } + + ObjectRef objRef1 = Ray.task(MyRayApp::myFunction).remote(); + Assert.assertTrue(objRef1.get() == 1); + + // You can pass an object ref as an argument to another Ray remote function. + ObjectRef objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote(); + Assert.assertTrue(objRef2.get() == 2); Note the following behaviors: - The second task will not be executed until the first task has finished executing because the second task depends on the output of the first task. - If the two tasks are scheduled on different machines, the output of the - first task (the value corresponding to ``y1_id``) will be sent over the + first task (the value corresponding to ``obj_ref1/objRef1``) will be sent over the network to the machine where the second task is scheduled. -Oftentimes, you may want to specify a task's resource requirements (for example -one task may require a GPU). The ``ray.init()`` command will automatically -detect the available GPUs and CPUs on the machine. However, you can override -this default behavior by passing in specific resources, e.g., -``ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})``. -To specify a task's CPU and GPU requirements, pass the ``num_cpus`` and -``num_gpus`` arguments into the remote decorator. The task will only run on a -machine if there are enough CPU and GPU (and other custom) resources available -to execute the task. Ray can also handle arbitrary custom resources. +Specifying required resources +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Oftentimes, you may want to specify a task's resource requirements (for example +one task may require a GPU). Ray will automatically +detect the available GPUs and CPUs on the machine. However, you can override +this default behavior by passing in specific resources. + +.. tabs:: + .. group-tab:: Python + + ``ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})``` + + .. group-tab:: Java + + Set Java system property: ``-Dray.resources=CPU:8,GPU:4,Custom:2``. + +Ray also allows specifying a task's resources requirements (e.g., CPU, GPU, and custom resources). +The task will only run on a machine if there are enough resources +available to execute the task. + +.. tabs:: + .. code-tab:: python + + # Specify required resources. + @ray.remote(num_cpus=4, num_gpus=2) + def my_function(): + return 1 + + .. code-tab:: java + + // Specify required resources. + Ray.task(MyRayApp::myFunction).setResource("CPU", 1.0).setResource("GPU", 4.0).remote(); .. note:: - * If you do not specify any resources in the ``@ray.remote`` decorator, the - default is 1 CPU resource and no other resources. + * If you do not specify any resources, the default is 1 CPU resource and + no other resources. * If specifying CPUs, Ray does not enforce isolation (i.e., your task is expected to honor its request). * If specifying GPUs, Ray does provide isolation in forms of visible devices @@ -137,12 +214,6 @@ to execute the task. Ray can also handle arbitrary custom resources. task's responsibility to actually use the GPUs (e.g., through a deep learning framework like TensorFlow or PyTorch). -.. code-block:: python - - @ray.remote(num_cpus=4, num_gpus=2) - def f(): - return 1 - The resource requirements of a task have implications for the Ray's scheduling concurrency. In particular, the sum of the resource requirements of all of the concurrently executing tasks on a given node cannot exceed the node's total @@ -150,39 +221,64 @@ resources. Below are more examples of resource specifications: -.. code-block:: python +.. tabs:: + .. code-tab:: python - # Ray also supports fractional resource requirements - @ray.remote(num_gpus=0.5) - def h(): - return 1 + # Ray also supports fractional resource requirements. + @ray.remote(num_gpus=0.5) + def h(): + return 1 - # Ray support custom resources too. - @ray.remote(resources={'Custom': 1}) - def f(): - return 1 + # Ray support custom resources too. + @ray.remote(resources={'Custom': 1}) + def f(): + return 1 -Further, remote functions can return multiple object refs. + .. code-tab:: java -.. code-block:: python + // Ray aslo supports fractional and custom resources. + Ray.task(MyRayApp::myFunction).setResource("GPU", 0.5).setResource("Custom", 1.0).remote(); - @ray.remote(num_return_vals=3) - def return_multiple(): - return 1, 2, 3 +Multiple returns +~~~~~~~~~~~~~~~~ - a_id, b_id, c_id = return_multiple.remote() +.. tabs:: + .. group-tab:: Python -Remote functions can be canceled by calling ``ray.cancel`` (:ref:`docstring `) on the returned Object ref. Remote actor functions can be stopped by killing the actor using the ``ray.kill`` interface. + Python remote functions can return multiple object refs. -.. code-block:: python + .. code-block:: python - @ray.remote - def blocking_operation(): - time.sleep(10e6) - return 100 + @ray.remote(num_return_vals=3) + def return_multiple(): + return 1, 2, 3 - obj_ref = blocking_operation.remote() - ray.cancel(obj_ref) + a, b, c = return_multiple.remote() + + .. group-tab:: Java + + Java remote functions doesn't support returning multiple objects. + +Cancelling tasks +~~~~~~~~~~~~~~~~ + +.. tabs:: + .. group-tab:: Python + + Remote functions can be canceled by calling ``ray.cancel`` (:ref:`docstring `) on the returned Object ref. Remote actor functions can be stopped by killing the actor using the ``ray.kill`` interface. + + .. code-block:: python + + @ray.remote + def blocking_operation(): + time.sleep(10e6) + + obj_ref = blocking_operation.remote() + ray.cancel(obj_ref) + + .. group-tab:: Java + + Task cancellation hasn't been implemented in Java yet. Objects in Ray -------------- @@ -196,13 +292,21 @@ similar. Object refs can be created in multiple ways. 1. They are returned by remote function calls. - 2. They are returned by ``ray.put`` (:ref:`docstring `). + 2. They are returned by ``put`` (:ref:`docstring `). -.. code-block:: python +.. tabs:: + .. code-tab:: python + # Put an object in Ray's object store. y = 1 object_ref = ray.put(y) + .. code-tab:: java + + // Put an object in Ray's object store. + int y = 1; + ObjectRef objectRef = Ray.put(y); + .. note:: Remote objects are immutable. That is, their values cannot be changed after @@ -213,125 +317,215 @@ Object refs can be created in multiple ways. Fetching Results ---------------- -The command ``ray.get(x_id, timeout=None)`` (:ref:`docstring `) takes an object ref and creates a Python object -from the corresponding remote object. First, if the current node's object store -does not contain the object, the object is downloaded. Then, if the object is a `numpy array `__ -or a collection of numpy arrays, the ``get`` call is zero-copy and returns arrays backed by shared object store memory. -Otherwise, we deserialize the object data into a Python object. +You can use the ``get`` method (:ref:`docstring `) to fetch the result of a remote object from an object ref. +If the current node's object store does not contain the object, the object is downloaded. -.. code-block:: python +.. tabs:: + .. group-tab:: Python - y = 1 - obj_ref = ray.put(y) - assert ray.get(obj_ref) == 1 + If the object is a `numpy array `__ + or a collection of numpy arrays, the ``get`` call is zero-copy and returns arrays backed by shared object store memory. + Otherwise, we deserialize the object data into a Python object. -You can also set a timeout to return early from a ``get`` that's blocking for too long. + .. code-block:: python -.. code-block:: python + # Get the value of one object ref. + obj_ref = ray.put(1) + assert ray.get(obj_ref) == 1 - from ray.exceptions import RayTimeoutError + # Get the values of multiple object refs in parallel. + assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2] - @ray.remote - def long_running_function() - time.sleep(8) + # You can also set a timeout to return early from a ``get`` that's blocking for too long. + from ray.exceptions import RayTimeoutError - obj_ref = long_running_function.remote() - try: - ray.get(obj_ref, timeout=4) - except RayTimeoutError: - print("`get` timed out.") + @ray.remote + def long_running_function() + time.sleep(8) + obj_ref = long_running_function.remote() + try: + ray.get(obj_ref, timeout=4) + except RayTimeoutError: + print("`get` timed out.") + + .. group-tab:: Java + + .. code-block:: java + + // Get the value of one object ref. + ObjectRef objRef = Ray.put(1); + Assert.assertTrue(object.get() == 1); + + // Get the values of multiple object refs in parallel. + List> objRefs = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + objectRefs.add(Ray.put(i)); + } + List results = Ray.get(objectRefs); + Assert.assertEquals(results, ImmutableList.of(0, 1, 2)); After launching a number of tasks, you may want to know which ones have -finished executing. This can be done with ``ray.wait`` (:ref:`ray-wait-ref`). The function +finished executing. This can be done with ``wait`` (:ref:`ray-wait-ref`). The function works as follows. -.. code:: python +.. tabs:: + .. code-tab:: python - ready_ids, remaining_ids = ray.wait(object_refs, num_returns=1, timeout=None) + ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None) + + .. code-tab:: java + + WaitResult waitResult = Ray.wait(objectRefs, /*num_returns=*/0, /*timeoutMs=*/1000); + System.out.println(waitResult.getReady()); // List of ready objects. + System.out.println(waitResult.getUnready()); // list of unready objects. Object Eviction --------------- When the object store gets full, objects will be evicted to make room for new objects. This happens in approximate LRU (least recently used) order. To avoid objects from -being evicted, you can call ``ray.get`` and store their values instead. Numpy array +being evicted, you can call ``get`` and store their values instead. Numpy array objects cannot be evicted while they are mapped in any Python process. You can also configure `memory limits `__ to control object store usage by actors. .. note:: - Objects created with ``ray.put`` are pinned in memory while a Python reference + Objects created with ``put`` are pinned in memory while a Python/Java reference to the object ref returned by the put exists. This only applies to the specific - ID returned by put, not IDs in general or copies of that IDs. + ref returned by put, not refs in general or copies of that refs. Remote Classes (Actors) ----------------------- -Actors extend the Ray API from functions (tasks) to classes. The ``ray.remote`` -decorator indicates that instances of the ``Counter`` class will be actors. An -actor is essentially a stateful worker. Each actor runs in its own Python -process. +Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker. -.. code-block:: python +.. tabs:: - @ray.remote - class Counter(object): - def __init__(self): - self.value = 0 + .. group-tab:: Python - def increment(self): - self.value += 1 - return self.value + The ``ray.remote`` decorator indicates that instances of the ``Counter`` class will be actors. Each actor runs in its own Python process. -To create a couple actors, we can instantiate this class as follows: + .. code-block:: python -.. code-block:: python + @ray.remote + class Counter(object): + def __init__(self): + self.value = 0 - a1 = Counter.remote() - a2 = Counter.remote() + def increment(self): + self.value += 1 + return self.value -When an actor is instantiated, the following events happen. + # Create an actor from this class. + counter = Counter.remote() -1. A worker Python process is started on a node of the cluster. -2. A ``Counter`` object is instantiated on that worker. + .. group-tab:: Java -You can specify resource requirements in Actors too (see the `Actors section + ``Ray.actor`` is used to create actors from regular Java classes. Unlike Python, multiple Java actors may share one JVM process, in order to reduce JVM's memory overhead. But this is transparent to normal users. + + .. code-block:: java + + // A regular Java class. + public class Counter { + + private int value = 0; + + public int increment() { + this.value += 1; + return this.value; + } + } + + // Create an actor from this class. + // `Ray.actor` takes a factory method that can produce + // a `Counter` object. Here, we pass `Counter`'s constructor + // as the argument. + ActorHandle counter = Ray.actor(Counter::new).remote(); + +Specifying required resources +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You can specify resource requirements in actors too (see the `Actors section `__ for more details.) -.. code-block:: python +.. tabs:: + .. code-tab:: python - @ray.remote(num_cpus=2, num_gpus=0.5) - class Actor(object): - pass + # Specify required resources for an actor. + @ray.remote(num_cpus=2, num_gpus=0.5) + class Actor(object): + pass -We can interact with the actor by calling its methods with the ``.remote`` -operator. We can then call ``ray.get`` on the object ref to retrieve the actual + .. code-tab:: java + + // Specify required resources for an actor. + Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote(); + + +Calling the actor +~~~~~~~~~~~~~~~~~ + +We can interact with the actor by calling its methods with the ``remote`` +operator. We can then call ``get`` on the object ref to retrieve the actual value. -.. code-block:: python +.. tabs:: + .. code-tab:: python - obj_ref = a1.increment.remote() - ray.get(obj_ref) == 1 + # Call the actor. + obj_ref = counter.increment.remote() + ray.get(obj_ref) == 1 + .. code-tab:: java + + // Call the actor. + ObjectRef objectRef = counter.task(Counter::increment).remote(); + Assert.assertTrue(objectRef.get() == 1); Methods called on different actors can execute in parallel, and methods called on the same actor are executed serially in the order that they are called. Methods on the same actor will share state with one another, as shown below. -.. code-block:: python +.. tabs:: + .. code-tab:: python - # Create ten Counter actors. - counters = [Counter.remote() for _ in range(10)] + # Create ten Counter actors. + counters = [Counter.remote() for _ in range(10)] - # Increment each Counter once and get the results. These tasks all happen in - # parallel. - results = ray.get([c.increment.remote() for c in counters]) - print(results) # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + # Increment each Counter once and get the results. These tasks all happen in + # parallel. + results = ray.get([c.increment.remote() for c in counters]) + print(results) # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] - # Increment the first Counter five times. These tasks are executed serially - # and share state. - results = ray.get([counters[0].increment.remote() for _ in range(5)]) - print(results) # prints [2, 3, 4, 5, 6] + # Increment the first Counter five times. These tasks are executed serially + # and share state. + results = ray.get([counters[0].increment.remote() for _ in range(5)]) + print(results) # prints [2, 3, 4, 5, 6] + .. code-tab:: java + + // Create ten Counter actors. + List> counters = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + counters.add(Ray.actor(Counter::new).remote()); + } + + // Increment each Counter once and get the results. These tasks all happen in + // parallel. + List> objectRefs = new ArrayList<>(); + for (ActorHandle counterActor : counters) { + objectRefs.add(counterActor.task(Counter::increment).remote()); + } + // prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + System.out.println(Ray.get(objectRefs)); + + // Increment the first Counter five times. These tasks are executed serially + // and share state. + objectRefs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + objectRefs.add(counters.get(0).task(Counter::increment).remote()); + } + // prints [2, 3, 4, 5, 6] + System.out.println(Ray.get(objectRefs)); To learn more about Ray Actors, see the `Actors section `__. diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java index e5aed36b0..dcd5d4ac7 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java @@ -53,4 +53,8 @@ public final class ObjectRefImpl implements ObjectRef, Serializable { return type; } + @Override + public String toString() { + return "ObjectRef(" + id.toString() + ")"; + } } diff --git a/java/test/src/main/java/io/ray/docdemo/WalkthroughDemo.java b/java/test/src/main/java/io/ray/docdemo/WalkthroughDemo.java new file mode 100644 index 000000000..00d1c6512 --- /dev/null +++ b/java/test/src/main/java/io/ray/docdemo/WalkthroughDemo.java @@ -0,0 +1,148 @@ +package io.ray.docdemo; + +import com.google.common.collect.ImmutableList; +import io.ray.api.ActorHandle; +import io.ray.api.ObjectRef; +import io.ray.api.Ray; +import io.ray.api.WaitResult; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.testng.Assert; + +/** + * This class contains demo code of the Ray core walkthrough doc (https://docs.ray.io/en/latest/walkthrough.html). + * + * Please keep them in sync. + */ +public class WalkthroughDemo { + + public static class MyRayApp { + + // A regular Java static method. + public static int myFunction() { + return 1; + } + + public static int slowFunction() throws InterruptedException { + TimeUnit.SECONDS.sleep(10); + return 1; + } + + public static int functionWithAnArgument(int value) { + return value + 1; + } + } + + public static void demoTasks() { + // Invoke the above method as a Ray remote function. + // This will immediately return an object ref (a future) and then create + // a task that will be executed on a worker process. + ObjectRef res = Ray.task(MyRayApp::myFunction).remote(); + + // The result can be retrieved with ``ObjectRef::get``. + Assert.assertTrue(res.get() == 1); + + // Invocations of Ray remote functions happen in parallel. + // All computation is performed in the background, driven by Ray's internal event loop. + for (int i = 0; i < 4; i++) { + // This doesn't block. + Ray.task(MyRayApp::slowFunction).remote(); + } + + ObjectRef objRef1 = Ray.task(MyRayApp::myFunction).remote(); + Assert.assertTrue(objRef1.get() == 1); + + // You can pass an `ObjectRef` as an argument to another Ray remote function. + ObjectRef objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote(); + Assert.assertTrue(objRef2.get() == 2); + + // Specify required resources. + Ray.task(MyRayApp::myFunction).setResource("CPU", 2.0).setResource("GPU", 4.0).remote(); + + // Ray aslo supports fractional and custom resources. + Ray.task(MyRayApp::myFunction).setResource("GPU", 0.5).setResource("Custom", 1.0).remote(); + } + + public static void demoObjects() { + // Put an object in Ray's object store. + int y = 1; + ObjectRef objectRef = Ray.put(y); + // Get the value of one object ref. + Assert.assertTrue(objectRef.get() == 1); + + // Get the values of multiple object refs in parallel. + List> objectRefs = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + objectRefs.add(Ray.put(i)); + } + List results = Ray.get(objectRefs); + Assert.assertEquals(results, ImmutableList.of(0, 1, 2)); + + WaitResult waitResult = Ray.wait(objectRefs, /*num_returns=*/1, /*timeoutMs=*/1000); + System.out.println(waitResult.getReady()); // List of ready objects. + System.out.println(waitResult.getUnready()); // list of unready objects. + } + + // A regular Java class. + public static class Counter { + + private int value = 0; + + public int increment() { + this.value += 1; + return this.value; + } + } + + public static void demoActors() { + // Create an actor from this class. + // `Ray.actor` takes a factory method that can produce + // a `Counter` object. Here, we pass `Counter`'s constructor + // as the argument. + ActorHandle counter = Ray.actor(Counter::new).remote(); + + // Specify required resources for an actor. + Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote(); + + // Call the actor. + ObjectRef objectRef = counter.task(Counter::increment).remote(); + Assert.assertTrue(objectRef.get() == 1); + + // Create ten Counter actors. + List> counters = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + counters.add(Ray.actor(Counter::new).remote()); + } + + // Increment each Counter once and get the results. These tasks all happen in + // parallel. + List> objectRefs = new ArrayList<>(); + for (ActorHandle counterActor : counters) { + objectRefs.add(counterActor.task(Counter::increment).remote()); + } + // prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + System.out.println(Ray.get(objectRefs)); + + // Increment the first Counter five times. These tasks are executed serially + // and share state. + objectRefs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + objectRefs.add(counters.get(0).task(Counter::increment).remote()); + } + // prints [2, 3, 4, 5, 6] + System.out.println(Ray.get(objectRefs)); + } + + public static void main(String[] args) { + // Start Ray runtime. If you're connecting to an existing cluster, you can set + // the `-Dray.redis.address=` java system property. + Ray.init(); + + demoTasks(); + + demoObjects(); + + demoActors(); + } +}