From c688a64235852361276ea446ce79b63fa3a7b785 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 7 May 2017 13:03:49 -0700 Subject: [PATCH] Expose GPU IDs to remote functions. (#496) * Change local scheduler bookkeeping to use GPU IDs. * Update actor test. * Add tests for actors and tasks simultaneously using GPUs. * Add additional task GPU ID test. * Fix linting. * Make redis GPU assignment ignore GPU IDs. * Small fix. --- python/ray/__init__.py | 3 +- python/ray/actor.py | 95 ++++----- python/ray/global_scheduler/test/test.py | 2 +- python/ray/local_scheduler/test/test.py | 2 +- python/ray/monitor.py | 6 +- python/ray/worker.py | 20 +- .../format/local_scheduler.fbs | 8 +- src/local_scheduler/local_scheduler.cc | 70 +++++-- .../local_scheduler_algorithm.cc | 13 +- src/local_scheduler/local_scheduler_client.cc | 36 +++- src/local_scheduler/local_scheduler_client.h | 14 +- .../local_scheduler_extension.cc | 26 ++- src/local_scheduler/local_scheduler_shared.h | 18 +- .../test/local_scheduler_tests.cc | 2 +- test/actor_test.py | 192 +++++++++++++++++- test/runtest.py | 85 ++++++++ 16 files changed, 461 insertions(+), 131 deletions(-) diff --git a/python/ray/__init__.py b/python/ray/__init__.py index f5e47c133..9683559e6 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -4,9 +4,8 @@ from __future__ import print_function from ray.worker import (register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, - flush_log) + flush_log, get_gpu_ids) from ray.actor import actor -from ray.actor import get_gpu_ids from ray.worker import EnvironmentVariable, env from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE from ray.worker import global_state diff --git a/python/ray/actor.py b/python/ray/actor.py index 648ffd380..65f03bbd6 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -15,19 +15,6 @@ import ray.signature as signature import ray.worker from ray.utils import random_string, 
binary_to_hex, hex_to_binary -# This is a variable used by each actor to indicate the IDs of the GPUs that -# the worker is currently allowed to use. -gpu_ids = [] - - -def get_gpu_ids(): - """Get the IDs of the GPU that are available to the worker. - - Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the - number of GPUs that the node has. - """ - return gpu_ids - def random_actor_id(): return ray.local_scheduler.ObjectID(random_string()) @@ -60,8 +47,6 @@ def fetch_and_register_actor(key, worker): actor_name = actor_name.decode("ascii") module = module.decode("ascii") actor_method_names = json.loads(actor_method_names.decode("ascii")) - global gpu_ids - gpu_ids = json.loads(assigned_gpu_ids.decode("ascii")) # Create a temporary actor with some temporary methods so that if the actor # fails to be unpickled, the temporary actor can be used (just to produce @@ -110,13 +95,13 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker): local_scheduler: Information about the local scheduler. Returns: - A list of the GPU IDs that were successfully acquired. This should have - length either equal to num_gpus or equal to 0. + True if the GPUs were successfully reserved and false otherwise. """ + assert num_gpus != 0 local_scheduler_id = local_scheduler["DBClientID"] local_scheduler_total_gpus = int(local_scheduler["NumGPUs"]) - gpus_to_acquire = [] + success = False # Attempt to acquire GPU IDs atomically. with worker.redis_client.pipeline() as pipe: @@ -129,29 +114,25 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker): # Figure out which GPUs are currently in use. 
result = worker.redis_client.hget(local_scheduler_id, "gpus_in_use") gpus_in_use = dict() if result is None else json.loads(result) - all_gpu_ids_in_use = [] + num_gpus_in_use = 0 for key in gpus_in_use: - all_gpu_ids_in_use += gpus_in_use[key] - assert len(all_gpu_ids_in_use) <= local_scheduler_total_gpus - assert len(set(all_gpu_ids_in_use)) == len(all_gpu_ids_in_use) + num_gpus_in_use += gpus_in_use[key] + assert num_gpus_in_use <= local_scheduler_total_gpus pipe.multi() - if local_scheduler_total_gpus - len(all_gpu_ids_in_use) >= num_gpus: - # There are enough available GPUs, so try to reserve some. - all_gpu_ids = set(range(local_scheduler_total_gpus)) - for gpu_id in all_gpu_ids_in_use: - all_gpu_ids.remove(gpu_id) - gpus_to_acquire = list(all_gpu_ids)[:num_gpus] - - # Use the hex driver ID so that the dictionary is JSON serializable. + if local_scheduler_total_gpus - num_gpus_in_use >= num_gpus: + # There are enough available GPUs, so try to reserve some. We use the + # driver ID in hex as a dictionary key so that the dictionary is + # JSON serializable. driver_id_hex = binary_to_hex(driver_id) if driver_id_hex not in gpus_in_use: - gpus_in_use[driver_id_hex] = [] - gpus_in_use[driver_id_hex] += gpus_to_acquire + gpus_in_use[driver_id_hex] = 0 + gpus_in_use[driver_id_hex] += num_gpus # Stick the updated GPU IDs back in Redis pipe.hset(local_scheduler_id, "gpus_in_use", json.dumps(gpus_in_use)) + success = True pipe.execute() # If a WatchError is not raised, then the operations should have gone @@ -161,10 +142,10 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker): # Another client must have changed the watched key between the time we # started WATCHing it and the pipeline's execution. We should just # retry. 
- gpus_to_acquire = [] + success = False continue - return gpus_to_acquire + return success def select_local_scheduler(local_schedulers, num_gpus, worker): @@ -176,8 +157,7 @@ def select_local_scheduler(local_schedulers, num_gpus, worker): num_gpus (int): The number of GPUs that must be reserved for this actor. Returns: - A tuple of the ID of the local scheduler that has been chosen and a list of - the gpu_ids that are reserved for the actor. + The ID of the local scheduler that has been chosen. Raises: Exception: An exception is raised if no local scheduler can be found with @@ -188,7 +168,6 @@ def select_local_scheduler(local_schedulers, num_gpus, worker): if num_gpus == 0: local_scheduler_id = hex_to_binary( random.choice(local_schedulers)["DBClientID"]) - gpus_aquired = [] else: # All of this logic is for finding a local scheduler that has enough # available GPUs. @@ -196,20 +175,17 @@ def select_local_scheduler(local_schedulers, num_gpus, worker): # Loop through all of the local schedulers. for local_scheduler in local_schedulers: # Try to reserve enough GPUs on this local scheduler. - gpus_aquired = attempt_to_reserve_gpus(num_gpus, driver_id, - local_scheduler, worker) - if len(gpus_aquired) == num_gpus: + success = attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, + worker) + if success: local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"]) break - else: - # We should have either acquired as many GPUs as we need or none. - assert len(gpus_aquired) == 0 if local_scheduler_id is None: raise Exception("Could not find a node with enough GPUs to create this " "actor. The local scheduler information is {}." 
.format(local_schedulers)) - return local_scheduler_id, gpus_aquired + return local_scheduler_id def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, @@ -233,8 +209,7 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, driver_id = worker.task_driver_id.id() for actor_method_name in actor_method_names: function_id = get_actor_method_function_id(actor_method_name).id() - worker.function_properties[driver_id][function_id] = (1, num_cpus, - num_gpus) + worker.function_properties[driver_id][function_id] = (1, num_cpus, 0) # Get a list of the local schedulers from the client table. client_table = ray.global_state.client_table() @@ -244,8 +219,22 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, if client["ClientType"] == "local_scheduler": local_schedulers.append(client) # Select a local scheduler for the actor. - local_scheduler_id, gpu_ids = select_local_scheduler(local_schedulers, - num_gpus, worker) + local_scheduler_id = select_local_scheduler(local_schedulers, num_gpus, + worker) + + d = {"driver_id": driver_id, + "actor_id": actor_id.id(), + "name": Class.__name__, + "module": Class.__module__, + "class": pickled_class, + "num_gpus": num_gpus, + "actor_method_names": json.dumps(list(actor_method_names))} + worker.redis_client.hmset(key, d) + worker.redis_client.rpush("Exports", key) + + # We publish the actor notification after the call to hmset so that when the + # newly created actor queries Redis to find the number of GPUs assigned to + # it, that value is present. # Really we should encode this message as a flatbuffer object. However, we're # having trouble getting that to work. 
It almost works, but in Python 2.7, @@ -254,16 +243,6 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker.redis_client.publish("actor_notifications", actor_id.id() + driver_id + local_scheduler_id) - d = {"driver_id": driver_id, - "actor_id": actor_id.id(), - "name": Class.__name__, - "module": Class.__module__, - "class": pickled_class, - "gpu_ids": json.dumps(gpu_ids), - "actor_method_names": json.dumps(list(actor_method_names))} - worker.redis_client.hmset(key, d) - worker.redis_client.rpush("Exports", key) - def actor(*args, **kwargs): def make_actor_decorator(num_cpus=1, num_gpus=0): diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index f8219e22b..29df06f3b 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -102,7 +102,7 @@ class TestGlobalScheduler(unittest.TestCase): static_resource_list=[10, 0]) # Connect to the scheduler. local_scheduler_client = local_scheduler.LocalSchedulerClient( - local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False) + local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0) self.local_scheduler_clients.append(local_scheduler_client) self.local_scheduler_pids.append(p4) diff --git a/python/ray/local_scheduler/test/test.py b/python/ray/local_scheduler/test/test.py index f34e0673d..1a77173ce 100644 --- a/python/ray/local_scheduler/test/test.py +++ b/python/ray/local_scheduler/test/test.py @@ -48,7 +48,7 @@ class TestLocalSchedulerClient(unittest.TestCase): plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. self.local_scheduler_client = local_scheduler.LocalSchedulerClient( - scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False) + scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0) def tearDown(self): # Check that the processes are still alive. 
diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 7247cb263..eaeb4d746 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -243,7 +243,7 @@ class Monitor(object): if int(local_scheduler["NumGPUs"]) > 0: local_scheduler_id = local_scheduler["DBClientID"] - returned_gpu_ids = [] + num_gpus_returned = 0 # Perform a transaction to return the GPUs. with self.redis.pipeline() as pipe: @@ -258,7 +258,7 @@ class Monitor(object): driver_id_hex = ray.utils.binary_to_hex(driver_id) if driver_id_hex in gpus_in_use: - returned_gpu_ids = gpus_in_use.pop(driver_id_hex) + num_gpus_returned = gpus_in_use.pop(driver_id_hex) pipe.multi() @@ -276,7 +276,7 @@ class Monitor(object): continue log.info("Driver {} is returning GPU IDs {} to local scheduler {}." - .format(driver_id, returned_gpu_ids, local_scheduler_id)) + .format(driver_id, num_gpus_returned, local_scheduler_id)) def process_messages(self): """Process all messages ready in the subscription channels. diff --git a/python/ray/worker.py b/python/ray/worker.py index ab2365ad4..006262254 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -673,6 +673,15 @@ class Worker(object): self.redis_client.rpush("ErrorKeys", error_key) +def get_gpu_ids(): + """Get the IDs of the GPU that are available to the worker. + + Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the + number of GPUs that the node has. + """ + return global_worker.local_scheduler_client.gpu_ids() + + global_worker = Worker() """Worker: The global Worker object for this worker process. @@ -1339,8 +1348,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, Args: info (dict): A dictionary with address of the Redis server and the sockets of the plasma store, plasma manager, and local scheduler. + object_id_seed: A seed to use to make the generation of object IDs + deterministic. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. 
+ actor_id: The ID of the actor running on this worker. If this worker is not + an actor, then this is NIL_ACTOR_ID. """ check_main_thread() # Do some basic checking to make sure we didn't call ray.init twice. @@ -1407,9 +1420,14 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. + if worker.actor_id != NIL_ACTOR_ID: + num_gpus = int(worker.redis_client.hget("Actor:{}".format(actor_id), + "num_gpus")) + else: + num_gpus = 0 worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient( info["local_scheduler_socket_name"], worker.worker_id, worker.actor_id, - is_worker) + is_worker, num_gpus) # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index ccbf6ee1b..bb7a49fec 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -4,7 +4,7 @@ enum MessageType:int { // Task is submitted to the local scheduler. This is sent from a worker to a // local scheduler. SubmitTask = 1, - // Notify the local scheduler that a task has finished. This is sent from a + // Notify the local scheduler that a task has finished. This is sent from a // worker to a local scheduler. TaskDone, // Log a message to the event table. This is sent from a worker to a local @@ -37,6 +37,8 @@ enum MessageType:int { table GetTaskReply { // A string of bytes representing the task specification. task_spec: string; + // The IDs of the GPUs that the worker is allowed to use for this task. + gpu_ids: [int]; } table EventLogMessage { @@ -55,9 +57,13 @@ table RegisterClientRequest { actor_id: string; // The process ID of this worker. worker_pid: long; + // The number of GPUs required by this actor. 
+ num_gpus: long; } table RegisterClientReply { + // The IDs of the GPUs that are reserved for this worker. + gpu_ids: [int]; } table ReconstructObject { diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index cd2aa9ae4..2188b7ac7 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -126,7 +126,8 @@ void kill_worker(LocalSchedulerState *state, } /* Release any resources held by the worker. */ - release_resources(state, worker, worker->cpus_in_use, worker->gpus_in_use); + release_resources(state, worker, worker->cpus_in_use, + worker->gpus_in_use.size()); /* Clean up the task in progress. */ if (worker->task_in_progress) { @@ -382,6 +383,10 @@ LocalSchedulerState *LocalSchedulerState_init( state->static_resources[i] = state->dynamic_resources[i] = static_resource_conf[i]; } + /* Initialize available GPUs. */ + for (int i = 0; i < state->static_resources[ResourceIndex_GPU]; ++i) { + state->available_gpus.push_back(i); + } /* Print some debug information about resource configuration. */ print_resource_info(state, NULL); @@ -427,8 +432,13 @@ void acquire_resources(LocalSchedulerState *state, /* Acquire the GPU resources. */ if (num_gpus != 0) { /* Make sure that the worker isn't using any GPUs already. */ - CHECK(worker->gpus_in_use == 0); - worker->gpus_in_use += num_gpus; + CHECK(worker->gpus_in_use.size() == 0); + CHECK(state->available_gpus.size() >= num_gpus); + /* Reserve GPUs for the worker. */ + for (int i = 0; i < num_gpus; i++) { + worker->gpus_in_use.push_back(state->available_gpus.back()); + state->available_gpus.pop_back(); + } /* Update the total quantity of GPU resources available. */ CHECK(state->dynamic_resources[ResourceIndex_GPU] >= num_gpus); state->dynamic_resources[ResourceIndex_GPU] -= num_gpus; @@ -446,9 +456,13 @@ void release_resources(LocalSchedulerState *state, /* Release the GPU resources. 
*/ if (num_gpus != 0) { - CHECK(num_gpus == worker->gpus_in_use); + CHECK(num_gpus == worker->gpus_in_use.size()); + /* Move the GPU IDs the worker was using back to the local scheduler. */ + for (auto const &gpu_id : worker->gpus_in_use) { + state->available_gpus.push_back(gpu_id); + } + worker->gpus_in_use.clear(); state->dynamic_resources[ResourceIndex_GPU] += num_gpus; - worker->gpus_in_use = 0; } } @@ -460,6 +474,14 @@ void assign_task_to_worker(LocalSchedulerState *state, TaskSpec *spec, int64_t task_spec_size, LocalSchedulerClient *worker) { + /* Acquire the necessary resources for running this task. TODO(rkn): We are + * currently ignoring resource bookkeeping for actor methods. */ + if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + acquire_resources(state, worker, + TaskSpec_get_required_resource(spec, ResourceIndex_CPU), + TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); + } + CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec))); /* Make sure the driver for this task is still alive. */ WorkerID driver_id = TaskSpec_driver_id(spec); @@ -468,14 +490,15 @@ void assign_task_to_worker(LocalSchedulerState *state, /* Construct a flatbuffer object to send to the worker. */ flatbuffers::FlatBufferBuilder fbb; auto message = - CreateGetTaskReply(fbb, fbb.CreateString((char *) spec, task_spec_size)); + CreateGetTaskReply(fbb, fbb.CreateString((char *) spec, task_spec_size), + fbb.CreateVector(worker->gpus_in_use)); fbb.Finish(message); if (write_message(worker->sock, MessageType_ExecuteTask, fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) { if (errno == EPIPE || errno == EBADF) { - /* TODO(rkn): If this happens, the task should be added back to the task - * queue. */ + /* Something went wrong, so kill the worker. */ + kill_worker(state, worker, false, false); LOG_WARN( "Failed to give task to worker on fd %d. 
The client may have hung " "up.", @@ -485,14 +508,6 @@ void assign_task_to_worker(LocalSchedulerState *state, } } - /* Acquire the necessary resources for running this task. TODO(rkn): We are - * currently ignoring resource bookkeeping for actor methods. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { - acquire_resources(state, worker, - TaskSpec_get_required_resource(spec, ResourceIndex_CPU), - TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - } - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_RUNNING, state->db ? get_db_client_id(state->db) : NIL_ID); /* Record which task this worker is executing. This will be freed in @@ -667,7 +682,8 @@ void reconstruct_object(LocalSchedulerState *state, void send_client_register_reply(LocalSchedulerState *state, LocalSchedulerClient *worker) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreateRegisterClientReply(fbb); + auto message = + CreateRegisterClientReply(fbb, fbb.CreateVector(worker->gpus_in_use)); fbb.Finish(message); /* Send the message to the client. */ @@ -716,6 +732,21 @@ void handle_client_register(LocalSchedulerState *state, * worker. */ handle_actor_worker_connect(state, state->algorithm_state, actor_id, worker); + + /* If there are enough GPUs available, allocate them and reply to the + * actor. */ + double num_gpus_required = (double) message->num_gpus(); + if (check_dynamic_resources(state, 0, num_gpus_required)) { + acquire_resources(state, worker, 0, num_gpus_required); + } else { + /* TODO(rkn): This means that an actor wants to register but that there + * aren't enough GPUs for it. We should queue this request, and reply to + * the actor when GPUs become available. */ + LOG_WARN( + "Attempting to create an actor but there aren't enough available " + "GPUs. We'll start the worker anyway without any GPUs, but this is " + "incorrect behavior."); + } } /* Register worker process id with the scheduler. 
*/ @@ -859,10 +890,10 @@ void process_message(event_loop *loop, if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { CHECK(worker->cpus_in_use == TaskSpec_get_required_resource(spec, ResourceIndex_CPU)); - CHECK(worker->gpus_in_use == + CHECK(worker->gpus_in_use.size() == TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); release_resources(state, worker, worker->cpus_in_use, - worker->gpus_in_use); + worker->gpus_in_use.size()); } /* If we're connected to Redis, update tables. */ if (state->db != NULL) { @@ -965,7 +996,6 @@ void new_client_connection(event_loop *loop, worker->client_id = NIL_WORKER_ID; worker->task_in_progress = NULL; worker->cpus_in_use = 0; - worker->gpus_in_use = 0; worker->is_blocked = false; worker->pid = 0; worker->is_child = false; diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index a78ab6df8..8fa0d20cf 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -588,16 +588,9 @@ void dispatch_tasks(LocalSchedulerState *state, return; } /* Skip to the next task if this task cannot currently be satisfied. */ - bool task_satisfied = true; - for (int i = 0; i < ResourceIndex_MAX; i++) { - if (TaskSpec_get_required_resource(task.spec, i) > - state->dynamic_resources[i]) { - /* Insufficient capacity for this task, proceed to the next task. */ - task_satisfied = false; - break; - } - } - if (!task_satisfied) { + if (!check_dynamic_resources( + state, TaskSpec_get_required_resource(task.spec, ResourceIndex_CPU), + TaskSpec_get_required_resource(task.spec, ResourceIndex_GPU))) { /* This task could not be satisfied -- proceed to the next task. 
*/ ++it; continue; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index c2605eb3c..16e99bf16 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -11,18 +11,19 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID client_id, ActorID actor_id, - bool is_worker) { - LocalSchedulerConnection *result = - (LocalSchedulerConnection *) malloc(sizeof(LocalSchedulerConnection)); + bool is_worker, + int64_t num_gpus) { + LocalSchedulerConnection *result = new LocalSchedulerConnection(); result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1); + result->actor_id = actor_id; /* Register with the local scheduler. * NOTE(swang): If the local scheduler exits and we are registered as a * worker, we will get killed. */ flatbuffers::FlatBufferBuilder fbb; - auto message = - CreateRegisterClientRequest(fbb, is_worker, to_flatbuf(fbb, client_id), - to_flatbuf(fbb, actor_id), getpid()); + auto message = CreateRegisterClientRequest( + fbb, is_worker, to_flatbuf(fbb, client_id), + to_flatbuf(fbb, result->actor_id), getpid(), num_gpus); fbb.Finish(message); /* Register the process ID with the local scheduler. */ int success = write_message(result->conn, MessageType_RegisterClientRequest, @@ -40,8 +41,16 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( } CHECK(type == MessageType_RegisterClientReply); - /* Parse the reply object. We currently don't do anything with it. */ + /* Parse the reply object. */ auto reply_message = flatbuffers::GetRoot(reply); + for (int i = 0; i < reply_message->gpu_ids()->size(); ++i) { + result->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); + } + /* If the worker is not an actor, there should not be any GPU IDs here. 
*/ + if (ActorID_equal(result->actor_id, NIL_ACTOR_ID)) { + CHECK(reply_message->gpu_ids()->size() == 0); + } + free(reply); return result; @@ -49,7 +58,7 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) { close(conn->conn); - free(conn); + delete conn; } void local_scheduler_log_event(LocalSchedulerConnection *conn, @@ -90,6 +99,17 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, /* Parse the flatbuffer object. */ auto reply_message = flatbuffers::GetRoot(message); + + /* Set the GPU IDs for this task. We only do this for non-actor tasks because + * for actors the GPUs are associated with the actor itself and not with the + * actor methods. */ + if (ActorID_equal(conn->actor_id, NIL_ACTOR_ID)) { + conn->gpu_ids.clear(); + for (int i = 0; i < reply_message->gpu_ids()->size(); ++i) { + conn->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); + } + } + /* Create a copy of the task spec so we can free the reply. */ *task_size = reply_message->task_spec()->size(); TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data(); diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 8e63a70f7..80c60bf57 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -4,11 +4,16 @@ #include "common/task.h" #include "local_scheduler_shared.h" -typedef struct { +struct LocalSchedulerConnection { /** File descriptor of the Unix domain socket that connects to local * scheduler. */ int conn; -} LocalSchedulerConnection; + /** The actor ID of this client. If this client is not an actor, then this + * should be NIL_ACTOR_ID. */ + ActorID actor_id; + /** The IDs of the GPUs that this client can use. */ + std::vector gpu_ids; +}; /** * Connect to the local scheduler. @@ -19,13 +24,16 @@ typedef struct { * running on this actor, this should be NIL_ACTOR_ID. 
* @param is_worker Whether this client is a worker. If it is a worker, an * additional message will be sent to register as one. + * @param num_gpus The number of GPUs required by this worker. This is only + * used if the worker is an actor. * @return The connection information. */ LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID worker_id, ActorID actor_id, - bool is_worker); + bool is_worker, + int64_t num_gpus); /** * Disconnect from the local scheduler. diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index e9f399ea4..3c4fe620a 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -20,15 +20,17 @@ static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, UniqueID client_id; ActorID actor_id; PyObject *is_worker; - self->local_scheduler_connection = NULL; - if (!PyArg_ParseTuple(args, "sO&O&O", &socket_name, PyStringToUniqueID, - &client_id, PyStringToUniqueID, &actor_id, - &is_worker)) { + int num_gpus; + if (!PyArg_ParseTuple(args, "sO&O&Oi", &socket_name, PyStringToUniqueID, + &client_id, PyStringToUniqueID, &actor_id, &is_worker, + &num_gpus)) { + self->local_scheduler_connection = NULL; return -1; } /* Connect to the local scheduler. */ self->local_scheduler_connection = LocalSchedulerConnection_init( - socket_name, client_id, actor_id, (bool) PyObject_IsTrue(is_worker)); + socket_name, client_id, actor_id, (bool) PyObject_IsTrue(is_worker), + num_gpus); return 0; } @@ -112,6 +114,18 @@ static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, return PyObjectID_make(put_id); } +static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) { + /* Construct a Python list of GPU IDs. 
*/ + std::vector gpu_ids = + ((PyLocalSchedulerClient *) self)->local_scheduler_connection->gpu_ids; + int num_gpu_ids = gpu_ids.size(); + PyObject *gpu_ids_list = PyList_New((Py_ssize_t) num_gpu_ids); + for (int i = 0; i < num_gpu_ids; ++i) { + PyList_SetItem(gpu_ids_list, i, PyLong_FromLong(gpu_ids[i])); + } + return gpu_ids_list; +} + static PyMethodDef PyLocalSchedulerClient_methods[] = { {"submit", (PyCFunction) PyLocalSchedulerClient_submit, METH_VARARGS, "Submit a task to the local scheduler."}, @@ -126,6 +140,8 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = { METH_NOARGS, "Notify the local scheduler that we are unblocked."}, {"compute_put_id", (PyCFunction) PyLocalSchedulerClient_compute_put_id, METH_VARARGS, "Return the object ID for a put call within a task."}, + {"gpu_ids", (PyCFunction) PyLocalSchedulerClient_gpu_ids, METH_NOARGS, + "Get the IDs of the GPUs that are reserved for this client."}, {NULL} /* Sentinel */ }; diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index 0f7cd2f4e..4913af3fc 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -74,6 +74,10 @@ struct LocalSchedulerState { /** Vector of dynamic attributes associated with the node owned by this local * scheduler. */ double dynamic_resources[ResourceIndex_MAX]; + /** The IDs of the available GPUs. There is redundancy here in that + * available_gpus.size() == dynamic_resources[ResourceIndex_GPU] should + * always be true. */ + std::vector available_gpus; }; /** Contains all information associated with a local scheduler client. */ @@ -95,13 +99,13 @@ struct LocalSchedulerClient { * nonzero when the worker is actively executing a task. If the worker is * blocked, then this value will be zero. */ double cpus_in_use; - /** The number of GPUs that the worker is currently using. 
If the worker is an - * actor, this will be constant throughout the lifetime of the actor (and - * will be equal to the number of GPUs requested by the actor). If the worker - * is not an actor, this will be constant for the duration of a task and will - * have length equal to the number of GPUs requested by the task (in - * particular it will not change if the task blocks). */ - double gpus_in_use; + /** A vector of the IDs of the GPUs that the worker is currently using. If the + * worker is an actor, this will be constant throughout the lifetime of the + * actor (with length equal to the number of GPUs requested by the actor). If + * the worker is not an actor, this will be constant for the duration of a + * task and will have length equal to the number of GPUs requested by the + * task (in particular it will not change if the task blocks). */ + std::vector gpus_in_use; /** A flag to indicate whether this worker is currently blocking on an * object(s) that isn't available locally yet. 
*/ bool is_blocked; diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 042500f63..31ba4c0bc 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -123,7 +123,7 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, for (int i = 0; i < num_mock_workers; ++i) { mock->conns[i] = LocalSchedulerConnection_init( utstring_body(local_scheduler_socket_name), NIL_WORKER_ID, NIL_ACTOR_ID, - true); + true, 0); } background_thread.join(); diff --git a/test/actor_test.py b/test/actor_test.py index 6ddf37b78..ffe05a798 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -2,8 +2,10 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import numpy as np +import collections import random +import numpy as np +import time import unittest import ray @@ -621,6 +623,7 @@ class ActorsWithGPUs(unittest.TestCase): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): + assert ray.get_gpu_ids() == self.gpu_ids return (ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids)) @@ -668,11 +671,13 @@ class ActorsWithGPUs(unittest.TestCase): for actor in actors]) node_names = set([location for location, gpu_id in locations_and_ids]) self.assertEqual(len(node_names), num_local_schedulers) - location_actor_combinations = [] + + # Keep track of which GPU IDs are being used for each location. 
+ gpus_in_use = {node_name: [] for node_name in node_names} + for location, gpu_ids in locations_and_ids: + gpus_in_use[location].extend(gpu_ids) for node_name in node_names: - location_actor_combinations.append((node_name, (0, 1))) - location_actor_combinations.append((node_name, (2, 3))) - self.assertEqual(set(locations_and_ids), set(location_actor_combinations)) + self.assertEqual(len(set(gpus_in_use[node_name])), 4) # Creating a new actor should fail because all of the GPUs are being used. with self.assertRaises(Exception): @@ -693,12 +698,13 @@ class ActorsWithGPUs(unittest.TestCase): # Make sure that no two actors are assigned to the same GPU. locations_and_ids = ray.get([actor.get_location_and_ids() for actor in actors]) - node_names = set([location for location, gpu_id in locations_and_ids]) - self.assertEqual(len(node_names), num_local_schedulers) - location_actor_combinations = [] + self.assertEqual(node_names, + set([location for location, gpu_id in locations_and_ids])) + for location, gpu_ids in locations_and_ids: + gpus_in_use[location].extend(gpu_ids) for node_name in node_names: - location_actor_combinations.append((node_name, (4,))) - self.assertEqual(set(locations_and_ids), set(location_actor_combinations)) + self.assertEqual(len(gpus_in_use[node_name]), 5) + self.assertEqual(set(gpus_in_use[node_name]), set(range(5))) # Creating a new actor should fail because all of the GPUs are being used. 
with self.assertRaises(Exception): @@ -781,6 +787,172 @@ class ActorsWithGPUs(unittest.TestCase): ray.worker.cleanup() + def testActorsAndTasksWithGPUs(self): + num_local_schedulers = 3 + num_gpus_per_scheduler = 6 + ray.worker._init( + start_ray_local=True, num_workers=0, + num_local_schedulers=num_local_schedulers, + num_cpus=num_gpus_per_scheduler, + num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) + + def check_intervals_non_overlapping(list_of_intervals): + for i in range(len(list_of_intervals)): + for j in range(i): + first_interval = list_of_intervals[i] + second_interval = list_of_intervals[j] + # Check that list_of_intervals[i] and list_of_intervals[j] don't + # overlap. + assert first_interval[0] < first_interval[1] + assert second_interval[0] < second_interval[1] + assert (first_interval[1] < second_interval[0] or + second_interval[1] < first_interval[0]) + + @ray.remote(num_gpus=1) + def f1(): + t1 = time.time() + time.sleep(0.1) + t2 = time.time() + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert gpu_ids[0] in range(num_gpus_per_scheduler) + return (ray.worker.global_worker.plasma_client.store_socket_name, + tuple(gpu_ids), [t1, t2]) + + @ray.remote(num_gpus=2) + def f2(): + t1 = time.time() + time.sleep(0.1) + t2 = time.time() + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 2 + assert gpu_ids[0] in range(num_gpus_per_scheduler) + assert gpu_ids[1] in range(num_gpus_per_scheduler) + return (ray.worker.global_worker.plasma_client.store_socket_name, + tuple(gpu_ids), [t1, t2]) + + @ray.actor(num_gpus=1) + class Actor1(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + assert len(self.gpu_ids) == 1 + assert self.gpu_ids[0] in range(num_gpus_per_scheduler) + + def get_location_and_ids(self): + assert ray.get_gpu_ids() == self.gpu_ids + return (ray.worker.global_worker.plasma_client.store_socket_name, + tuple(self.gpu_ids)) + + def locations_to_intervals_for_many_tasks(): + # Launch a bunch of GPU tasks. 
+ locations_ids_and_intervals = ray.get( + [f1.remote() for _ + in range(5 * num_local_schedulers * num_gpus_per_scheduler)] + + [f2.remote() for _ + in range(5 * num_local_schedulers * num_gpus_per_scheduler)] + + [f1.remote() for _ + in range(5 * num_local_schedulers * num_gpus_per_scheduler)]) + + locations_to_intervals = collections.defaultdict(lambda: []) + for location, gpu_ids, interval in locations_ids_and_intervals: + for gpu_id in gpu_ids: + locations_to_intervals[(location, gpu_id)].append(interval) + return locations_to_intervals + + # Run a bunch of GPU tasks. + locations_to_intervals = locations_to_intervals_for_many_tasks() + # Make sure that all GPUs were used. + self.assertEqual(len(locations_to_intervals), + num_local_schedulers * num_gpus_per_scheduler) + # For each GPU, verify that the set of tasks that used this specific GPU + # did not overlap in time. + for locations in locations_to_intervals: + check_intervals_non_overlapping(locations_to_intervals[locations]) + + # Create an actor that uses a GPU. + a = Actor1() + actor_location = ray.get(a.get_location_and_ids()) + actor_location = (actor_location[0], actor_location[1][0]) + # This check makes sure that actor_location is formatted the same way that + # the keys of locations_to_intervals are formatted. + self.assertIn(actor_location, locations_to_intervals) + + # Run a bunch of GPU tasks. + locations_to_intervals = locations_to_intervals_for_many_tasks() + # Make sure that all but one of the GPUs were used. + self.assertEqual(len(locations_to_intervals), + num_local_schedulers * num_gpus_per_scheduler - 1) + # For each GPU, verify that the set of tasks that used this specific GPU + # did not overlap in time. + for locations in locations_to_intervals: + check_intervals_non_overlapping(locations_to_intervals[locations]) + # Make sure that the actor's GPU was not used. + self.assertNotIn(actor_location, locations_to_intervals) + + # Create several more actors that use GPUs. 
+ actors = [Actor1() for _ in range(3)] + actor_locations = ray.get([actor.get_location_and_ids() + for actor in actors]) + + # Run a bunch of GPU tasks. + locations_to_intervals = locations_to_intervals_for_many_tasks() + # Make sure that all but 4 of the GPUs were used. + self.assertEqual(len(locations_to_intervals), + num_local_schedulers * num_gpus_per_scheduler - 1 - 3) + # For each GPU, verify that the set of tasks that used this specific GPU + # did not overlap in time. + for locations in locations_to_intervals: + check_intervals_non_overlapping(locations_to_intervals[locations]) + # Make sure that the GPUs were not used. + self.assertNotIn(actor_location, locations_to_intervals) + for location in actor_locations: + self.assertNotIn(location, locations_to_intervals) + + # Create more actors to fill up all the GPUs. + more_actors = [Actor1() for _ in + range(num_local_schedulers * + num_gpus_per_scheduler - 1 - 3)] + # Wait for the actors to finish being created. + ray.get([actor.get_location_and_ids() for actor in more_actors]) + + # Now if we run some GPU tasks, they should not be scheduled. 
+ results = [f1.remote() for _ in range(30)] + ready_ids, remaining_ids = ray.wait(results, timeout=1000) + self.assertEqual(len(ready_ids), 0) + + ray.worker.cleanup() + + def testActorsAndTasksWithGPUsVersionTwo(self): + # Create tasks and actors that both use GPUs and make sure that they are + # given different GPUs + ray.init(num_cpus=10, num_gpus=10) + + @ray.remote(num_gpus=1) + def f(): + time.sleep(4) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + return gpu_ids[0] + + @ray.actor(num_gpus=1) + class Actor(object): + def __init__(self): + self.gpu_ids = ray.get_gpu_ids() + assert len(self.gpu_ids) == 1 + + def get_gpu_id(self): + assert ray.get_gpu_ids() == self.gpu_ids + return self.gpu_ids[0] + + results = [] + for _ in range(5): + results.append(f.remote()) + a = Actor() + results.append(a.get_gpu_id()) + + gpu_ids = ray.get(results) + self.assertEqual(set(gpu_ids), set(range(10))) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/runtest.py b/test/runtest.py index f9f7c7990..fd760c57a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1187,6 +1187,91 @@ class ResourcesTest(unittest.TestCase): ray.worker.cleanup() + def testGPUIDs(self): + num_gpus = 10 + ray.init(num_cpus=10, num_gpus=num_gpus) + + @ray.remote(num_gpus=0) + def f0(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + @ray.remote(num_gpus=1) + def f1(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + @ray.remote(num_gpus=2) + def f2(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 2 + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + @ray.remote(num_gpus=3) + def f3(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 3 + for gpu_id in gpu_ids: + assert gpu_id in 
range(num_gpus) + return gpu_ids + + @ray.remote(num_gpus=4) + def f4(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 4 + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + @ray.remote(num_gpus=5) + def f5(): + time.sleep(0.1) + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 5 + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + list_of_ids = ray.get([f0.remote() for _ in range(10)]) + self.assertEqual(list_of_ids, 10 * [[]]) + + list_of_ids = ray.get([f1.remote() for _ in range(10)]) + set_of_ids = set([tuple(gpu_ids) for gpu_ids in list_of_ids]) + self.assertEqual(set_of_ids, set([(i,) for i in range(10)])) + + list_of_ids = ray.get([f2.remote(), f4.remote(), f4.remote()]) + all_ids = [gpu_id for gpu_ids in list_of_ids for gpu_id in gpu_ids] + self.assertEqual(set(all_ids), set(range(10))) + + remaining = [f5.remote() for _ in range(20)] + for _ in range(10): + t1 = time.time() + ready, remaining = ray.wait(remaining, num_returns=2) + t2 = time.time() + # There are only 10 GPUs, and each task uses 5 GPUs, so there should only + # be 2 tasks scheduled at a given time, so if we wait for 2 tasks to + # finish, then it should take at least 0.1 seconds for each pair of tasks + # to finish. + self.assertGreater(t2 - t1, 0.09) + list_of_ids = ray.get(ready) + all_ids = [gpu_id for gpu_ids in list_of_ids for gpu_id in gpu_ids] + self.assertEqual(set(all_ids), set(range(10))) + + ray.worker.cleanup() + def testMultipleLocalSchedulers(self): # This test will define a bunch of tasks that can only be assigned to # specific local schedulers, and we will check that they are assigned to