diff --git a/ci/long_running_tests/workloads/many_actor_tasks.py b/ci/long_running_tests/workloads/many_actor_tasks.py index c37b22ccc..2dd395c0e 100644 --- a/ci/long_running_tests/workloads/many_actor_tasks.py +++ b/ci/long_running_tests/workloads/many_actor_tasks.py @@ -36,9 +36,7 @@ ray.init(redis_address=cluster.redis_address) # Run the workload. -# TODO (williamma12): Remove the num_cpus argument once -# https://github.com/ray-project/ray/issues/4312 gets resolved -@ray.remote(num_cpus=0.1) +@ray.remote class Actor(object): def __init__(self): self.value = 0 @@ -47,10 +45,8 @@ class Actor(object): self.value += 1 -# TODO (williamma12): Update the actors to each have only 0.1 of a cpu once -# https://github.com/ray-project/ray/issues/4312 gets resolved. actors = [ - Actor._remote([], {}, resources={str(i % num_nodes): 0.1}) + Actor._remote([], {}, num_cpus=0.1, resources={str(i % num_nodes): 0.1}) for i in range(num_nodes * 5) ] diff --git a/python/ray/actor.py b/python/ray/actor.py index f5c2a6658..9d2d25505 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -21,8 +21,6 @@ from ray.utils import _random_string from ray import (ObjectID, ActorID, ActorHandleID, ActorClassID, TaskID, DriverID) -DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1 - logger = logging.getLogger(__name__) @@ -166,7 +164,7 @@ class ActorClass(object): """ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus, - num_gpus, resources, actor_method_cpus): + num_gpus, resources): self._modified_class = modified_class self._class_id = class_id self._class_name = modified_class.__name__ @@ -174,7 +172,6 @@ class ActorClass(object): self._num_cpus = num_cpus self._num_gpus = num_gpus self._resources = resources - self._actor_method_cpus = actor_method_cpus self._exported = False self._actor_methods = inspect.getmembers( @@ -215,7 +212,7 @@ class ActorClass(object): method.__ray_num_return_vals__) else: self._actor_method_num_return_vals[method_name] = ( - DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) + ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) def __call__(self, *args, **kwargs): raise Exception("Actors methods cannot be instantiated directly. " @@ -276,6 +273,25 @@ class ActorClass(object): # updated to reflect the new invocation. actor_cursor = None + # Set the actor's default resources if not already set. First three + # conditions are to check that no resources were specified in the + # decorator. Last three conditions are to check that no resources were + # specified when _remote() was called. + if (self._num_cpus is None and self._num_gpus is None + and self._resources is None and num_cpus is None + and num_gpus is None and resources is None): + # In the default case, actors acquire no resources for + # their lifetime, and actor methods will require 1 CPU. + cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE + actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE + else: + # If any resources are specified (here or in decorator), then + # all resources are acquired for the actor's lifetime and no + # resources are associated with methods. + cpus_to_use = (ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED + if self._num_cpus is None else self._num_cpus) + actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED + # Do not export the actor class or the actor if run in LOCAL_MODE # Instead, instantiate the actor locally and add it to the worker's # dictionary @@ -290,15 +306,15 @@ class ActorClass(object): self._exported = True resources = ray.utils.resources_from_resource_arguments( - self._num_cpus, self._num_gpus, self._resources, num_cpus, + cpus_to_use, self._num_gpus, self._resources, num_cpus, num_gpus, resources) # If the actor methods require CPU resources, then set the required # placement resources. If actor_placement_resources is empty, then # the required placement resources will be the same as resources. actor_placement_resources = {} - assert self._actor_method_cpus in [0, 1] - if self._actor_method_cpus == 1: + assert actor_method_cpu in [0, 1] + if actor_method_cpu == 1: actor_placement_resources = resources.copy() actor_placement_resources["CPU"] += 1 @@ -322,8 +338,8 @@ class ActorClass(object): actor_handle = ActorHandle( actor_id, self._modified_class.__module__, self._class_name, actor_cursor, self._actor_method_names, self._method_signatures, - self._actor_method_num_return_vals, actor_cursor, - self._actor_method_cpus, worker.task_driver_id) + self._actor_method_num_return_vals, actor_cursor, actor_method_cpu, + worker.task_driver_id) # We increment the actor counter by 1 to account for the actor creation # task. actor_handle._ray_actor_counter += 1 @@ -664,8 +680,7 @@ class ActorHandle(object): return self._deserialization_helper(state, False) -def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, - max_reconstructions): +def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions): # Give an error if cls is an old-style class. if not issubclass(cls, object): raise TypeError( @@ -720,7 +735,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, class_id = ActorClassID(_random_string()) return ActorClass(Class, class_id, max_reconstructions, num_cpus, num_gpus, - resources, actor_method_cpus) + resources) ray.worker.global_worker.make_actor = make_actor diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a54a3679c..da9a26e98 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -25,6 +25,17 @@ DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10 # The smallest cap on the memory used by Redis that we allow. REDIS_MINIMUM_MEMORY_BYTES = 10**7 +# Default resource requirements for actors when no resource requirements are +# specified. +DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1 +DEFAULT_ACTOR_CREATION_CPU_SIMPLE = 0 +# Default resource requirements for actors when some resource requirements are +# specified in . +DEFAULT_ACTOR_METHOD_CPU_SPECIFIED = 0 +DEFAULT_ACTOR_CREATION_CPU_SPECIFIED = 1 +# Default number of return values for each actor method. +DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1 + # If a remote function or actor (or some other export) has serialized size # greater than this quantity, print an warning. PICKLE_OBJECT_WARNING_SIZE = 10**7 diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 13d14f3d1..b689629fe 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -502,6 +502,89 @@ def test_actor_class_methods(ray_start_regular): assert ray.get(a.g.remote(2)) == 4 +def test_resource_assignment(shutdown_only): + """Test to make sure that we assign resource to actors at instantiation.""" + # This test will create 16 actors. Declaring this many CPUs initially will + # speed up the test because the workers will be started ahead of time. + ray.init(num_cpus=16, num_gpus=1, resources={"Custom": 1}) + + class Actor(object): + def __init__(self): + self.resources = ray.get_resource_ids() + + def get_actor_resources(self): + return self.resources + + def get_actor_method_resources(self): + return ray.get_resource_ids() + + decorator_resource_args = [{}, { + "num_cpus": 0.1 + }, { + "num_gpus": 0.1 + }, { + "resources": { + "Custom": 0.1 + } + }] + instantiation_resource_args = [{}, { + "num_cpus": 0.2 + }, { + "num_gpus": 0.2 + }, { + "resources": { + "Custom": 0.2 + } + }] + for decorator_args in decorator_resource_args: + for instantiation_args in instantiation_resource_args: + if len(decorator_args) == 0: + actor_class = ray.remote(Actor) + else: + actor_class = ray.remote(**decorator_args)(Actor) + actor = actor_class._remote(**instantiation_args) + actor_resources = ray.get(actor.get_actor_resources.remote()) + actor_method_resources = ray.get( + actor.get_actor_method_resources.remote()) + if len(decorator_args) == 0 and len(instantiation_args) == 0: + assert len(actor_resources) == 0, ( + "Actor should not be assigned resources.") + assert list(actor_method_resources.keys()) == [ + "CPU" + ], ("Actor method should only have CPUs") + assert actor_method_resources["CPU"][0][1] == 1, ( + "Actor method should default to one cpu.") + else: + if ("num_cpus" not in decorator_args + and "num_cpus" not in instantiation_args): + assert actor_resources["CPU"][0][1] == 1, ( + "Actor should default to one cpu.") + correct_resources = {} + defined_resources = decorator_args.copy() + defined_resources.update(instantiation_args) + for resource, value in defined_resources.items(): + if resource == "num_cpus": + correct_resources["CPU"] = value + elif resource == "num_gpus": + correct_resources["GPU"] = value + elif resource == "resources": + for custom_resource, amount in value.items(): + correct_resources[custom_resource] = amount + for resource, amount in correct_resources.items(): + assert (actor_resources[resource][0][0] == + actor_method_resources[resource][0][0]), ( + "Should have assigned same {} for both actor ", + "and actor method.".format(resource)) + assert (actor_resources[resource][0][ + 1] == actor_method_resources[resource][0][1]), ( + "Should have assigned same amount of {} for both ", + "actor and actor method.".format(resource)) + assert actor_resources[resource][0][1] == amount, ( + "Actor should have {amount} {resource} but has ", + "{amount} {resource}".format( + amount=amount, resource=resource)) + + def test_multiple_actors(ray_start_regular): @ray.remote class Counter(object): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index fad8e1241..cf2a37125 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -807,7 +807,7 @@ def test_defining_remote_functions(shutdown_only): def test_submit_api(shutdown_only): - ray.init(num_cpus=1, num_gpus=1, resources={"Custom": 1}) + ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote def f(n): diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index b096d2a78..71016c62a 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -139,8 +139,11 @@ def test_actor_broadcast(ray_start_cluster_with_resource): pass actors = [ - Actor._remote(args=[], kwargs={}, resources={str(i % num_nodes): 1}) - for i in range(100) + Actor._remote( + args=[], + kwargs={}, + num_cpus=0.01, + resources={str(i % num_nodes): 1}) for i in range(100) ] # Wait for the actors to start up. diff --git a/python/ray/worker.py b/python/ray/worker.py index aa65def06..809e2e434 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -76,15 +76,6 @@ PYTHON_MODE = 3 ERROR_KEY_PREFIX = b"Error:" -# Default resource requirements for actors when no resource requirements are -# specified. -DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1 -DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0 -# Default resource requirements for actors when some resource requirements are -# specified. -DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0 -DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1 - # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at # entry/init points. @@ -2480,23 +2471,8 @@ def make_decorator(num_return_vals=None, raise Exception("The keyword 'max_calls' is not allowed for " "actors.") - # Set the actor default resources. - if num_cpus is None and num_gpus is None and resources is None: - # In the default case, actors acquire no resources for - # their lifetime, and actor methods will require 1 CPU. - cpus_to_use = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE - actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE - else: - # If any resources are specified, then all resources are - # acquired for the actor's lifetime and no resources are - # associated with methods. - cpus_to_use = (DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE - if num_cpus is None else num_cpus) - actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE - - return worker.make_actor(function_or_class, cpus_to_use, num_gpus, - resources, actor_method_cpus, - max_reconstructions) + return worker.make_actor(function_or_class, num_cpus, num_gpus, + resources, max_reconstructions) raise Exception("The @ray.remote decorator must be applied to " "either a function or to a class.")