diff --git a/python/ray/__init__.py b/python/ray/__init__.py index f16f67c24..44f4af054 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -17,5 +17,6 @@ import ray.experimental import ray.serialization from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log from ray.actor import actor +from ray.actor import get_gpu_ids from ray.worker import EnvironmentVariable, env from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE diff --git a/python/ray/actor.py b/python/ray/actor.py index ef0bab812..78d3398bc 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -4,6 +4,7 @@ from __future__ import print_function import hashlib import inspect +import json import numpy as np import photon import random @@ -12,6 +13,18 @@ import ray.pickling as pickling import ray.worker import ray.experimental.state as state +# This is a variable used by each actor to indicate the IDs of the GPUs that +# the worker is currently allowed to use. +gpu_ids = [] + +def get_gpu_ids(): + """Get the IDs of the GPU that are available to the worker. + + Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the + number of GPUs that the node has. 
+ """ + return gpu_ids + def random_string(): return np.random.bytes(20) @@ -35,11 +48,13 @@ def get_actor_method_function_id(attr): def fetch_and_register_actor(key, worker): """Import an actor.""" - driver_id, actor_id_str, actor_name, module, pickled_class = \ - worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class"]) + driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids = \ + worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids"]) actor_id = photon.ObjectID(actor_id_str) actor_name = actor_name.decode("ascii") module = module.decode("ascii") + global gpu_ids + gpu_ids = json.loads(assigned_gpu_ids.decode("ascii")) try: unpickled_class = pickling.loads(pickled_class) except: @@ -54,13 +69,63 @@ def fetch_and_register_actor(key, worker): # We do not set worker.function_properties[driver_id][function_id] because # we currently do need the actor worker to submit new tasks for the actor. -def export_actor(actor_id, Class, actor_method_names, worker): +def select_local_scheduler(local_schedulers, num_gpus, worker): + """Select a local scheduler to assign this actor to. + + Args: + local_schedulers: A list of dictionaries of information about the local + schedulers. + num_gpus (int): The number of GPUs that must be reserved for this actor. + + Returns: + A tuple of the ID of the local scheduler that has been chosen and a list of + the gpu_ids that are reserved for the actor. + + Raises: + Exception: An exception is raised if no local scheduler can be found with + sufficient resources. + """ + # TODO(rkn): We should change this method to have a list of GPU IDs that we + # pop from and push to. The current implementation is not compatible with + # actors releasing GPU resources. 
+  if num_gpus == 0:
+    local_scheduler_id = random.choice(local_schedulers)[b"ray_client_id"]
+    gpu_ids = []
+  else:
+    # All of this logic is for finding a local scheduler that has enough
+    # available GPUs.
+    local_scheduler_id = None
+    # Loop through all of the local schedulers.
+    for local_scheduler in local_schedulers:
+      # See if there are enough available GPUs on this local scheduler.
+      local_scheduler_total_gpus = int(float(local_scheduler[b"num_gpus"].decode("ascii")))
+      gpus_in_use = worker.redis_client.hget(local_scheduler[b"ray_client_id"], b"gpus_in_use")
+      gpus_in_use = 0 if gpus_in_use is None else int(gpus_in_use)
+      if gpus_in_use + num_gpus <= local_scheduler_total_gpus:
+        # Attempt to reserve some GPUs for this actor.
+        new_gpus_in_use = worker.redis_client.hincrby(local_scheduler[b"ray_client_id"], b"gpus_in_use", num_gpus)
+        if new_gpus_in_use > local_scheduler_total_gpus:
+          # The reservation overshot the limit (lost a race), so decrement the counter to undo it.
+          worker.redis_client.hincrby(local_scheduler[b"ray_client_id"], b"gpus_in_use", -num_gpus)
+        else:
+          # We succeeded at reserving the GPUs, so we are done.
+          local_scheduler_id = local_scheduler[b"ray_client_id"]
+          gpu_ids = list(range(new_gpus_in_use - num_gpus, new_gpus_in_use))
+          break
+    if local_scheduler_id is None:
+      raise Exception("Could not find a node with enough GPUs to create this "
+                      "actor. The local scheduler information is {}.".format(local_schedulers))
+  return local_scheduler_id, gpu_ids
+
+def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker):
   """Export an actor to redis.
 
   Args:
     actor_id: The ID of the actor.
     Class: Name of the class to be exported as an actor.
     actor_method_names (list): A list of the names of this actor's methods.
+    num_cpus (int): The number of CPUs that this actor requires.
+    num_gpus (int): The number of GPUs that this actor requires.
""" ray.worker.check_main_thread() if worker.mode is None: @@ -68,15 +133,15 @@ def export_actor(actor_id, Class, actor_method_names, worker): key = "Actor:{}".format(actor_id.id()) pickled_class = pickling.dumps(Class) - # For now, all actor methods have 1 return value and require 0 CPUs and GPUs. + # For now, all actor methods have 1 return value. driver_id = worker.task_driver_id.id() for actor_method_name in actor_method_names: function_id = get_actor_method_function_id(actor_method_name).id() - worker.function_properties[driver_id][function_id] = (1, 0, 0) + worker.function_properties[driver_id][function_id] = (1, num_cpus, num_gpus) # Select a local scheduler for the actor. - local_schedulers = state.get_local_schedulers() - local_scheduler_id = random.choice(local_schedulers) + local_schedulers = state.get_local_schedulers(worker) + local_scheduler_id, gpu_ids = select_local_scheduler(local_schedulers, num_gpus, worker) worker.redis_client.publish("actor_notifications", actor_id.id() + local_scheduler_id) @@ -84,56 +149,73 @@ def export_actor(actor_id, Class, actor_method_names, worker): "actor_id": actor_id.id(), "name": Class.__name__, "module": Class.__module__, - "class": pickled_class} + "class": pickled_class, + "gpu_ids": json.dumps(gpu_ids)} worker.redis_client.hmset(key, d) worker.redis_client.rpush("Exports", key) -def actor(Class): - # The function actor_method_call gets called if somebody tries to call a - # method on their local actor stub object. - def actor_method_call(actor_id, attr, *args, **kwargs): - ray.worker.check_connected() - ray.worker.check_main_thread() - args = list(args) - if len(kwargs) > 0: - raise Exception("Actors currently do not support **kwargs.") - function_id = get_actor_method_function_id(attr) - # TODO(pcm): Extend args with keyword args. - # For now, actor methods should not require resources beyond the resources - # used by the actor. 
- num_cpus = 0 - num_gpus = 0 - object_ids = ray.worker.global_worker.submit_task(function_id, "", args, - actor_id=actor_id) - if len(object_ids) == 1: - return object_ids[0] - elif len(object_ids) > 1: - return object_ids +def actor(*args, **kwargs): + def make_actor_decorator(num_cpus=1, num_gpus=0): + def make_actor(Class): + # The function actor_method_call gets called if somebody tries to call a + # method on their local actor stub object. + def actor_method_call(actor_id, attr, *args, **kwargs): + ray.worker.check_connected() + ray.worker.check_main_thread() + args = list(args) + if len(kwargs) > 0: + raise Exception("Actors currently do not support **kwargs.") + function_id = get_actor_method_function_id(attr) + # TODO(pcm): Extend args with keyword args. + object_ids = ray.worker.global_worker.submit_task(function_id, "", args, + actor_id=actor_id) + if len(object_ids) == 1: + return object_ids[0] + elif len(object_ids) > 1: + return object_ids - class NewClass(object): - def __init__(self, *args, **kwargs): - self._ray_actor_id = random_actor_id() - self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))} - export_actor(self._ray_actor_id, Class, self._ray_actor_methods, ray.worker.global_worker) - # Call __init__ as a remote function. - if "__init__" in self._ray_actor_methods.keys(): - actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs) - else: - print("WARNING: this object has no __init__ method.") - # Make tab completion work. - def __dir__(self): - return self._ray_actor_methods - def __getattribute__(self, attr): - # The following is needed so we can still access self.actor_methods. 
- if attr in ["_ray_actor_id", "_ray_actor_methods"]: - return super(NewClass, self).__getattribute__(attr) - if attr in self._ray_actor_methods.keys(): - return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs) - # There is no method with this name, so raise an exception. - raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr)) - def __repr__(self): - return "Actor(" + self._ray_actor_id.hex() + ")" + class NewClass(object): + def __init__(self, *args, **kwargs): + self._ray_actor_id = random_actor_id() + self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))} + export_actor(self._ray_actor_id, Class, self._ray_actor_methods, num_cpus, num_gpus, ray.worker.global_worker) + # Call __init__ as a remote function. + if "__init__" in self._ray_actor_methods.keys(): + actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs) + else: + print("WARNING: this object has no __init__ method.") + # Make tab completion work. + def __dir__(self): + return self._ray_actor_methods + def __getattribute__(self, attr): + # The following is needed so we can still access self.actor_methods. + if attr in ["_ray_actor_id", "_ray_actor_methods"]: + return super(NewClass, self).__getattribute__(attr) + if attr in self._ray_actor_methods.keys(): + return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs) + # There is no method with this name, so raise an exception. + raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr)) + def __repr__(self): + return "Actor(" + self._ray_actor_id.hex() + ")" - return NewClass + return NewClass + return make_actor + + if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): + # In this case, the actor decorator was applied directly to a class + # definition. 
+ Class = args[0] + return make_actor_decorator(num_cpus=1, num_gpus=0)(Class) + + # In this case, the actor decorator is something like @ray.actor(num_gpus=1). + if len(args) == 0 and len(kwargs) > 0 and all([key in ["num_cpus", "num_gpus"] for key in kwargs.keys()]): + num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1 + num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0 + return make_actor_decorator(num_cpus=num_cpus, num_gpus=num_gpus) + + raise Exception("The ray.actor decorator must either be applied with no " + "arguments as in '@ray.actor', or it must be applied using " + "some of the arguments 'num_cpus' or 'num_gpus' as in " + "'ray.actor(num_gpus=1)'.") ray.worker.global_worker.fetch_and_register["Actor"] = fetch_and_register_actor diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 5b1147afb..05ce00b59 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -2,12 +2,10 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import ray.worker - -def get_local_schedulers(): +def get_local_schedulers(worker): local_schedulers = [] - for client in ray.worker.global_worker.redis_client.keys("CL:*"): - client_type, ray_client_id = ray.worker.global_worker.redis_client.hmget(client, "client_type", "ray_client_id") - if client_type == b"photon": - local_schedulers.append(ray_client_id) + for client in worker.redis_client.keys("CL:*"): + client_info = worker.redis_client.hgetall(client) + if client_info[b"client_type"] == b"photon": + local_schedulers.append(client_info) return local_schedulers diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index dccd57f0f..eefcd248e 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -303,21 +303,38 @@ local_scheduler_state *init_local_scheduler( if (redis_addr != NULL) { int num_args; const char **db_connect_args = 
NULL; + /* Use UT_string to convert the resource value into a string. */ + UT_string *num_cpus; + UT_string *num_gpus; + utstring_new(num_cpus); + utstring_new(num_gpus); + utstring_printf(num_cpus, "%f", static_resource_conf[0]); + utstring_printf(num_gpus, "%f", static_resource_conf[1]); if (plasma_manager_address != NULL) { - num_args = 4; + num_args = 8; db_connect_args = malloc(sizeof(char *) * num_args); db_connect_args[0] = "local_scheduler_socket_name"; db_connect_args[1] = local_scheduler_socket_name; - db_connect_args[2] = "aux_address"; - db_connect_args[3] = plasma_manager_address; + db_connect_args[2] = "num_cpus"; + db_connect_args[3] = utstring_body(num_cpus); + db_connect_args[4] = "num_gpus"; + db_connect_args[5] = utstring_body(num_gpus); + db_connect_args[6] = "aux_address"; + db_connect_args[7] = plasma_manager_address; } else { - num_args = 2; + num_args = 6; db_connect_args = malloc(sizeof(char *) * num_args); db_connect_args[0] = "local_scheduler_socket_name"; db_connect_args[1] = local_scheduler_socket_name; + db_connect_args[2] = "num_cpus"; + db_connect_args[3] = utstring_body(num_cpus); + db_connect_args[4] = "num_gpus"; + db_connect_args[5] = utstring_body(num_gpus); } state->db = db_connect(redis_addr, redis_port, "photon", node_ip_address, num_args, db_connect_args); + utstring_free(num_cpus); + utstring_free(num_gpus); free(db_connect_args); db_attach(state->db, loop, false); } else { diff --git a/test/actor_test.py b/test/actor_test.py index f200088c8..d6f749265 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -132,6 +132,50 @@ class ActorAPI(unittest.TestCase): # # TODO(rkn): Implement this. # pass + def testDecoratorArgs(self): + ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + + # This is an invalid way of using the actor decorator. + with self.assertRaises(Exception): + @ray.actor() + class Actor(object): + def __init__(self): + pass + + # This is an invalid way of using the actor decorator. 
+ with self.assertRaises(Exception): + @ray.actor(invalid_kwarg=0) + class Actor(object): + def __init__(self): + pass + + # This is an invalid way of using the actor decorator. + with self.assertRaises(Exception): + @ray.actor(num_cpus=0, invalid_kwarg=0) + class Actor(object): + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.actor(num_cpus=1) + class Actor(object): + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.actor(num_gpus=1) + class Actor(object): + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.actor(num_cpus=1, num_gpus=1) + class Actor(object): + def __init__(self): + pass + + ray.worker.cleanup() + class ActorMethods(unittest.TestCase): def testDefineActor(self): @@ -479,5 +523,156 @@ class ActorsOnMultipleNodes(unittest.TestCase): ray.worker.cleanup() +class ActorsWithGPUs(unittest.TestCase): + + def testActorGPUs(self): + num_local_schedulers = 3 + num_gpus_per_scheduler = 4 + ray.worker._init(start_ray_local=True, num_workers=0, + num_local_schedulers=num_local_schedulers, + num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) + + @ray.actor(num_gpus=1) + class Actor1(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + + # Create one actor per GPU. + actors = [Actor1() for _ in range(num_local_schedulers * num_gpus_per_scheduler)] + # Make sure that no two actors are assigned to the same GPU. 
+ locations_and_ids = ray.get([actor.get_location_and_ids() for actor in actors]) + node_names = set([location for location, gpu_id in locations_and_ids]) + self.assertEqual(len(node_names), num_local_schedulers) + location_actor_combinations = [] + for node_name in node_names: + for gpu_id in range(num_gpus_per_scheduler): + location_actor_combinations.append((node_name, (gpu_id,))) + self.assertEqual(set(locations_and_ids), set(location_actor_combinations)) + + # Creating a new actor should fail because all of the GPUs are being used. + with self.assertRaises(Exception): + a = Actor1() + + ray.worker.cleanup() + + def testActorMultipleGPUs(self): + num_local_schedulers = 3 + num_gpus_per_scheduler = 5 + ray.worker._init(start_ray_local=True, num_workers=0, + num_local_schedulers=num_local_schedulers, + num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) + + @ray.actor(num_gpus=2) + class Actor1(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + + # Create some actors. + actors = [Actor1() for _ in range(num_local_schedulers * 2)] + # Make sure that no two actors are assigned to the same GPU. + locations_and_ids = ray.get([actor.get_location_and_ids() for actor in actors]) + node_names = set([location for location, gpu_id in locations_and_ids]) + self.assertEqual(len(node_names), num_local_schedulers) + location_actor_combinations = [] + for node_name in node_names: + location_actor_combinations.append((node_name, (0, 1))) + location_actor_combinations.append((node_name, (2, 3))) + self.assertEqual(set(locations_and_ids), set(location_actor_combinations)) + + # Creating a new actor should fail because all of the GPUs are being used. + with self.assertRaises(Exception): + a = Actor1() + + # We should be able to create more actors that use only a single GPU. 
+ @ray.actor(num_gpus=1) + class Actor2(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + + # Create some actors. + actors = [Actor2() for _ in range(num_local_schedulers)] + # Make sure that no two actors are assigned to the same GPU. + locations_and_ids = ray.get([actor.get_location_and_ids() for actor in actors]) + node_names = set([location for location, gpu_id in locations_and_ids]) + self.assertEqual(len(node_names), num_local_schedulers) + location_actor_combinations = [] + for node_name in node_names: + location_actor_combinations.append((node_name, (4,))) + self.assertEqual(set(locations_and_ids), set(location_actor_combinations)) + + # Creating a new actor should fail because all of the GPUs are being used. + with self.assertRaises(Exception): + a = Actor2() + + ray.worker.cleanup() + + def testActorDifferentNumbersOfGPUs(self): + # Test that we can create actors on two nodes that have different numbers of + # GPUs. + ray.worker._init(start_ray_local=True, num_workers=0, + num_local_schedulers=3, num_gpus=[0, 5, 10]) + + @ray.actor(num_gpus=1) + class Actor1(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + + # Create some actors. + actors = [Actor1() for _ in range(0 + 5 + 10)] + # Make sure that no two actors are assigned to the same GPU. 
+ locations_and_ids = ray.get([actor.get_location_and_ids() for actor in actors]) + node_names = set([location for location, gpu_id in locations_and_ids]) + self.assertEqual(len(node_names), 2) + for node_name in node_names: + node_gpu_ids = [gpu_id for location, gpu_id in locations_and_ids if location == node_name] + self.assertIn(len(node_gpu_ids), [5, 10]) + self.assertEqual(set(node_gpu_ids), set([(i,) for i in range(len(node_gpu_ids))])) + + # Creating a new actor should fail because all of the GPUs are being used. + with self.assertRaises(Exception): + a = Actor1() + + ray.worker.cleanup() + + def testActorMultipleGPUsFromMultipleTasks(self): + num_local_schedulers = 10 + num_gpus_per_scheduler = 10 + ray.worker._init(start_ray_local=True, num_workers=0, + num_local_schedulers=num_local_schedulers, + num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) + + @ray.remote + def create_actors(n): + @ray.actor(num_gpus=1) + class Actor(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + # Create n actors. + for _ in range(n): + Actor() + + ray.get([create_actors.remote(10) for _ in range(10)]) + + @ray.actor(num_gpus=1) + class Actor(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + def get_location_and_ids(self): + return ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids) + + # All the GPUs should be used up now. + with self.assertRaises(Exception): + Actor() + if __name__ == "__main__": unittest.main(verbosity=2)