mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 22:37:36 +08:00
Changes where actor resources are assigned (#4323)
This commit is contained in:
committed by
Robert Nishihara
parent
01699ce4ea
commit
11580fb7dc
+28
-13
@@ -21,8 +21,6 @@ from ray.utils import _random_string
|
||||
from ray import (ObjectID, ActorID, ActorHandleID, ActorClassID, TaskID,
|
||||
DriverID)
|
||||
|
||||
DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -166,7 +164,7 @@ class ActorClass(object):
|
||||
"""
|
||||
|
||||
def __init__(self, modified_class, class_id, max_reconstructions, num_cpus,
|
||||
num_gpus, resources, actor_method_cpus):
|
||||
num_gpus, resources):
|
||||
self._modified_class = modified_class
|
||||
self._class_id = class_id
|
||||
self._class_name = modified_class.__name__
|
||||
@@ -174,7 +172,6 @@ class ActorClass(object):
|
||||
self._num_cpus = num_cpus
|
||||
self._num_gpus = num_gpus
|
||||
self._resources = resources
|
||||
self._actor_method_cpus = actor_method_cpus
|
||||
self._exported = False
|
||||
|
||||
self._actor_methods = inspect.getmembers(
|
||||
@@ -215,7 +212,7 @@ class ActorClass(object):
|
||||
method.__ray_num_return_vals__)
|
||||
else:
|
||||
self._actor_method_num_return_vals[method_name] = (
|
||||
DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS)
|
||||
ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
raise Exception("Actors methods cannot be instantiated directly. "
|
||||
@@ -276,6 +273,25 @@ class ActorClass(object):
|
||||
# updated to reflect the new invocation.
|
||||
actor_cursor = None
|
||||
|
||||
# Set the actor's default resources if not already set. First three
|
||||
# conditions are to check that no resources were specified in the
|
||||
# decorator. Last three conditions are to check that no resources were
|
||||
# specified when _remote() was called.
|
||||
if (self._num_cpus is None and self._num_gpus is None
|
||||
and self._resources is None and num_cpus is None
|
||||
and num_gpus is None and resources is None):
|
||||
# In the default case, actors acquire no resources for
|
||||
# their lifetime, and actor methods will require 1 CPU.
|
||||
cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE
|
||||
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
|
||||
else:
|
||||
# If any resources are specified (here or in decorator), then
|
||||
# all resources are acquired for the actor's lifetime and no
|
||||
# resources are associated with methods.
|
||||
cpus_to_use = (ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
|
||||
if self._num_cpus is None else self._num_cpus)
|
||||
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
|
||||
|
||||
# Do not export the actor class or the actor if run in LOCAL_MODE
|
||||
# Instead, instantiate the actor locally and add it to the worker's
|
||||
# dictionary
|
||||
@@ -290,15 +306,15 @@ class ActorClass(object):
|
||||
self._exported = True
|
||||
|
||||
resources = ray.utils.resources_from_resource_arguments(
|
||||
self._num_cpus, self._num_gpus, self._resources, num_cpus,
|
||||
cpus_to_use, self._num_gpus, self._resources, num_cpus,
|
||||
num_gpus, resources)
|
||||
|
||||
# If the actor methods require CPU resources, then set the required
|
||||
# placement resources. If actor_placement_resources is empty, then
|
||||
# the required placement resources will be the same as resources.
|
||||
actor_placement_resources = {}
|
||||
assert self._actor_method_cpus in [0, 1]
|
||||
if self._actor_method_cpus == 1:
|
||||
assert actor_method_cpu in [0, 1]
|
||||
if actor_method_cpu == 1:
|
||||
actor_placement_resources = resources.copy()
|
||||
actor_placement_resources["CPU"] += 1
|
||||
|
||||
@@ -322,8 +338,8 @@ class ActorClass(object):
|
||||
actor_handle = ActorHandle(
|
||||
actor_id, self._modified_class.__module__, self._class_name,
|
||||
actor_cursor, self._actor_method_names, self._method_signatures,
|
||||
self._actor_method_num_return_vals, actor_cursor,
|
||||
self._actor_method_cpus, worker.task_driver_id)
|
||||
self._actor_method_num_return_vals, actor_cursor, actor_method_cpu,
|
||||
worker.task_driver_id)
|
||||
# We increment the actor counter by 1 to account for the actor creation
|
||||
# task.
|
||||
actor_handle._ray_actor_counter += 1
|
||||
@@ -664,8 +680,7 @@ class ActorHandle(object):
|
||||
return self._deserialization_helper(state, False)
|
||||
|
||||
|
||||
def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
|
||||
max_reconstructions):
|
||||
def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions):
|
||||
# Give an error if cls is an old-style class.
|
||||
if not issubclass(cls, object):
|
||||
raise TypeError(
|
||||
@@ -720,7 +735,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
|
||||
class_id = ActorClassID(_random_string())
|
||||
|
||||
return ActorClass(Class, class_id, max_reconstructions, num_cpus, num_gpus,
|
||||
resources, actor_method_cpus)
|
||||
resources)
|
||||
|
||||
|
||||
ray.worker.global_worker.make_actor = make_actor
|
||||
|
||||
@@ -25,6 +25,17 @@ DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10
|
||||
# The smallest cap on the memory used by Redis that we allow.
|
||||
REDIS_MINIMUM_MEMORY_BYTES = 10**7
|
||||
|
||||
# Default resource requirements for actors when no resource requirements are
|
||||
# specified.
|
||||
DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1
|
||||
DEFAULT_ACTOR_CREATION_CPU_SIMPLE = 0
|
||||
# Default resource requirements for actors when some resource requirements are
|
||||
# specified (in the decorator or when the actor is instantiated).
|
||||
DEFAULT_ACTOR_METHOD_CPU_SPECIFIED = 0
|
||||
DEFAULT_ACTOR_CREATION_CPU_SPECIFIED = 1
|
||||
# Default number of return values for each actor method.
|
||||
DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1
|
||||
|
||||
# If a remote function or actor (or some other export) has serialized size
|
||||
# greater than this quantity, print a warning.
|
||||
PICKLE_OBJECT_WARNING_SIZE = 10**7
|
||||
|
||||
@@ -502,6 +502,89 @@ def test_actor_class_methods(ray_start_regular):
|
||||
assert ray.get(a.g.remote(2)) == 4
|
||||
|
||||
|
||||
def test_resource_assignment(shutdown_only):
    """Test that resources are assigned to actors at instantiation.

    Every combination of "resources given to the ``ray.remote`` decorator"
    and "resources given to ``_remote()``" is exercised:

    * If neither specifies anything, the actor itself must hold no
      resources and its methods must run with exactly one CPU.
    * Otherwise the actor holds the requested resources for its lifetime
      (defaulting to one CPU when ``num_cpus`` is not given anywhere), and
      its methods must report the very same resource assignment.

    Args:
        shutdown_only: pytest fixture; not used directly in the body
            (presumably shuts Ray down after the test -- confirm against
            the fixture definition).
    """
    # This test will create 16 actors. Declaring this many CPUs initially will
    # speed up the test because the workers will be started ahead of time.
    ray.init(num_cpus=16, num_gpus=1, resources={"Custom": 1})

    class Actor(object):
        def __init__(self):
            # Resources held while the actor creation task is running.
            self.resources = ray.get_resource_ids()

        def get_actor_resources(self):
            return self.resources

        def get_actor_method_resources(self):
            # Resources held while an actor method is running.
            return ray.get_resource_ids()

    decorator_resource_args = [
        {},
        {"num_cpus": 0.1},
        {"num_gpus": 0.1},
        {"resources": {"Custom": 0.1}},
    ]
    instantiation_resource_args = [
        {},
        {"num_cpus": 0.2},
        {"num_gpus": 0.2},
        {"resources": {"Custom": 0.2}},
    ]

    for decorator_args in decorator_resource_args:
        for instantiation_args in instantiation_resource_args:
            if not decorator_args:
                actor_class = ray.remote(Actor)
            else:
                actor_class = ray.remote(**decorator_args)(Actor)
            actor = actor_class._remote(**instantiation_args)

            actor_resources = ray.get(actor.get_actor_resources.remote())
            actor_method_resources = ray.get(
                actor.get_actor_method_resources.remote())

            if not decorator_args and not instantiation_args:
                # Nothing specified anywhere: the actor is free, methods
                # cost one CPU each.
                assert len(actor_resources) == 0, (
                    "Actor should not be assigned resources.")
                assert list(actor_method_resources.keys()) == ["CPU"], (
                    "Actor method should only have CPUs")
                assert actor_method_resources["CPU"][0][1] == 1, (
                    "Actor method should default to one cpu.")
                continue

            if ("num_cpus" not in decorator_args
                    and "num_cpus" not in instantiation_args):
                assert actor_resources["CPU"][0][1] == 1, (
                    "Actor should default to one cpu.")

            # Arguments passed at instantiation override the decorator's.
            defined_resources = decorator_args.copy()
            defined_resources.update(instantiation_args)

            # Translate keyword arguments into resource names.
            correct_resources = {}
            for resource, value in defined_resources.items():
                if resource == "num_cpus":
                    correct_resources["CPU"] = value
                elif resource == "num_gpus":
                    correct_resources["GPU"] = value
                elif resource == "resources":
                    for custom_resource, amount in value.items():
                        correct_resources[custom_resource] = amount

            for resource, amount in correct_resources.items():
                # NOTE(review): each entry is indexed as [0][0] / [0][1];
                # presumably a list of (resource id, amount) pairs.
                actor_id, actor_amount = actor_resources[resource][0][:2]
                method_id, method_amount = (
                    actor_method_resources[resource][0][:2])

                # The messages are single strings (adjacent literals are
                # concatenated before .format is applied), not tuples.
                assert actor_id == method_id, (
                    "Should have assigned same {} for both actor "
                    "and actor method.".format(resource))
                assert actor_amount == method_amount, (
                    "Should have assigned same amount of {} for both "
                    "actor and actor method.".format(resource))
                assert actor_amount == amount, (
                    "Actor should have {expected} {resource} but has "
                    "{actual} {resource}".format(
                        expected=amount,
                        actual=actor_amount,
                        resource=resource))
|
||||
|
||||
|
||||
def test_multiple_actors(ray_start_regular):
|
||||
@ray.remote
|
||||
class Counter(object):
|
||||
|
||||
@@ -807,7 +807,7 @@ def test_defining_remote_functions(shutdown_only):
|
||||
|
||||
|
||||
def test_submit_api(shutdown_only):
|
||||
ray.init(num_cpus=1, num_gpus=1, resources={"Custom": 1})
|
||||
ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})
|
||||
|
||||
@ray.remote
|
||||
def f(n):
|
||||
|
||||
@@ -139,8 +139,11 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
|
||||
pass
|
||||
|
||||
actors = [
|
||||
Actor._remote(args=[], kwargs={}, resources={str(i % num_nodes): 1})
|
||||
for i in range(100)
|
||||
Actor._remote(
|
||||
args=[],
|
||||
kwargs={},
|
||||
num_cpus=0.01,
|
||||
resources={str(i % num_nodes): 1}) for i in range(100)
|
||||
]
|
||||
|
||||
# Wait for the actors to start up.
|
||||
|
||||
+2
-26
@@ -76,15 +76,6 @@ PYTHON_MODE = 3
|
||||
|
||||
ERROR_KEY_PREFIX = b"Error:"
|
||||
|
||||
# Default resource requirements for actors when no resource requirements are
|
||||
# specified.
|
||||
DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1
|
||||
DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0
|
||||
# Default resource requirements for actors when some resource requirements are
|
||||
# specified.
|
||||
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0
|
||||
DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1
|
||||
|
||||
# Logger for this module. It should be configured at the entry point
|
||||
# into the program using Ray. Ray provides a default configuration at
|
||||
# entry/init points.
|
||||
@@ -2480,23 +2471,8 @@ def make_decorator(num_return_vals=None,
|
||||
raise Exception("The keyword 'max_calls' is not allowed for "
|
||||
"actors.")
|
||||
|
||||
# Set the actor default resources.
|
||||
if num_cpus is None and num_gpus is None and resources is None:
|
||||
# In the default case, actors acquire no resources for
|
||||
# their lifetime, and actor methods will require 1 CPU.
|
||||
cpus_to_use = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE
|
||||
actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE
|
||||
else:
|
||||
# If any resources are specified, then all resources are
|
||||
# acquired for the actor's lifetime and no resources are
|
||||
# associated with methods.
|
||||
cpus_to_use = (DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE
|
||||
if num_cpus is None else num_cpus)
|
||||
actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE
|
||||
|
||||
return worker.make_actor(function_or_class, cpus_to_use, num_gpus,
|
||||
resources, actor_method_cpus,
|
||||
max_reconstructions)
|
||||
return worker.make_actor(function_or_class, num_cpus, num_gpus,
|
||||
resources, max_reconstructions)
|
||||
|
||||
raise Exception("The @ray.remote decorator must be applied to "
|
||||
"either a function or to a class.")
|
||||
|
||||
Reference in New Issue
Block a user