diff --git a/doc/source/actors.rst b/doc/source/actors.rst index 210cd93d6..2858febeb 100644 --- a/doc/source/actors.rst +++ b/doc/source/actors.rst @@ -7,35 +7,86 @@ An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. +Java demo code in this documentation can be found `here `__. + Creating an actor ----------------- -You can convert a standard Python class into a Ray actor class as follows: +.. tabs:: + .. group-tab:: Python -.. code-block:: python + You can convert a standard Python class into a Ray actor class as follows: - @ray.remote - class Counter(object): - def __init__(self): - self.value = 0 + .. code-block:: python - def increment(self): - self.value += 1 - return self.value + @ray.remote + class Counter(object): + def __init__(self): + self.value = 0 -Note that the above is equivalent to the following: + def increment(self): + self.value += 1 + return self.value -.. code-block:: python + def get_counter(self): + return self.value - class Counter(object): - def __init__(self): - self.value = 0 + counter_actor = Counter.remote() - def increment(self): - self.value += 1 - return self.value + Note that the above is equivalent to the following: - Counter = ray.remote(Counter) + .. code-block:: python + + class Counter(object): + def __init__(self): + self.value = 0 + + def increment(self): + self.value += 1 + return self.value + + def get_counter(self): + return self.value + + Counter = ray.remote(Counter) + + counter_actor = Counter.remote() + + .. group-tab:: Java + + You can convert a standard Java class into a Ray actor class as follows: + + .. code-block:: java + + // A regular Java class. + public class Counter { + + private int value = 0; + + public int increment() { + this.value += 1; + return this.value; + } + + public int getCounter() { + return this.value; + } + + public void reset(int newValue) { + this.value = newValue; + } + } + + public class CounterFactory { + public static Counter createCounter() { + return new Counter(); + } + } + + // Create an actor with a constructor. + Ray.actor(Counter::new).remote(); + // Create an actor with a factory method. + Ray.actor(CounterFactory::createCounter).remote(); When the above actor is instantiated, the following events happen. @@ -47,13 +98,19 @@ When the above actor is instantiated, the following events happen. Actor Methods ------------- -Any method of the actor can return multiple object refs with the ``ray.method`` decorator: +Methods of the actor can be called remotely. -.. code-block:: python +.. tabs:: + .. code-tab:: python + + counter_actor = Counter.remote() + + assert counter_actor.increment.remote() == 1 @ray.remote class Foo(object): + # Any method of the actor can return multiple object refs. @ray.method(num_returns=2) def bar(self): return 1, 2 @@ -64,6 +121,15 @@ Any method of the actor can return multiple object refs with the ``ray.method`` assert ray.get(obj_ref1) == 1 assert ray.get(obj_ref2) == 2 + .. code-tab:: java + + ActorHandle counterActor = Ray.actor(Counter::new).remote(); + // Call an actor method with a return value + Assert.assertEquals((int) counterActor.task(Counter::increment).remote().get(), 1); + // Call an actor method without return value. In this case, the return type of `remote()` is void. + counterActor.task(Counter::reset, 10).remote(); + Assert.assertEquals((int) counterActor.task(Counter::increment).remote().get(), 11); + .. _actor-resource-guide: Resources with Actors @@ -71,15 +137,29 @@ Resources with Actors You can specify that an actor requires CPUs or GPUs in the decorator. While Ray has built-in support for CPUs and GPUs, Ray can also handle custom resources. -When using GPUs, Ray will automatically set the environment variable ``CUDA_VISIBLE_DEVICES`` for the actor after instantiated. The actor will have access to a list of the IDs of the GPUs -that it is allowed to use via ``ray.get_gpu_ids()``. This is a list of strings, -like ``[]``, or ``['1']``, or ``['2', '5', '6']``. Under some circumstances, the IDs of GPUs could be given as UUID strings instead of indices (see the `CUDA programming guide `__). +.. tabs:: + .. group-tab:: Python -.. code-block:: python + When using GPUs, Ray will automatically set the environment variable ``CUDA_VISIBLE_DEVICES`` for the actor after instantiated. The actor will have access to a list of the IDs of the GPUs + that it is allowed to use via ``ray.get_gpu_ids()``. This is a list of strings, + like ``[]``, or ``['1']``, or ``['2', '5', '6']``. Under some circumstances, the IDs of GPUs could be given as UUID strings instead of indices (see the `CUDA programming guide `__). - @ray.remote(num_cpus=2, num_gpus=1) - class GPUActor(object): - pass + .. code-block:: python + + @ray.remote(num_cpus=2, num_gpus=1) + class GPUActor(object): + pass + + .. group-tab:: Java + + .. In Java, we always specify resources when creating actors. There's no annotation available to act like the Python decorator ``@ray.remote(...)``. + + .. code-block:: java + + public class GpuActor { + } + + Ray.actor(GpuActor::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote(); When an ``GPUActor`` instance is created, it will be placed on a node that has at least 1 GPU, and the GPU will be reserved for the actor for the duration of @@ -103,30 +183,48 @@ have these resources (see `configuration instructions resource. -.. code-block:: python +.. tabs:: + .. code-tab:: python - @ray.remote(resources={'Resource2': 1}) - class GPUActor(object): - pass + @ray.remote(resources={'Resource2': 1}) + class GPUActor(object): + pass + + .. code-tab:: java + + public class GpuActor { + } + + Ray.actor(GpuActor::new).setResource("Resource2", 1.0).remote(); If you need to instantiate many copies of the same actor with varying resource requirements, you can do so as follows. -.. code-block:: python +.. tabs:: + .. code-tab:: python - @ray.remote(num_cpus=4) - class Counter(object): - def __init__(self): - self.value = 0 + @ray.remote(num_cpus=4) + class Counter(object): + ... - def increment(self): - self.value += 1 - return self.value + a1 = Counter.options(num_cpus=1, resources={"Custom1": 1}).remote() + a2 = Counter.options(num_cpus=2, resources={"Custom2": 1}).remote() + a3 = Counter.options(num_cpus=3, resources={"Custom3": 1}).remote() + + .. code-tab:: java + + public class Counter { + ... + } + + ActorHandle a1 = Ray.actor(Counter::new).setResource("CPU", 1.0) + .setResource("Custom1", 1.0).remote(); + ActorHandle a2 = Ray.actor(Counter::new).setResource("CPU", 2.0) + .setResource("Custom2", 1.0).remote(); + ActorHandle a3 = Ray.actor(Counter::new).setResource("CPU", 3.0) + .setResource("Custom3", 1.0).remote(); - a1 = Counter.options(num_cpus=1, resources={"Custom1": 1}).remote() - a2 = Counter.options(num_cpus=2, resources={"Custom2": 1}).remote() - a3 = Counter.options(num_cpus=3, resources={"Custom3": 1}).remote() Note that to create these actors successfully, Ray will need to be started with sufficient CPU resources and the relevant custom resources. @@ -135,117 +233,239 @@ sufficient CPU resources and the relevant custom resources. Terminating Actors ------------------ -Actor processes will be terminated automatically when the initial actor handle -goes out of scope in Python. If we create an actor with ``actor_handle = -Counter.remote()``, then when ``actor_handle`` goes out of scope and is -destructed, the actor process will be terminated. Note that this only applies to -the original actor handle created for the actor and not to subsequent actor -handles created by passing the actor handle to other tasks. +Automatic termination +^^^^^^^^^^^^^^^^^^^^^ + +.. tabs:: + .. group-tab:: Python + + Actor processes will be terminated automatically when the initial actor handle + goes out of scope in Python. If we create an actor with ``actor_handle = + Counter.remote()``, then when ``actor_handle`` goes out of scope and is + destructed, the actor process will be terminated. Note that this only applies to + the original actor handle created for the actor and not to subsequent actor + handles created by passing the actor handle to other tasks. + + .. group-tab:: Java + + Terminating an actor automatically when the initial actor handle goes out of scope hasn't been implemented in Java yet. + +Manual termination within the actor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If necessary, you can manually terminate an actor from within one of the actor methods. +This will kill the actor process and release resources associated/assigned to the actor. + +.. tabs:: + .. group-tab:: Python + .. code-block:: python + + ray.actor.exit_actor() + + This approach should generally not be necessary as actors are automatically garbage + collected. The ``ObjectRef`` resulting from the task can be waited on to wait + for the actor to exit (calling ``ray.get()`` on it will raise a ``RayActorError``). + + .. group-tab:: Java + .. code-block:: java + + Ray.exitActor(); + + Garbage collection for actors haven't been implemented yet, so this is currently the + only way to terminate an actor gracefully. The ``ObjectRef`` resulting from the task + can be waited on to wait for the actor to exit (calling ``ObjectRef::get`` on it will + throw a ``RayActorException``). -If necessary, you can manually terminate an actor by calling -``ray.actor.exit_actor()`` from within one of the actor methods. This will kill -the actor process and release resources associated/assigned to the actor. This -approach should generally not be necessary as actors are automatically garbage -collected. The ``ObjectRef`` resulting from the task can be waited on to wait -for the actor to exit (calling ``ray.get()`` on it will raise a ``RayActorError``). Note that this method of termination will wait until any previously submitted -tasks finish executing and then exit the process gracefully with sys.exit. If you -want to terminate an actor forcefully, you can call ``ray.kill(actor_handle)``. +tasks finish executing and then exit the process gracefully with sys.exit. + +Manual termination via an actor handle +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can terminate an actor forcefully. + +.. tabs:: + .. code-tab:: python + + ray.kill(actor_handle) + + .. code-tab:: java + + actorHandle.kill(); + This will call the exit syscall from within the actor, causing it to exit -immediately and any pending tasks to fail. This will not go through the normal -Python sys.exit teardown logic, so any exit handlers installed in the actor using -``atexit`` will not be called. +immediately and any pending tasks to fail. + +.. tabs:: + + .. group-tab:: Python + + This will not go through the normal + Python sys.exit teardown logic, so any exit handlers installed in the actor using + ``atexit`` will not be called. + + .. group-tab:: Java + + This will not go through the normal Java System.exit teardown logic, so any + shutdown hooks installed in the actor using ``Runtime.addShutdownHook(...)`` will + not be called. Passing Around Actor Handles ---------------------------- -Actor handles can be passed into other tasks. To illustrate this with a -simple example, consider a simple actor definition. +Actor handles can be passed into other tasks. We can define remote functions (or actor methods) that use actor handles. -.. code-block:: python +.. tabs:: + .. code-tab:: python - @ray.remote - class Counter(object): - def __init__(self): - self.counter = 0 + import time - def inc(self): - self.counter += 1 + @ray.remote + def f(counter): + for _ in range(1000): + time.sleep(0.1) + counter.increment.remote() - def get_counter(self): - return self.counter + .. code-tab:: java -We can define remote functions (or actor methods) that use actor handles. + public static class MyRayApp { -.. code-block:: python - - import time - - @ray.remote - def f(counter): - for _ in range(1000): - time.sleep(0.1) - counter.inc.remote() + public static void foo(ActorHandle counter) throws InterruptedException { + for (int i = 0; i < 1000; i++) { + TimeUnit.MILLISECONDS.sleep(100); + counter.task(Counter::increment).remote(); + } + } + } If we instantiate an actor, we can pass the handle around to various tasks. -.. code-block:: python +.. tabs:: + .. code-tab:: python - counter = Counter.remote() + counter = Counter.remote() - # Start some tasks that use the actor. - [f.remote(counter) for _ in range(3)] + # Start some tasks that use the actor. + [f.remote(counter) for _ in range(3)] - # Print the counter value. - for _ in range(10): - time.sleep(1) - print(ray.get(counter.get_counter.remote())) + # Print the counter value. + for _ in range(10): + time.sleep(1) + print(ray.get(counter.get_counter.remote())) + + .. code-tab:: java + + ActorHandle counter = Ray.actor(Counter::new).remote(); + + // Start some tasks that use the actor. + for (int i = 0; i < 3; i++) { + Ray.task(MyRayApp::foo, counter).remote(); + } + + // Print the counter value. + for (int i = 0; i < 10; i++) { + TimeUnit.SECONDS.sleep(1); + System.out.println(counter.task(Counter::getCounter).remote().get()); + } Named Actors ------------ -An actor can be given a globally unique name via ``.options(name="some_name")``, -which allows you to retrieve the actor from any job in the Ray cluster via -``ray.get_actor("some_name")``. This can be useful if you cannot directly +An actor can be given a globally unique name. +This allows you to retrieve the actor from any job in the Ray cluster. +This can be useful if you cannot directly pass the actor handle to the task that needs it, or if you are trying to access an actor launched by another driver. +.. tabs:: + + .. code-tab:: python + + # Create an actor with a name + counter = Counter.options(name="some_name").remote() + + ... + + # Retrieve the actor later somewhere + counter = ray.get_actor("some_name") + + .. group-tab:: Java + + .. code-block:: java + + // Create an actor with a globally unique name + ActorHandle counter = Ray.actor(Counter::new).setGlobalName("some_name").remote(); + + ... + + // Retrieve the actor later somewhere + Optional> counter = Ray.getGlobalActor("some_name"); + Assert.assertTrue(counter.isPresent()); + + We also support non-global named actors in Java, which means that the actor name is only valid within the job and the actor cannot be accessed from another job. + + .. code-block:: java + + // Create an actor with a job-scope-unique name + ActorHandle counter = Ray.actor(Counter::new).setName("some_name_in_job").remote(); + + ... + + // Retrieve the actor later somewhere in the same job + Optional> counter = Ray.getActor("some_name_in_job"); + Assert.assertTrue(counter.isPresent()); + + Actor Lifetimes --------------- -Separately, actor lifetimes can be decoupled from the job, allowing an actor to -persist even after the driver process of the job exits. +.. tabs:: + .. group-tab:: Python -.. code-block:: python + Separately, actor lifetimes can be decoupled from the job, allowing an actor to + persist even after the driver process of the job exits. - counter = Counter.options(name="CounterActor", lifetime="detached").remote() + .. code-block:: python -The CounterActor will be kept alive even after the driver running above script -exits. Therefore it is possible to run the following script in a different -driver: + counter = Counter.options(name="CounterActor", lifetime="detached").remote() -.. code-block:: python + The CounterActor will be kept alive even after the driver running above script + exits. Therefore it is possible to run the following script in a different + driver: - counter = ray.get_actor("CounterActor") - print(ray.get(counter.get_counter.remote())) + .. code-block:: python -Note that the lifetime option is decoupled from the name. If we only specified -the name without specifying ``lifetime="detached"``, then the CounterActor can -only be retrieved as long as the original driver is still running. + counter = ray.get_actor("CounterActor") + print(ray.get(counter.get_counter.remote())) + + Note that the lifetime option is decoupled from the name. If we only specified + the name without specifying ``lifetime="detached"``, then the CounterActor can + only be retrieved as long as the original driver is still running. + + .. group-tab:: Java + + Customizing lifetime of an actor hasn't been implemented in Java yet. Actor Pool ---------- -The ``ray.util`` module contains a utility class, ``ActorPool``. -This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors. +.. tabs:: + .. group-tab:: Python -.. code-block:: python + The ``ray.util`` module contains a utility class, ``ActorPool``. + This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors. - from ray.util import ActorPool + .. code-block:: python - a1, a2 = Actor.remote(), Actor.remote() - pool = ActorPool([a1, a2]) - print(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])) - # [2, 4, 6, 8] + from ray.util import ActorPool -See the `package reference `_ for more information. + a1, a2 = Actor.remote(), Actor.remote() + pool = ActorPool([a1, a2]) + print(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])) + # [2, 4, 6, 8] + + See the `package reference `_ for more information. + + .. group-tab:: Java + + Actor pool hasn't been implemented in Java yet. diff --git a/doc/source/walkthrough.rst b/doc/source/walkthrough.rst index 2250e6a57..423f59e9f 100644 --- a/doc/source/walkthrough.rst +++ b/doc/source/walkthrough.rst @@ -434,7 +434,7 @@ Actors extend the Ray API from functions (tasks) to classes. An actor is essenti .. group-tab:: Java - ``Ray.actor`` is used to create actors from regular Java classes. Unlike Python, multiple Java actors may share one JVM process, in order to reduce JVM's memory overhead. But this is transparent to normal users. + ``Ray.actor`` is used to create actors from regular Java classes. .. code-block:: java diff --git a/java/test/src/main/java/io/ray/docdemo/UsingActorsDemo.java b/java/test/src/main/java/io/ray/docdemo/UsingActorsDemo.java new file mode 100644 index 000000000..80bb842a8 --- /dev/null +++ b/java/test/src/main/java/io/ray/docdemo/UsingActorsDemo.java @@ -0,0 +1,133 @@ +package io.ray.docdemo; + +import io.ray.api.ActorHandle; +import io.ray.api.Ray; +import io.ray.docdemo.WalkthroughDemo.Counter; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.testng.Assert; + +/** + * This class contains demo code of the Ray core Using Actors doc (https://docs.ray.io/en/latest/actors.html). + * + * Please keep them in sync. + */ +public class UsingActorsDemo { + + // A regular Java class. + public static class Counter { + + private int value = 0; + + public int increment() { + this.value += 1; + return this.value; + } + + public int getCounter() { + return this.value; + } + + public void reset(int newValue) { + this.value = newValue; + } + } + + public static class CounterFactory { + + public static Counter createCounter() { + return new Counter(); + } + } + + public static class GpuActor { + + } + + public static class MyRayApp { + + public static void foo(ActorHandle counter) throws InterruptedException { + for (int i = 0; i < 1000; i++) { + TimeUnit.MILLISECONDS.sleep(100); + counter.task(Counter::increment).remote(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + Ray.init(); + + { + // Create an actor with a constructor. + Ray.actor(Counter::new).remote(); + // Create an actor with a factory method. + Ray.actor(CounterFactory::createCounter).remote(); + } + + { + ActorHandle a = Ray.actor(Counter::new).remote(); + // Call an actor method with a return value + Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 1); + // Call an actor method without return value + a.task(Counter::reset, 10).remote(); + Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 11); + } + + { + Ray.actor(GpuActor::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote(); + } + + { + Ray.actor(GpuActor::new).setResource("Resource2", 1.0).remote(); + } + + { + ActorHandle a1 = Ray.actor(Counter::new).setResource("CPU", 1.0) + .setResource("Custom1", 1.0).remote(); + ActorHandle a2 = Ray.actor(Counter::new).setResource("CPU", 2.0) + .setResource("Custom2", 1.0).remote(); + ActorHandle a3 = Ray.actor(Counter::new).setResource("CPU", 3.0) + .setResource("Custom3", 1.0).remote(); + } + + { + ActorHandle actorHandle = Ray.actor(Counter::new).remote(); + actorHandle.kill(); + } + + { + // Create an actor with a globally unique name + ActorHandle counter = Ray.actor(Counter::new).setGlobalName("some_name").remote(); + } + { + // Retrieve the actor later somewhere + Optional> counter = Ray.getGlobalActor("some_name"); + Assert.assertTrue(counter.isPresent()); + } + + { + // Create an actor with a job-scope-unique name + ActorHandle counter = Ray.actor(Counter::new).setName("some_name_in_job").remote(); + } + { + // Retrieve the actor later somewhere in the same job + Optional> counter = Ray.getActor("some_name_in_job"); + Assert.assertTrue(counter.isPresent()); + } + + { + ActorHandle counter = Ray.actor(Counter::new).remote(); + + // Start some tasks that use the actor. + for (int i = 0; i < 3; i++) { + Ray.task(MyRayApp::foo, counter).remote(); + } + + // Print the counter value. + for (int i = 0; i < 10; i++) { + TimeUnit.SECONDS.sleep(1); + System.out.println(counter.task(Counter::getCounter).remote().get()); + } + } + } +}