From 61139e15099106047f5d6553c44aaa87381ef456 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 10 Jun 2018 15:31:43 -0700 Subject: [PATCH] Enable fractional resources and resource IDs for xray. (#2187) * Implement GPU IDs and fractional resources. * Add documentation and python exceptions. * Fix signed/unsigned comparison. * Fix linting. * Fixes from rebase. * Re-enable tests that use ray.wait. * Don't kill the raylet if an infeasible task is submitted. * Ignore tests that require better load balancing. * Linting * Ignore array test. * Ignore stress test reconstructions tests. * Don't kill node manager if remote node manager disconnects. * Ignore more stress tests. * Naming changes * Remove outdated todo * Small fix * Re-enable test. * Linting * Fix resource bookkeeping for blocked tasks. * Fix linting * Fix Java client. * Ignore test * Ignore put error tests --- python/ray/__init__.py | 8 +- python/ray/global_scheduler/test/test.py | 2 +- python/ray/local_scheduler/test/test.py | 2 +- python/ray/ray_constants.py | 13 +- python/ray/services.py | 17 +- python/ray/worker.py | 41 ++- ...ay_spi_impl_DefaultLocalSchedulerClient.cc | 5 +- .../lib/python/local_scheduler_extension.cc | 56 +++- src/local_scheduler/local_scheduler_client.cc | 64 +++- src/local_scheduler/local_scheduler_client.h | 25 +- .../test/local_scheduler_tests.cc | 2 +- src/ray/raylet/format/node_manager.fbs | 16 +- src/ray/raylet/node_manager.cc | 101 ++++-- src/ray/raylet/node_manager.h | 2 + src/ray/raylet/scheduling_policy.cc | 25 +- src/ray/raylet/scheduling_policy.h | 4 +- src/ray/raylet/scheduling_resources.cc | 288 ++++++++++++++++++ src/ray/raylet/scheduling_resources.h | 194 +++++++++++- src/ray/raylet/worker.cc | 33 ++ src/ray/raylet/worker.h | 17 ++ test/actor_test.py | 33 +- test/array_test.py | 8 +- test/failure_test.py | 3 + test/recursion_test.py | 2 +- test/runtest.py | 74 +++-- test/stress_tests.py | 15 + 26 files changed, 945 insertions(+), 105 deletions(-) diff --git a/python/ray/__init__.py b/python/ray/__init__.py index c45b81d5e..b3a538e2a 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -49,7 +49,7 @@ except ImportError as e: from ray.local_scheduler import ObjectID, _config # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids, - get_webui_url, + get_resource_ids, get_webui_url, register_custom_serializer) # noqa: E402 from ray.worker import (SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE) # noqa: E402 @@ -66,9 +66,9 @@ __version__ = "0.4.0" __all__ = [ "error_info", "init", "connect", "disconnect", "get", "put", "wait", "remote", "log_event", "log_span", "flush_log", "actor", "method", - "get_gpu_ids", "get_webui_url", "register_custom_serializer", - "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", "global_state", - "ObjectID", "_config", "__version__" + "get_gpu_ids", "get_resource_ids", "get_webui_url", + "register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", + "SILENT_MODE", "global_state", "ObjectID", "_config", "__version__" ] import ctypes # noqa: E402 diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index dc1aa5e2b..64d8e4047 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -101,7 +101,7 @@ class TestGlobalScheduler(unittest.TestCase): static_resources={"CPU": 10}) # Connect to the scheduler. local_scheduler_client = local_scheduler.LocalSchedulerClient( - local_scheduler_name, NIL_WORKER_ID, False) + local_scheduler_name, NIL_WORKER_ID, False, False) self.local_scheduler_clients.append(local_scheduler_client) self.local_scheduler_pids.append(p4) diff --git a/python/ray/local_scheduler/test/test.py b/python/ray/local_scheduler/test/test.py index b990676c8..46479e3af 100644 --- a/python/ray/local_scheduler/test/test.py +++ b/python/ray/local_scheduler/test/test.py @@ -46,7 +46,7 @@ class TestLocalSchedulerClient(unittest.TestCase): plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. self.local_scheduler_client = local_scheduler.LocalSchedulerClient( - scheduler_name, NIL_WORKER_ID, False) + scheduler_name, NIL_WORKER_ID, False, False) def tearDown(self): # Check that the processes are still alive. diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index fd5059986..9925f08a7 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -12,6 +12,15 @@ def env_integer(key, default): return default +# If a remote function or actor (or some other export) has serialized size +# greater than this quantity, print an warning. +PICKLE_OBJECT_WARNING_SIZE = 10**7 + +# The maximum resource quantity that is allowed. TODO(rkn): This could be +# relaxed, but the current implementation of the node manager will be slower +# for large resource quantities due to bookkeeping of specific resource IDs. +MAX_RESOURCE_QUANTITY = 512 + # Different types of Ray errors that can be pushed to the driver. # TODO(rkn): These should be defined in flatbuffers and must be synced with # the existing C++ definitions. @@ -29,10 +38,6 @@ WORKER_DIED_PUSH_ERROR = "worker_died" PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction" HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch" -# If a remote function or actor (or some other export) has serialized size -# greater than this quantity, print an warning. -PICKLE_OBJECT_WARNING_SIZE = 10**7 - # Abort autoscaling if more than this number of errors are encountered. This # is a safety feature to prevent e.g. runaway node launches. AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5) diff --git a/python/ray/services.py b/python/ray/services.py index 3245ee5cf..559b89142 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -22,6 +22,7 @@ import redis import pyarrow # Ray modules +import ray.ray_constants import ray.global_scheduler as global_scheduler import ray.local_scheduler import ray.plasma @@ -798,11 +799,13 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): return webui_url -def check_and_update_resources(resources): +def check_and_update_resources(resources, use_raylet): """Sanity check a resource dictionary and add sensible defaults. Args: resources: A dictionary mapping resource names to resource quantities. + use_raylet: True if we are using the raylet code path and false + otherwise. Returns: A new resource dictionary. @@ -837,6 +840,14 @@ def check_and_update_resources(resources): for _, resource_quantity in resources.items(): assert (isinstance(resource_quantity, int) or isinstance(resource_quantity, float)) + if (isinstance(resource_quantity, float) + and not resource_quantity.is_integer()): + raise ValueError("Resource quantities must all be whole numbers.") + + if (use_raylet and + resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY): + raise ValueError("Resource quantities must be at most {}." + .format(ray.ray_constants.MAX_RESOURCE_QUANTITY)) return resources @@ -879,7 +890,7 @@ def start_local_scheduler(redis_address, Return: The name of the local scheduler socket. """ - resources = check_and_update_resources(resources) + resources = check_and_update_resources(resources, False) print("Starting local scheduler with the following resources: {}." .format(resources)) @@ -932,7 +943,7 @@ def start_raylet(redis_address, Returns: The raylet socket name. """ - static_resources = check_and_update_resources(resources) + static_resources = check_and_update_resources(resources, True) # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. resource_argument = ",".join([ diff --git a/python/ray/worker.py b/python/ray/worker.py index 76db62ca6..e2077aa24 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -592,6 +592,15 @@ class Worker(object): if resources is None: raise ValueError("The resources dictionary is required.") + for value in resources.values(): + assert (isinstance(value, int) or isinstance(value, float)) + if value < 0: + raise ValueError( + "Resource quantities must be nonnegative.") + if (value >= 1 and isinstance(value, float) + and not value.is_integer()): + raise ValueError( + "Resource quantities must all be whole numbers.") # Submit the task to local scheduler. task = ray.local_scheduler.Task( @@ -1063,7 +1072,13 @@ def get_gpu_ids(): raise Exception("ray.get_gpu_ids() currently does not work in PYTHON " "MODE.") - assigned_ids = global_worker.local_scheduler_client.gpu_ids() + if not global_worker.use_raylet: + assigned_ids = global_worker.local_scheduler_client.gpu_ids() + else: + all_resource_ids = global_worker.local_scheduler_client.resource_ids() + assigned_ids = [ + resource_id for resource_id, _ in all_resource_ids.get("GPU", []) + ] # If the user had already set CUDA_VISIBLE_DEVICES, then respect that (in # the sense that only GPU IDs that appear in CUDA_VISIBLE_DEVICES should be # returned). @@ -1075,6 +1090,26 @@ def get_gpu_ids(): return assigned_ids +def get_resource_ids(): + """Get the IDs of the resources that are available to the worker. + + Returns: + A dictionary mapping the name of a resource to a list of pairs, where + each pair consists of the ID of a resource and the fraction of that + resource reserved for this worker. + """ + if not global_worker.use_raylet: + raise Exception("ray.get_resource_ids() is only supported in the " + "raylet code path.") + + if _mode() == PYTHON_MODE: + raise Exception( + "ray.get_resource_ids() currently does not work in PYTHON " + "MODE.") + + return global_worker.local_scheduler_client.resource_ids() + + def _webui_url_helper(client): """Parsing for getting the url of the web UI. @@ -1424,7 +1459,7 @@ def _init(address_info=None, plasma_directory=None, huge_pages=False, include_webui=True, - use_raylet=False): + use_raylet=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -2149,7 +2184,7 @@ def connect(info, local_scheduler_socket = info["raylet_socket_name"] worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient( - local_scheduler_socket, worker.worker_id, is_worker) + local_scheduler_socket, worker.worker_id, is_worker, worker.use_raylet) # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc index dfd47adbc..0cc711db7 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc @@ -47,8 +47,9 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient__1init(JNIEnv *env, // byte[] workerId, byte[] actorId, boolean isWorker, long numGpus); UniqueIdFromJByteArray worker_id(env, wid); const char *nativeString = env->GetStringUTFChars(sockName, JNI_FALSE); - auto client = - LocalSchedulerConnection_init(nativeString, *worker_id.PID, isWorker); + bool use_raylet = false; + auto client = LocalSchedulerConnection_init(nativeString, *worker_id.PID, + isWorker, use_raylet); env->ReleaseStringUTFChars(sockName, nativeString); return reinterpret_cast(client); } diff --git a/src/local_scheduler/lib/python/local_scheduler_extension.cc b/src/local_scheduler/lib/python/local_scheduler_extension.cc index 3645c3111..ce68c6bfe 100644 --- a/src/local_scheduler/lib/python/local_scheduler_extension.cc +++ b/src/local_scheduler/lib/python/local_scheduler_extension.cc @@ -20,14 +20,16 @@ static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, char *socket_name; UniqueID client_id; PyObject *is_worker; - if (!PyArg_ParseTuple(args, "sO&O", &socket_name, PyStringToUniqueID, - &client_id, &is_worker)) { + PyObject *use_raylet; + if (!PyArg_ParseTuple(args, "sO&OO", &socket_name, PyStringToUniqueID, + &client_id, &is_worker, &use_raylet)) { self->local_scheduler_connection = NULL; return -1; } /* Connect to the local scheduler. */ self->local_scheduler_connection = LocalSchedulerConnection_init( - socket_name, client_id, (bool) PyObject_IsTrue(is_worker)); + socket_name, client_id, static_cast(PyObject_IsTrue(is_worker)), + static_cast(PyObject_IsTrue(use_raylet))); return 0; } @@ -73,9 +75,15 @@ static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) { /* Drop the global interpreter lock while we get a task because * local_scheduler_get_task may block for a long time. */ Py_BEGIN_ALLOW_THREADS - task_spec = local_scheduler_get_task( - ((PyLocalSchedulerClient *) self)->local_scheduler_connection, - &task_size); + if (!reinterpret_cast(self)->local_scheduler_connection->use_raylet) { + task_spec = local_scheduler_get_task( + reinterpret_cast(self)->local_scheduler_connection, + &task_size); + } else { + task_spec = local_scheduler_get_task_raylet( + reinterpret_cast(self)->local_scheduler_connection, + &task_size); + } Py_END_ALLOW_THREADS return PyTask_make(task_spec, task_size); } @@ -164,6 +172,39 @@ static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) { return gpu_ids_list; } +// NOTE(rkn): This function only makes sense for the raylet code path. +static PyObject *PyLocalSchedulerClient_resource_ids(PyObject *self) { + // Construct a Python dictionary of resource IDs and resource fractions. + PyObject *resource_ids = PyDict_New(); + + for (auto const &resource_info : + reinterpret_cast(self) + ->local_scheduler_connection->resource_ids_) { + auto const &resource_name = resource_info.first; + auto const &ids_and_fractions = resource_info.second; + +#if PY_MAJOR_VERSION >= 3 + PyObject *key = + PyUnicode_FromStringAndSize(resource_name.data(), resource_name.size()); +#else + PyObject *key = + PyBytes_FromStringAndSize(resource_name.data(), resource_name.size()); +#endif + PyObject *value = PyList_New(ids_and_fractions.size()); + for (size_t i = 0; i < ids_and_fractions.size(); ++i) { + auto const &id_and_fraction = ids_and_fractions[i]; + PyObject *id_fraction_pair = + Py_BuildValue("(Ld)", id_and_fraction.first, id_and_fraction.second); + PyList_SetItem(value, i, id_fraction_pair); + } + PyDict_SetItem(resource_ids, key, value); + Py_DECREF(key); + Py_DECREF(value); + } + + return resource_ids; +} + static PyObject *PyLocalSchedulerClient_get_actor_frontier(PyObject *self, PyObject *args) { ActorID actor_id; @@ -263,6 +304,9 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = { METH_VARARGS, "Return the object ID for a put call within a task."}, {"gpu_ids", (PyCFunction) PyLocalSchedulerClient_gpu_ids, METH_NOARGS, "Get the IDs of the GPUs that are reserved for this client."}, + {"resource_ids", (PyCFunction) PyLocalSchedulerClient_resource_ids, + METH_NOARGS, + "Get the IDs of the resources that are reserved for this client."}, {"get_actor_frontier", (PyCFunction) PyLocalSchedulerClient_get_actor_frontier, METH_VARARGS, ""}, {"set_actor_frontier", diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 8d838e1e8..68642d813 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -15,8 +15,10 @@ using MessageType = ray::local_scheduler::protocol::MessageType; LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID client_id, - bool is_worker) { + bool is_worker, + bool use_raylet) { LocalSchedulerConnection *result = new LocalSchedulerConnection(); + result->use_raylet = use_raylet; result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1); /* Register with the local scheduler. @@ -133,6 +135,66 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, return spec; } +// This is temporarily duplicated from local_scheduler_get_task while we have +// the raylet and non-raylet code paths. +TaskSpec *local_scheduler_get_task_raylet(LocalSchedulerConnection *conn, + int64_t *task_size) { + write_message(conn->conn, static_cast(MessageType::GetTask), 0, + NULL); + int64_t type; + int64_t reply_size; + uint8_t *reply; + // Receive a task from the local scheduler. This will block until the local + // scheduler gives this client a task. + read_message(conn->conn, &type, &reply_size, &reply); + if (type == static_cast(CommonMessageType::DISCONNECT_CLIENT)) { + RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection."; + exit(1); + } + RAY_CHECK(type == static_cast(MessageType::ExecuteTask)); + + // Parse the flatbuffer object. + auto reply_message = flatbuffers::GetRoot(reply); + + // Create a copy of the task spec so we can free the reply. + *task_size = reply_message->task_spec()->size(); + const TaskSpec *data = + reinterpret_cast(reply_message->task_spec()->data()); + TaskSpec *spec = TaskSpec_copy(const_cast(data), *task_size); + + // Set the resource IDs for this task. + conn->resource_ids_.clear(); + for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); + ++i) { + auto const &fractional_resource_ids = + reply_message->fractional_resource_ids()->Get(i); + auto &acquired_resources = conn->resource_ids_[string_from_flatbuf( + *fractional_resource_ids->resource_name())]; + + size_t num_resource_ids = fractional_resource_ids->resource_ids()->size(); + size_t num_resource_fractions = + fractional_resource_ids->resource_fractions()->size(); + RAY_CHECK(num_resource_ids == num_resource_fractions); + RAY_CHECK(num_resource_ids > 0); + for (size_t j = 0; j < num_resource_ids; ++j) { + int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j); + double resource_fraction = + fractional_resource_ids->resource_fractions()->Get(j); + if (num_resource_ids > 1) { + int64_t whole_fraction = resource_fraction; + RAY_CHECK(whole_fraction == resource_fraction); + } + acquired_resources.push_back( + std::make_pair(resource_id, resource_fraction)); + } + } + + // Free the original message from the local scheduler. + free(reply); + // Return the copy of the task spec and pass ownership to the caller. + return spec; +} + void local_scheduler_task_done(LocalSchedulerConnection *conn) { write_message(conn->conn, static_cast(MessageType::TaskDone), 0, NULL); diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 5608c5aa8..1a8cbd240 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -6,11 +6,19 @@ #include "ray/raylet/task_spec.h" struct LocalSchedulerConnection { + /// True if we should use the raylet code path and false otherwise. + bool use_raylet; /** File descriptor of the Unix domain socket that connects to local * scheduler. */ int conn; - /** The IDs of the GPUs that this client can use. */ + /** The IDs of the GPUs that this client can use. NOTE(rkn): This is only used + * by legacy Ray and will be deprecated. */ std::vector gpu_ids; + /// A map from resource name to the resource IDs that are currently reserved + /// for this worker. Each pair consists of the resource ID and the fraction + /// of that resource allocated for this worker. + std::unordered_map>> + resource_ids_; }; /** @@ -20,12 +28,15 @@ struct LocalSchedulerConnection { * local scheduler. * @param is_worker Whether this client is a worker. If it is a worker, an * additional message will be sent to register as one. + * @param use_raylet True if we should use the raylet code path and false + * otherwise. * @return The connection information. */ LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID worker_id, - bool is_worker); + bool is_worker, + bool use_raylet); /** * Disconnect from the local scheduler. @@ -102,6 +113,16 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn, TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, int64_t *task_size); +/// Get next task for this client. This will block until the scheduler assigns +/// a task to this worker. This allocates and returns a task, and so the task +/// must be freed by the caller. +/// +/// \param conn The connection information. +/// \param task_size A pointer to fill out with the task size. +/// \return The address of the assigned task. +TaskSpec *local_scheduler_get_task_raylet(LocalSchedulerConnection *conn, + int64_t *task_size); + /** * Tell the local scheduler that the client has finished executing a task. * diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index dfcf0053f..f75bcb750 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -125,7 +125,7 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, for (int i = 0; i < num_mock_workers; ++i) { mock->conns[i] = LocalSchedulerConnection_init( - local_scheduler_socket_name.c_str(), WorkerID::nil(), true); + local_scheduler_socket_name.c_str(), WorkerID::nil(), true, false); } background_thread.join(); diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index ca25120eb..b56243cfc 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -81,12 +81,24 @@ table SubmitTaskRequest { task_spec: string; } +// This message describes a given resource that is reserved for a worker. +table ResourceIdSetInfo { + // The name of the resource. + resource_name: string; + // The resource IDs reserved for this worker. + resource_ids: [long]; + // The fraction of each resource ID that is reserved for this worker. Note + // that the length of this list must be the same as the length of + // resource_ids. + resource_fractions: [double]; +} + // This message is sent from the local scheduler to a worker. table GetTaskReply { // A string of bytes representing the task specification. task_spec: string; - // The IDs of the GPUs that the worker is allowed to use for this task. - gpu_ids: [int]; + // A list of the resources reserved for this worker. + fractional_resource_ids: [ResourceIdSetInfo]; } // This struct is used to register a new worker with the local scheduler. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c05742dfb..bace170bd 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -82,6 +82,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, heartbeat_timer_(io_service), heartbeat_period_ms_(config.heartbeat_period_ms), local_resources_(config.resource_config), + local_available_resources_(config.resource_config), worker_pool_(config.num_initial_workers, static_cast(config.resource_config.GetNumCpus()), config.worker_command), @@ -321,14 +322,12 @@ void NodeManager::DispatchTasks() { if (scheduled_tasks.empty()) { return; } - const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); for (const auto &task : scheduled_tasks) { - const auto &local_resources = - cluster_resource_map_[my_client_id].GetAvailableResources(); const auto &task_resources = task.GetTaskSpecification().GetRequiredResources(); - if (!task_resources.IsSubset(local_resources)) { + if (!local_available_resources_.Contains(task_resources)) { // Not enough local resources for this task right now, skip this task. + // TODO(rkn): We should always skip node managers that have 0 CPUs. continue; } // We have enough resources for this task. Assign task. @@ -370,6 +369,8 @@ void NodeManager::ProcessClientMessage( case protocol::MessageType::DisconnectClient: { // Remove the dead worker from the pool and stop listening for messages. const std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); + + // This if statement distinguishes workers from drivers. if (worker) { // TODO(swang): Handle the case where the worker is killed while // executing a task. Clean up the assigned task's resources, return an @@ -377,6 +378,22 @@ void NodeManager::ProcessClientMessage( // RAY_CHECK(worker->GetAssignedTaskId().is_nil()) // << "Worker died while executing task: " << worker->GetAssignedTaskId(); worker_pool_.DisconnectWorker(worker); + + const ClientID &client_id = gcs_client_->client_table().GetLocalClientId(); + + // Return the resources that were being used by this worker. + auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.Release(task_resources); + cluster_resource_map_[client_id].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); + + auto const &lifetime_resources = worker->GetLifetimeResourceIds(); + local_available_resources_.Release(lifetime_resources); + cluster_resource_map_[client_id].Release(lifetime_resources.ToResourceSet()); + worker->ResetLifetimeResourceIds(); + + // Since some resources may have been released, we can try to dispatch more tasks. + DispatchTasks(); } return; } break; @@ -420,9 +437,13 @@ void NodeManager::ProcessClientMessage( double required_cpus = required_resources.GetNumCpus(); const std::unordered_map cpu_resources = { {kCPU_ResourceLabel, required_cpus}}; + // Release the CPU resources. + auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); + local_available_resources_.Release(cpu_resource_ids); RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] .Release(ResourceSet(cpu_resources))); + // Mark the task as blocked. local_queues_.QueueBlockedTasks(tasks); worker->MarkBlocked(); @@ -447,18 +468,32 @@ void NodeManager::ProcessClientMessage( // Get the CPU resources required by the running task. const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); double required_cpus = required_resources.GetNumCpus(); - const std::unordered_map cpu_resources = { - {kCPU_ResourceLabel, required_cpus}}; - // Acquire the CPU resources. - bool oversubscribed = - !cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( - ResourceSet(cpu_resources)); - if (oversubscribed) { - const SchedulingResources &local_resources = - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()]; - RAY_LOG(WARNING) << "Resources oversubscribed: " - << local_resources.GetAvailableResources().ToString(); + const ResourceSet cpu_resources( + std::unordered_map({{kCPU_ResourceLabel, required_cpus}})); + + // Check if we can reacquire the CPU resources. + bool oversubscribed = !local_available_resources_.Contains(cpu_resources); + + if (!oversubscribed) { + // Reacquire the CPU resources for the worker. Note that care needs to be + // taken if the user is using the specific CPU IDs since the IDs that we + // reacquire here may be different from the ones that the task started with. + auto const resource_ids = local_available_resources_.Acquire(cpu_resources); + worker->AcquireTaskCpuResources(resource_ids); + RAY_CHECK( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( + cpu_resources)); + } else { + // In this case, we simply don't reacquire the CPU resources for the worker. + // The worker can keep running and when the task finishes, it will simply + // not have any CPU resources to release. + RAY_LOG(WARNING) + << "Resources oversubscribed: " + << cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] + .GetAvailableResources() + .ToString(); } + // Mark the task as running again. local_queues_.QueueRunningTasks(tasks); worker->MarkUnblocked(); @@ -513,6 +548,11 @@ void NodeManager::ProcessNodeManagerMessage(TcpClientConnection &node_manager_cl << " spillback=" << task.GetTaskExecutionSpecReadonly().NumForwards(); SubmitTask(task, uncommitted_lineage); } break; + case protocol::MessageType::DisconnectClient: { + // TODO(rkn): We need to do some cleanup here. + RAY_LOG(INFO) << "Received disconnect message from remote node manager. " + << "We need to do some cleanup here."; + } break; default: RAY_LOG(FATAL) << "Received unexpected message type " << message_type; } @@ -670,18 +710,32 @@ void NodeManager::AssignTask(Task &task) { RAY_LOG(DEBUG) << "Assigning task to worker with pid " << worker->Pid(); flatbuffers::FlatBufferBuilder fbb; + + const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); + + // Resource accounting: acquire resources for the assigned task. + auto acquired_resources = + local_available_resources_.Acquire(spec.GetRequiredResources()); + RAY_CHECK( + this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); + + if (spec.IsActorCreationTask()) { + worker->SetLifetimeResourceIds(acquired_resources); + } else { + worker->SetTaskResourceIds(acquired_resources); + } + + ResourceIdSet resource_id_set = + worker->GetTaskResourceIds().Plus(worker->GetLifetimeResourceIds()); + auto resource_id_set_flatbuf = resource_id_set.ToFlatbuf(fbb); + auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb), - fbb.CreateVector(std::vector())); + fbb.CreateVector(resource_id_set_flatbuf)); fbb.Finish(message); auto status = worker->Connection()->WriteMessage( static_cast(protocol::MessageType::ExecuteTask), fbb.GetSize(), fbb.GetBufferPointer()); if (status.ok()) { - // Resource accounting: acquire resources for the assigned task. - const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); - RAY_CHECK( - this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); - // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); // If the task was an actor task, then record this execution to guarantee @@ -715,7 +769,7 @@ void NodeManager::AssignTask(Task &task) { // We failed to send the task to the worker, so disconnect the worker. ProcessClientMessage(worker->Connection(), static_cast(protocol::MessageType::DisconnectClient), - NULL); + nullptr); // Queue this task for future assignment. The task will be assigned to a // worker once one becomes available. local_queues_.QueueScheduledTasks(std::vector({task})); @@ -751,6 +805,9 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // lifetime of the actor, so we do not release any resources here. } else { // Release task's resources. + local_available_resources_.Release(worker.GetTaskResourceIds()); + worker.ResetTaskResourceIds(); + RAY_CHECK(this->cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] .Release(task.GetTaskSpecification().GetRequiredResources())); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fcd48277f..7610ae142 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -115,6 +115,8 @@ class NodeManager { uint64_t heartbeat_period_ms_; /// The resources local to this node. const SchedulingResources local_resources_; + /// The resources (and specific resource IDs) that are currently available. + ResourceIdSet local_available_resources_; // TODO(atumanov): Add resource information from other nodes. std::unordered_map cluster_resource_map_; /// A pool of workers. diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index 1808a5aa2..9f3de5424 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -53,16 +53,23 @@ std::unordered_map SchedulingPolicy::Schedule( client_keys.push_back(node_client_id); } } - RAY_CHECK(!client_keys.empty()); - // Choose index at random. - // Initialize a uniform integer distribution over the key space. - // TODO(atumanov): change uniform random to discrete, weighted by resource capacity. - std::uniform_int_distribution distribution(0, client_keys.size() - 1); - int client_key_index = distribution(gen_); - decision[task_id] = client_keys[client_key_index]; - RAY_LOG(DEBUG) << "[SchedulingPolicy] idx=" << client_key_index << " " << task_id - << " --> " << client_keys[client_key_index]; + if (!client_keys.empty()) { + // Choose index at random. + // Initialize a uniform integer distribution over the key space. + // TODO(atumanov): change uniform random to discrete, weighted by resource capacity. + std::uniform_int_distribution distribution(0, client_keys.size() - 1); + int client_key_index = distribution(gen_); + decision[task_id] = client_keys[client_key_index]; + RAY_LOG(DEBUG) << "[SchedulingPolicy] idx=" << client_key_index << " " << task_id + << " --> " << client_keys[client_key_index]; + } else { + // There are no nodes that can feasibily execute this task. TODO(rkn): Propagate a + // warning to the user. + RAY_LOG(WARNING) << "This task requires " + << t.GetTaskSpecification().GetRequiredResources().ToString() + << ", but no nodes have the necessary resources."; + } } return decision; } diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index 6785f189f..d89201c3b 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -18,8 +18,8 @@ class SchedulingPolicy { /// \brief SchedulingPolicy constructor. /// /// \param scheduling_queue: reference to a scheduler queues object for access to - /// tasks. - /// \return None. + /// tasks. + /// \return Void. SchedulingPolicy(const SchedulingQueue &scheduling_queue); /// Perform a scheduling operation, given a set of cluster resources and diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index 30a6f4a16..ec5e2ba3d 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -135,6 +135,294 @@ const std::unordered_map &ResourceSet::GetResourceMap() con return this->resource_capacity_; }; +/// ResourceIds class implementation + +ResourceIds::ResourceIds() {} + +ResourceIds::ResourceIds(double resource_quantity) { + RAY_CHECK(IsWhole(resource_quantity)); + int64_t whole_quantity = resource_quantity; + for (int64_t i = 0; i < whole_quantity; ++i) { + whole_ids_.push_back(i); + } +} + +ResourceIds::ResourceIds(const std::vector &whole_ids) : whole_ids_(whole_ids) {} + +ResourceIds::ResourceIds(const std::vector> &fractional_ids) + : fractional_ids_(fractional_ids) {} + +ResourceIds::ResourceIds(const std::vector &whole_ids, + const std::vector> &fractional_ids) + : whole_ids_(whole_ids), fractional_ids_(fractional_ids) {} + +bool ResourceIds::Contains(double resource_quantity) const { + RAY_CHECK(resource_quantity >= 0); + if (resource_quantity >= 1) { + RAY_CHECK(IsWhole(resource_quantity)); + return whole_ids_.size() >= resource_quantity; + } else { + if (whole_ids_.size() > 0) { + return true; + } else { + for (auto const &fractional_pair : fractional_ids_) { + if (fractional_pair.second >= resource_quantity) { + return true; + } + } + return false; + } + } +} + +ResourceIds ResourceIds::Acquire(double resource_quantity) { + RAY_CHECK(resource_quantity >= 0); + if (resource_quantity >= 1) { + // Handle the whole case. + RAY_CHECK(IsWhole(resource_quantity)); + int64_t whole_quantity = resource_quantity; + RAY_CHECK(static_cast(whole_ids_.size()) >= whole_quantity); + + std::vector ids_to_return; + for (int64_t i = 0; i < whole_quantity; ++i) { + ids_to_return.push_back(whole_ids_.back()); + whole_ids_.pop_back(); + } + + return ResourceIds(ids_to_return); + + } else { + // Handle the fractional case. + for (auto &fractional_pair : fractional_ids_) { + if (fractional_pair.second >= resource_quantity) { + auto return_pair = std::make_pair(fractional_pair.first, resource_quantity); + fractional_pair.second -= resource_quantity; + return ResourceIds({return_pair}); + } + } + + // If we get here then there weren't enough available fractional IDs, so we + // need to use a whole ID. + RAY_CHECK(whole_ids_.size() > 0); + int64_t whole_id = whole_ids_.back(); + whole_ids_.pop_back(); + + auto return_pair = std::make_pair(whole_id, resource_quantity); + fractional_ids_.push_back(std::make_pair(whole_id, 1 - resource_quantity)); + return ResourceIds({return_pair}); + } +} + +void ResourceIds::Release(const ResourceIds &resource_ids) { + auto const &whole_ids_to_return = resource_ids.WholeIds(); + + // Return the whole IDs. + whole_ids_.insert(whole_ids_.end(), whole_ids_to_return.begin(), + whole_ids_to_return.end()); + + // Return the fractional IDs. + auto const &fractional_ids_to_return = resource_ids.FractionalIds(); + for (auto const &fractional_pair_to_return : fractional_ids_to_return) { + int64_t resource_id = fractional_pair_to_return.first; + auto const &fractional_pair_it = + std::find_if(fractional_ids_.begin(), fractional_ids_.end(), + [resource_id](std::pair &fractional_pair) { + return fractional_pair.first == resource_id; + }); + if (fractional_pair_it == fractional_ids_.end()) { + fractional_ids_.push_back(fractional_pair_to_return); + } else { + fractional_pair_it->second += fractional_pair_to_return.second; + RAY_CHECK(fractional_pair_it->second <= 1); + // If this makes the ID whole, then return it to the list of whole IDs. + if (fractional_pair_it->second == 1) { + whole_ids_.push_back(resource_id); + fractional_ids_.erase(fractional_pair_it); + } + } + } +} + +ResourceIds ResourceIds::Plus(const ResourceIds &resource_ids) const { + ResourceIds resource_ids_to_return(whole_ids_, fractional_ids_); + resource_ids_to_return.Release(resource_ids); + return resource_ids_to_return; +} + +const std::vector &ResourceIds::WholeIds() const { return whole_ids_; } + +const std::vector> &ResourceIds::FractionalIds() const { + return fractional_ids_; +} + +double ResourceIds::TotalQuantity() const { + double total_quantity = whole_ids_.size(); + for (auto const &fractional_pair : fractional_ids_) { + total_quantity += fractional_pair.second; + } + return total_quantity; +} + +std::string ResourceIds::ToString() const { + std::string return_string = "Whole IDs: ["; + for (auto const &whole_id : whole_ids_) { + return_string += std::to_string(whole_id) + ", "; + } + return_string += "], Fractional IDs: "; + for (auto const &fractional_pair : fractional_ids_) { + return_string += "(" + std::to_string(fractional_pair.first) + ", " + + std::to_string(fractional_pair.second) + "), "; + } + return_string += "]"; + return return_string; +} + +bool ResourceIds::IsWhole(double resource_quantity) const { + int64_t whole_quantity = resource_quantity; + return whole_quantity == resource_quantity; +} + +/// ResourceIdSet class implementation + +ResourceIdSet::ResourceIdSet() {} + +ResourceIdSet::ResourceIdSet(const ResourceSet &resource_set) { + for (auto const &resource_pair : resource_set.GetResourceMap()) { + auto const &resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + available_resources_[resource_name] = ResourceIds(resource_quantity); + } +} + +ResourceIdSet::ResourceIdSet( + const std::unordered_map &available_resources) + : available_resources_(available_resources) {} + +bool ResourceIdSet::Contains(const ResourceSet &resource_set) const { + for (auto const &resource_pair : resource_set.GetResourceMap()) { + auto const &resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + if (resource_quantity == 0) { + continue; + } + + auto it = available_resources_.find(resource_name); + if (it == available_resources_.end()) { + return false; + } + + if (!it->second.Contains(resource_quantity)) { + return false; + } + } + return true; +} + +ResourceIdSet ResourceIdSet::Acquire(const ResourceSet &resource_set) { + std::unordered_map acquired_resources; + + for (auto const &resource_pair : resource_set.GetResourceMap()) { + auto const &resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + + if (resource_quantity == 0) { + continue; + } + + auto it = available_resources_.find(resource_name); + RAY_CHECK(it != available_resources_.end()); + acquired_resources[resource_name] = it->second.Acquire(resource_quantity); + } + return ResourceIdSet(acquired_resources); +} + +void ResourceIdSet::Release(const ResourceIdSet &resource_id_set) { + for (auto const &resource_pair : resource_id_set.AvailableResources()) { + auto const &resource_name = resource_pair.first; + auto const &resource_ids = resource_pair.second; + + if (resource_ids.TotalQuantity() == 0) { + continue; + } + + auto it = available_resources_.find(resource_name); + if (it == available_resources_.end()) { + // This should not happen when Release is called on resources that were obtained + // through a corresponding call to Acquire. + available_resources_[resource_name] = resource_ids; + } else { + it->second.Release(resource_ids); + } + } +} + +void ResourceIdSet::Clear() { available_resources_.clear(); } + +ResourceIdSet ResourceIdSet::Plus(const ResourceIdSet &resource_id_set) const { + ResourceIdSet resource_id_set_to_return(available_resources_); + resource_id_set_to_return.Release(resource_id_set); + return resource_id_set_to_return; +} + +const std::unordered_map &ResourceIdSet::AvailableResources() + const { + return available_resources_; +} + +ResourceIdSet ResourceIdSet::GetCpuResources() const { + std::unordered_map cpu_resources; + + auto it = available_resources_.find(kCPU_ResourceLabel); + if (it != available_resources_.end()) { + cpu_resources.insert(*it); + } + return ResourceIdSet(cpu_resources); +} + +ResourceSet ResourceIdSet::ToResourceSet() const { + std::unordered_map resource_set; + for (auto const &resource_pair : available_resources_) { + resource_set[resource_pair.first] = resource_pair.second.TotalQuantity(); + } + return ResourceSet(resource_set); +} + +std::string ResourceIdSet::ToString() const { + std::string return_string = "AvailableResources: "; + for (auto const &resource_pair : available_resources_) { + return_string += resource_pair.first + ": {"; + return_string += resource_pair.second.ToString(); + return_string += "}, "; + } + return return_string; +} + +std::vector> ResourceIdSet::ToFlatbuf( + flatbuffers::FlatBufferBuilder &fbb) const { + std::vector> return_message; + for (auto const &resource_pair : available_resources_) { + std::vector resource_ids; + std::vector resource_fractions; + for (auto whole_id : resource_pair.second.WholeIds()) { + resource_ids.push_back(whole_id); + resource_fractions.push_back(1); + } + + for (auto const &fractional_pair : resource_pair.second.FractionalIds()) { + resource_ids.push_back(fractional_pair.first); + resource_fractions.push_back(fractional_pair.second); + } + + auto resource_id_set_message = protocol::CreateResourceIdSetInfo( + fbb, fbb.CreateString(resource_pair.first), fbb.CreateVector(resource_ids), + fbb.CreateVector(resource_fractions)); + + return_message.push_back(resource_id_set_message); + } + + return return_message; +} + /// SchedulingResources class implementation SchedulingResources::SchedulingResources() diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index 249d45b61..153118a8e 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -6,6 +6,8 @@ #include #include +#include "ray/raylet/format/node_manager_generated.h" + namespace ray { namespace raylet { @@ -55,14 +57,14 @@ class ResourceSet { /// /// \param other: The resource set we check being a subset of. /// \return True if the current resource set is the subset of other. False - /// otherwise. + /// otherwise. bool IsSubset(const ResourceSet &other) const; /// \brief Test if this ResourceSet is a superset of the other ResourceSet. /// /// \param other: The resource set we check being a superset of. /// \return True if the current resource set is the superset of other. - /// False otherwise. + /// False otherwise. bool IsSuperset(const ResourceSet &other) const; /// \brief Add a new resource to the resource set. @@ -88,7 +90,7 @@ class ResourceSet { /// /// \param other: The resource set to subtract from the current resource set. /// \return True if the resource set was subtracted successfully. - /// False otherwise. + /// False otherwise. bool SubtractResources(const ResourceSet &other); /// Return the capacity value associated with the specified resource. @@ -96,7 +98,7 @@ class ResourceSet { /// \param resource_name: Resource name for which capacity is requested. /// \param[out] value: Resource capacity value. /// \return True if the resource capacity value was successfully retrieved. - /// False otherwise. + /// False otherwise. bool GetResource(const std::string &resource_name, double *value) const; /// Return the number of CPUs. @@ -119,6 +121,186 @@ class ResourceSet { std::unordered_map resource_capacity_; }; +/// \class ResourceIds +/// \brief This class generalizes the concept of a resource "quantity" to +/// include specific resource IDs and fractions of those resources. A typical example +/// is GPUs, where the GPUs are numbered 0 through N-1, where N is the total number +/// of GPUs. This information is ultimately passed through to the worker processes +/// which need to know which GPUs to use. +class ResourceIds { + public: + /// \brief empty ResourceIds constructor. + ResourceIds(); + + /// \brief Constructs ResourceIds with a given amount of resource. + /// + /// \param resource_quantity: The total amount of resource. This must either be + /// a whole number or a fraction less than 1. + ResourceIds(double resource_quantity); + + /// \brief Constructs ResourceIds with a given set of whole IDs. + /// + /// \param whole_ids: A vector of the resource IDs that are completely available. + ResourceIds(const std::vector &whole_ids); + + /// \brief Constructs ResourceIds with a given set of fractional IDs. + /// + /// \param fractional_ids: A vector of the resource IDs that are partially available. + ResourceIds(const std::vector> &fractional_ids); + + /// \brief Constructs ResourceIds with a given set of whole IDs and fractional IDs. + /// + /// \param whole_ids: A vector of the resource IDs that are completely available. + /// \param fractional_ids: A vector of the resource IDs that are partially available. + ResourceIds(const std::vector &whole_ids, + const std::vector> &fractional_ids); + + /// \brief Check if we have at least the requested amount. + /// + /// If the argument is a whole number, then we return True precisely when + /// we have enough whole IDs (ignoring fractional IDs). If the argument is a + /// fraction, then there must either be a whole ID or a single fractional ID with + /// a sufficiently large availability. E.g., if there are two IDs that have + /// availability 0.5, then Contains(0.75) will return false. + /// + /// \param resource_quantity Either a whole number or a fraction less than 1. + /// \return True if there we have enough of the resource. + bool Contains(double resource_quantity) const; + + /// \brief Acquire the requested amount of the resource. + /// + /// \param resource_quantity The amount to acquire. Either a whole number or a + /// fraction less than 1. + /// \return A ResourceIds representing the specific acquired IDs. + ResourceIds Acquire(double resource_quantity); + + /// \brief Return some resource IDs. + /// + /// \param resource_ids The specific resource IDs to return. + /// \return Void. + void Release(const ResourceIds &resource_ids); + + /// \brief Combine these IDs with some other IDs and return the result. + /// + /// \param resource_ids The IDs to add to these ones. + /// \return The combination of the IDs. + ResourceIds Plus(const ResourceIds &resource_ids) const; + + /// \brief Return just the whole IDs. + /// + /// \return The whole IDs. + const std::vector &WholeIds() const; + + /// \brief Return just the fractional IDs. + /// + /// \return The fractional IDs. + const std::vector> &FractionalIds() const; + + /// \brief Return the total quantity of resources, ignoring the specific IDs. + /// + /// \return The total quantity of the resource. + double TotalQuantity() const; + + /// \brief Return a string representation of the object. + /// + /// \return A human-readable string representing the object. + std::string ToString() const; + + private: + /// Check that a double is in fact a whole number. + /// + /// \param resource_quantity A double. + /// \return True if the double is an integer and false otherwise. + bool IsWhole(double resource_quantity) const; + + /// A vector of distinct whole resource IDs. + std::vector whole_ids_; + /// A vector of pairs of resource ID and a fraction of that ID (the fraction + /// is at least zero and strictly less than 1). + std::vector> fractional_ids_; +}; + +/// \class ResourceIdSet +/// \brief This class keeps track of the specific IDs that are available for a +/// collection of resources. +class ResourceIdSet { + public: + /// \brief empty ResourceIdSet constructor. + ResourceIdSet(); + + /// \brief Construct a ResourceIdSet from a ResourceSet. + /// + /// \param resource_set A mapping from resource name to quantity. + ResourceIdSet(const ResourceSet &resource_set); + + /// \brief Construct a ResourceIdSet from a mapping from resource names to ResourceIds. + /// + /// \param resource_set A mapping from resource name to IDs. + ResourceIdSet(const std::unordered_map &available_resources); + + /// \brief See if a requested collection of resources is contained. + /// + /// \param resource_set A mapping from resource name to quantity. + /// \return True if each resource in resource_set is contained in the corresponding + /// ResourceIds in this ResourceIdSet. + bool Contains(const ResourceSet &resource_set) const; + + /// \brief Acquire a set of resources and return the specific acquired IDs. + /// + /// \param resource_set A mapping from resource name to quantity. This specifies + /// the amount of each resource to acquire. + /// \return A ResourceIdSet with the requested quantities, but with specific IDs. + ResourceIdSet Acquire(const ResourceSet &resource_set); + + /// \brief Return a set of resource IDs. + /// + /// \param resource_id_set The resource IDs to return. + /// \return Void. + void Release(const ResourceIdSet &resource_id_set); + + /// \brief Clear out all of the resource IDs. + /// + /// \return Void. + void Clear(); + + /// \brief Combine another ResourceIdSet with this one. + /// + /// \param resource_id_set The other set of resource IDs to combine with this one. + /// \return The combination of the two sets of resource IDs. + ResourceIdSet Plus(const ResourceIdSet &resource_id_set) const; + + /// \brief Get the underlying mapping from resource name to resource IDs. + /// + /// \return The resource name to resource IDs mapping. + const std::unordered_map &AvailableResources() const; + + /// Return the CPU resources. + /// + /// \return The CPU resources. + ResourceIdSet GetCpuResources() const; + + /// \brief Get a mapping from each resource to the total quantity. + /// + /// \return A mapping from each resource to the total quantity. + ResourceSet ToResourceSet() const; + + /// \brief Get a string representation of the object. + /// + /// \return A human-readable string version of the object. + std::string ToString() const; + + /// \brief Serialize this object using flatbuffers. + /// + /// \param fbb A flatbuffer builder object. + /// \return A flatbuffer serialized version of this object. + std::vector> ToFlatbuf( + flatbuffers::FlatBufferBuilder &fbb) const; + + private: + /// A mapping from reosurce name to a set of resource IDs for that resource. + std::unordered_map available_resources_; +}; + /// \class SchedulingResources /// SchedulingResources class encapsulates the state of all local resources and /// manages accounting of those resources. Resources include configured resource @@ -142,7 +324,7 @@ class SchedulingResources { /// /// \param set: The set of resources representing the resource request. /// \return Availability status that specifies if the requested resource set - /// is feasible, infeasible, or feasible but unavailable. + /// is feasible, infeasible, or feasible but unavailable. ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const; /// \brief Request the set and capacity of resources currently available. @@ -153,7 +335,7 @@ class SchedulingResources { /// \brief Overwrite available resource capacity with the specified resource set. /// /// \param newset: The set of resources that replaces available resource capacity. - /// \return None. + /// \return Void. void SetAvailableResources(ResourceSet &&newset); const ResourceSet &GetTotalResources() const; diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 328cd1d7a..17289d480 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -43,6 +43,39 @@ const std::shared_ptr Worker::Connection() const { return connection_; } +const ResourceIdSet &Worker::GetLifetimeResourceIds() const { + return lifetime_resource_ids_; +} + +void Worker::ResetLifetimeResourceIds() { lifetime_resource_ids_.Clear(); } + +void Worker::SetLifetimeResourceIds(ResourceIdSet &resource_ids) { + lifetime_resource_ids_ = resource_ids; +} + +const ResourceIdSet &Worker::GetTaskResourceIds() const { return task_resource_ids_; } + +void Worker::ResetTaskResourceIds() { task_resource_ids_.Clear(); } + +void Worker::SetTaskResourceIds(ResourceIdSet &resource_ids) { + task_resource_ids_ = resource_ids; +} + +ResourceIdSet Worker::ReleaseTaskCpuResources() { + auto cpu_resources = task_resource_ids_.GetCpuResources(); + // The "acquire" terminology is a bit confusing here. The resources are being + // "acquired" from the task_resource_ids_ object, and so the worker is losing + // some resources. + task_resource_ids_.Acquire(cpu_resources.ToResourceSet()); + return cpu_resources; +} + +void Worker::AcquireTaskCpuResources(const ResourceIdSet &cpu_resources) { + // The "release" terminology is a bit confusing here. The resources are being + // given back to the worker and so "released" by the caller. + task_resource_ids_.Release(cpu_resources); +} + } // namespace raylet } // end namespace ray diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 4690df88d..cb1be04d8 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -5,6 +5,7 @@ #include "ray/common/client_connection.h" #include "ray/id.h" +#include "ray/raylet/scheduling_resources.h" namespace ray { @@ -31,6 +32,16 @@ class Worker { /// Return the worker's connection. const std::shared_ptr Connection() const; + const ResourceIdSet &GetLifetimeResourceIds() const; + void SetLifetimeResourceIds(ResourceIdSet &resource_ids); + void ResetLifetimeResourceIds(); + + const ResourceIdSet &GetTaskResourceIds() const; + void SetTaskResourceIds(ResourceIdSet &resource_ids); + void ResetTaskResourceIds(); + ResourceIdSet ReleaseTaskCpuResources(); + void AcquireTaskCpuResources(const ResourceIdSet &cpu_resources); + private: /// The worker's PID. pid_t pid_; @@ -43,6 +54,12 @@ class Worker { /// Whether the worker is blocked. Workers become blocked in a `ray.get`, if /// they require a data dependency while executing a task. bool blocked_; + /// The specific resource IDs that this worker owns for its lifetime. This is + /// only used for actors. + ResourceIdSet lifetime_resource_ids_; + /// The specific resource IDs that this worker currently owns for the duration + // of a task. + ResourceIdSet task_resource_ids_; }; } // namespace raylet diff --git a/test/actor_test.py b/test/actor_test.py index 79a571bd5..92334b404 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -378,9 +378,6 @@ class ActorMethods(unittest.TestCase): # called. self.assertEqual(ray.get(Actor.remote().method.remote()), 1) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorDeletionWithGPUs(self): ray.init(num_workers=0, num_gpus=1) @@ -735,9 +732,6 @@ class ActorsOnMultipleNodes(unittest.TestCase): def tearDown(self): ray.worker.cleanup() - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorsOnNodesWithNoCPUs(self): ray.init(num_cpus=0) @@ -799,6 +793,9 @@ class ActorsWithGPUs(unittest.TestCase): @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), "Crashing with new GCS API.") + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testActorGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 4 @@ -843,6 +840,9 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) self.assertEqual(ready_ids, []) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testActorMultipleGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 5 @@ -916,6 +916,9 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) self.assertEqual(ready_ids, []) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testActorDifferentNumbersOfGPUs(self): # Test that we can create actors on two nodes that have different # numbers of GPUs. @@ -959,6 +962,9 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) self.assertEqual(ready_ids, []) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testActorMultipleGPUsFromMultipleTasks(self): num_local_schedulers = 10 num_gpus_per_scheduler = 10 @@ -1006,6 +1012,9 @@ class ActorsWithGPUs(unittest.TestCase): self.assertEqual(ready_ids, []) @unittest.skipIf(sys.version_info < (3, 0), "This test requires Python 3.") + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testActorsAndTasksWithGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 6 @@ -1153,9 +1162,6 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, remaining_ids = ray.wait(results, timeout=1000) self.assertEqual(len(ready_ids), 0) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorsAndTasksWithGPUsVersionTwo(self): # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs @@ -1228,9 +1234,6 @@ class ActorsWithGPUs(unittest.TestCase): self.assertLess(interval1[1], interval2[0]) self.assertLess(interval2[0], interval2[1]) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testBlockingActorTask(self): ray.init(num_cpus=1, num_gpus=1) @@ -1283,6 +1286,9 @@ class ActorsWithGPUs(unittest.TestCase): self.assertEqual(remaining_ids, [x_id]) +@unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") class ActorReconstruction(unittest.TestCase): def tearDown(self): ray.worker.cleanup() @@ -2038,9 +2044,6 @@ class ActorPlacementAndResources(unittest.TestCase): for location in locations2: self.assertNotEqual(location, local_plasma) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testCreatingMoreActorsThanResources(self): ray.init( num_workers=0, diff --git a/test/array_test.py b/test/array_test.py index 04ae66d37..07ccac99a 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -2,12 +2,13 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import unittest -import ray +import os import numpy as np from numpy.testing import assert_equal, assert_almost_equal import sys +import unittest +import ray import ray.experimental.array.remote as ra import ray.experimental.array.distributed as da @@ -74,6 +75,9 @@ class DistributedArrayTest(unittest.TestCase): np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]) ])) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testMethods(self): for module in [ ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg diff --git a/test/failure_test.py b/test/failure_test.py index 61fdded80..7d1511f30 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -371,6 +371,9 @@ class WorkerDeath(unittest.TestCase): self.assertRaises(Exception, lambda: ray.get(task2)) +@unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") class PutErrorTest(unittest.TestCase): def tearDown(self): ray.worker.cleanup() diff --git a/test/recursion_test.py b/test/recursion_test.py index a769c52e8..38134c108 100644 --- a/test/recursion_test.py +++ b/test/recursion_test.py @@ -8,7 +8,7 @@ from __future__ import print_function import ray -ray.init() +ray.init(num_cpus=1) @ray.remote diff --git a/test/runtest.py b/test/runtest.py index e9d05b77c..0a20dae38 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -698,9 +698,6 @@ class APITest(unittest.TestCase): self.assertEqual(ray.get(k2.remote(1)), 2) self.assertEqual(ray.get(m.remote(1)), 2) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testSubmitAPI(self): self.init_ray(num_gpus=1, resources={"Custom": 1}, num_workers=1) @@ -1128,9 +1125,6 @@ class PythonModeTest(unittest.TestCase): def tearDown(self): ray.worker.cleanup() - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testPythonMode(self): reload(test_functions) ray.init(driver_mode=ray.PYTHON_MODE) @@ -1327,9 +1321,6 @@ class ResourcesTest(unittest.TestCase): self.assertLess(duration, 1 + time_buffer) self.assertGreater(duration, 1) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testGPUIDs(self): num_gpus = 10 ray.init(num_cpus=10, num_gpus=num_gpus) @@ -1482,6 +1473,9 @@ class ResourcesTest(unittest.TestCase): a1 = Actor1.remote() ray.get(a1.test.remote()) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testZeroCPUs(self): ray.worker._init( start_ray_local=True, num_local_schedulers=2, num_cpus=[0, 2]) @@ -1502,6 +1496,59 @@ class ResourcesTest(unittest.TestCase): a = Foo.remote() self.assertNotEqual(ray.get(a.method.remote()), local_plasma) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") != "1", + "This test only works with xray.") + def testFractionalResources(self): + ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) + + @ray.remote(num_gpus=0.5) + class Foo1(object): + def method(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + return gpu_ids[0] + + foos = [Foo1.remote() for _ in range(6)] + gpu_ids = ray.get([f.method.remote() for f in foos]) + for i in range(3): + assert gpu_ids.count(i) == 2 + del foos + + @ray.remote + class Foo2(object): + def method(self): + pass + + # Create an actor that requires 0.7 of the custom resource. + f1 = Foo2._submit([], {}, resources={"Custom": 0.7}) + ray.get(f1.method.remote()) + # Make sure that we cannot create an actor that requires 0.7 of the + # custom resource. TODO(rkn): Re-enable this once ray.wait is + # implemented. + f2 = Foo2._submit([], {}, resources={"Custom": 0.7}) + ready, _ = ray.wait([f2.method.remote()], timeout=500) + assert len(ready) == 0 + # Make sure we can start an actor that requries only 0.3 of the custom + # resource. + f3 = Foo2._submit([], {}, resources={"Custom": 0.3}) + ray.get(f3.method.remote()) + + del f1, f3 + + # Make sure that we get exceptions if we submit tasks that require a + # fractional number of resources greater than 1. + + @ray.remote(num_cpus=1.5) + def test(): + pass + + with self.assertRaises(ValueError): + test.remote() + + with self.assertRaises(ValueError): + Foo2._submit([], {}, resources={"Custom": 1.5}) + def testMultipleLocalSchedulers(self): # This test will define a bunch of tasks that can only be assigned to # specific local schedulers, and we will check that they are assigned @@ -1760,9 +1807,6 @@ class CudaVisibleDevicesTest(unittest.TestCase): else: del os.environ["CUDA_VISIBLE_DEVICES"] - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testSpecificGPUs(self): allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join( @@ -1803,9 +1847,6 @@ class WorkerPoolTests(unittest.TestCase): ray.get([f.remote() for _ in range(100)]) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testBlockingTasks(self): ray.init(num_cpus=1) @@ -1835,9 +1876,6 @@ class WorkerPoolTests(unittest.TestCase): ray.get(sleep.remote()) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testMaxCallTasks(self): ray.init(num_cpus=1) diff --git a/test/stress_tests.py b/test/stress_tests.py index b929110b3..d3dff0043 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -12,6 +12,9 @@ import ray.ray_constants as ray_constants class TaskTests(unittest.TestCase): + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testSubmittingTasks(self): for num_local_schedulers in [1, 4]: for num_workers_per_scheduler in [4]: @@ -41,6 +44,9 @@ class TaskTests(unittest.TestCase): self.assertTrue(ray.services.all_processes_alive()) ray.worker.cleanup() + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testDependencies(self): for num_local_schedulers in [1, 4]: for num_workers_per_scheduler in [4]: @@ -123,6 +129,9 @@ class TaskTests(unittest.TestCase): self.assertTrue(ray.services.all_processes_alive()) ray.worker.cleanup() + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") def testWait(self): for num_local_schedulers in [1, 4]: for num_workers_per_scheduler in [4]: @@ -158,6 +167,9 @@ class TaskTests(unittest.TestCase): ray.worker.cleanup() +@unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") class ReconstructionTests(unittest.TestCase): num_local_schedulers = 1 @@ -504,6 +516,9 @@ class ReconstructionTests(unittest.TestCase): for error in errors)) +@unittest.skipIf( + os.environ.get("RAY_USE_XRAY") == "1", + "This test does not work with xray yet.") class ReconstructionTestsMultinode(ReconstructionTests): # Run the same tests as the single-node suite, but with 4 local schedulers,