diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index b035f3b52..ddbb93c70 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -286,4 +286,3 @@ public abstract class AbstractRayRuntime implements RayRuntime { return functionManager; } } - diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java b/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java index 8c0512afb..011130960 100644 --- a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java +++ b/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java @@ -48,9 +48,12 @@ public final class TaskInfo extends Table { public ResourcePair requiredResources(int j) { return requiredResources(new ResourcePair(), j); } public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(30); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } public int requiredResourcesLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; } - public int language() { int o = __offset(32); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public String functionDescriptor(int j) { int o = __offset(34); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int functionDescriptorLength() { int o = __offset(34); return o != 0 ? __vector_len(o) : 0; } + public ResourcePair requiredPlacementResources(int j) { return requiredPlacementResources(new ResourcePair(), j); } + public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int requiredPlacementResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; } + public int language() { int o = __offset(34); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public String functionDescriptor(int j) { int o = __offset(36); return o != 0 ? __string(__vector(o) + j * 4) : null; } + public int functionDescriptorLength() { int o = __offset(36); return o != 0 ? __vector_len(o) : 0; } public static int createTaskInfo(FlatBufferBuilder builder, int driver_idOffset, @@ -67,11 +70,13 @@ public final class TaskInfo extends Table { int argsOffset, int returnsOffset, int required_resourcesOffset, + int required_placement_resourcesOffset, int language, int function_descriptorOffset) { - builder.startObject(16); + builder.startObject(17); TaskInfo.addFunctionDescriptor(builder, function_descriptorOffset); TaskInfo.addLanguage(builder, language); + TaskInfo.addRequiredPlacementResources(builder, required_placement_resourcesOffset); TaskInfo.addRequiredResources(builder, required_resourcesOffset); TaskInfo.addReturns(builder, returnsOffset); TaskInfo.addArgs(builder, argsOffset); @@ -89,7 +94,7 @@ public final class TaskInfo extends Table { return TaskInfo.endTaskInfo(builder); } - public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(16); } + public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(17); } public static void addDriverId(FlatBufferBuilder builder, int driverIdOffset) { builder.addOffset(0, driverIdOffset, 0); } public static void addTaskId(FlatBufferBuilder builder, int taskIdOffset) { builder.addOffset(1, taskIdOffset, 0); } public static void addParentTaskId(FlatBufferBuilder builder, int parentTaskIdOffset) { builder.addOffset(2, parentTaskIdOffset, 0); } @@ -110,8 +115,11 @@ public final class TaskInfo extends Table { public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(13, requiredResourcesOffset, 0); } public static int createRequiredResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startRequiredResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(14, language, 0); } - public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(15, functionDescriptorOffset, 0); } + public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(14, requiredPlacementResourcesOffset, 0); } + public static int createRequiredPlacementResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startRequiredPlacementResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(15, language, 0); } + public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(16, functionDescriptorOffset, 0); } public static int createFunctionDescriptorVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startFunctionDescriptorVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } public static int endTaskInfo(FlatBufferBuilder builder) { @@ -136,4 +144,3 @@ public final class TaskInfo extends Table { return src; } } - diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index 1a78f22de..215249504 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -209,6 +209,11 @@ public class RayletClientImpl implements RayletClient { ResourcePair.createResourcePair(fbb, keyOffset, entry.getValue()); } int requiredResourcesOffset = fbb.createVectorOfTables(requiredResourcesOffsets); + + int[] requiredPlacementResourcesOffsets = new int[0]; + int requiredPlacementResourcesOffset = + fbb.createVectorOfTables(requiredPlacementResourcesOffsets); + int[] functionDescriptorOffsets = new int[]{ fbb.createString(task.functionDescriptor.className), fbb.createString(task.functionDescriptor.name), @@ -222,7 +227,8 @@ public class RayletClientImpl implements RayletClient { actorCreateIdOffset, actorCreateDummyIdOffset, actorIdOffset, actorHandleIdOffset, actorCounter, false, functionIdOffset, - argsOffset, returnsOffset, requiredResourcesOffset, TaskLanguage.JAVA, + argsOffset, returnsOffset, requiredResourcesOffset, + requiredPlacementResourcesOffset, TaskLanguage.JAVA, functionDescriptorOffset); fbb.finish(root); ByteBuffer buffer = fbb.dataBuffer(); diff --git a/python/ray/actor.py b/python/ray/actor.py index d61fac7d7..65ddc266f 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -373,6 +373,15 @@ class ActorClass(object): self._num_cpus, self._num_gpus, self._resources, num_cpus, num_gpus, resources) + # If the actor methods require CPU resources, then set the required + # placement resources. If actor_placement_resources is empty, then + # the required placement resources will be the same as resources. + actor_placement_resources = {} + assert self._actor_method_cpus in [0, 1] + if self._actor_method_cpus == 1: + actor_placement_resources = resources.copy() + actor_placement_resources["CPU"] += 1 + creation_args = [self._class_id] function_id = compute_actor_creation_function_id(self._class_id) [actor_cursor] = worker.submit_task( @@ -380,7 +389,8 @@ class ActorClass(object): creation_args, actor_creation_id=actor_id, num_return_vals=1, - resources=resources) + resources=resources, + placement_resources=actor_placement_resources) # We initialize the actor counter at 1 to account for the actor # creation task. @@ -566,6 +576,7 @@ class ActorHandle(object): # We add one for the dummy return ID. num_return_vals=num_return_vals + 1, resources={"CPU": self._ray_actor_method_cpus}, + placement_resources={}, driver_id=self._ray_actor_driver_id) # Update the actor counter and cursor to reflect the most recent # invocation. diff --git a/python/ray/worker.py b/python/ray/worker.py index a88503bf0..4739b2e7c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -550,6 +550,7 @@ class Worker(object): execution_dependencies=None, num_return_vals=None, resources=None, + placement_resources=None, driver_id=None): """Submit a remote task to the scheduler. @@ -575,6 +576,9 @@ class Worker(object): num_return_vals: The number of return values this function should have. resources: The resource requirements for this task. + placement_resources: The resources required for placing the task. + If this is not provided or if it is an empty dictionary, then + the placement resources will be equal to resources. driver_id: The ID of the relevant driver. This is almost always the driver ID of the driver that is currently running. However, in the exceptional case that an actor task is being dispatched to @@ -628,6 +632,9 @@ class Worker(object): raise ValueError( "Resource quantities must all be whole numbers.") + if placement_resources is None: + placement_resources = {} + with self.state_lock: # Increment the worker's task index to track how many tasks # have been submitted by the current task so far. @@ -640,7 +647,8 @@ class Worker(object): num_return_vals, self.current_task_id, task_index, actor_creation_id, actor_creation_dummy_object_id, actor_id, actor_handle_id, actor_counter, is_actor_checkpoint_method, - execution_dependencies, resources, self.use_raylet) + execution_dependencies, resources, placement_resources, + self.use_raylet) self.local_scheduler_client.submit(task) return task.returns() @@ -2138,7 +2146,7 @@ def connect(info, worker.current_task_id, worker.task_index, ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), - nil_actor_counter, False, [], {"CPU": 0}, worker.use_raylet) + nil_actor_counter, False, [], {"CPU": 0}, {}, worker.use_raylet) # Add the driver task to the task table. if not worker.use_raylet: diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 9dc9f651a..a5b2177f1 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -60,6 +60,9 @@ table TaskInfo { // The required_resources vector indicates the quantities of the different // resources required by this task. required_resources: [ResourcePair]; + // The resources required for placing this task on a node. If this is empty, + // then the placement resources are equal to the required_resources. + required_placement_resources: [ResourcePair]; // The language that this task belongs to language: TaskLanguage; // Function descriptor, which is a list of strings that can diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 68965e270..3a1a44f5e 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -295,49 +295,100 @@ PyTypeObject PyObjectIDType = { PyType_GenericNew, /* tp_new */ }; -/* Define the PyTask class. */ +// Define the PyTask class. + +int resource_map_from_python_dict( + PyObject *resource_map, + std::unordered_map &out) { + RAY_CHECK(out.size() == 0); + + PyObject *key, *value; + Py_ssize_t position = 0; + if (!PyDict_Check(resource_map)) { + PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary"); + return -1; + } + + while (PyDict_Next(resource_map, &position, &key, &value)) { +#if PY_MAJOR_VERSION >= 3 + if (!PyUnicode_Check(key)) { + PyErr_SetString(PyExc_TypeError, + "the keys in resource_map must be strings"); + return -1; + } +#else + if (!PyBytes_Check(key)) { + PyErr_SetString(PyExc_TypeError, + "the keys in resource_map must be strings"); + return -1; + } +#endif + + // Check that the resource quantities are numbers. + if (!(PyFloat_Check(value) || PyInt_Check(value) || PyLong_Check(value))) { + PyErr_SetString(PyExc_TypeError, + "the values in resource_map must be floats"); + return -1; + } + // Handle the case where the key is a bytes object and the case where it + // is a unicode object. + std::string resource_name; + if (PyUnicode_Check(key)) { + PyObject *ascii_key = PyUnicode_AsASCIIString(key); + resource_name = + std::string(PyBytes_AsString(ascii_key), PyBytes_Size(ascii_key)); + Py_DECREF(ascii_key); + } else { + resource_name = std::string(PyBytes_AsString(key), PyBytes_Size(key)); + } + out[resource_name] = PyFloat_AsDouble(value); + } + return 0; +} static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { - /* ID of the driver that this task originates from. */ + // ID of the driver that this task originates from. UniqueID driver_id; - /* ID of the actor this task should run on. */ + // ID of the actor this task should run on. UniqueID actor_id = ActorID::nil(); - /* ID of the actor handle used to submit this task. */ + // ID of the actor handle used to submit this task. UniqueID actor_handle_id = ActorHandleID::nil(); - /* How many tasks have been launched on the actor so far? */ + // How many tasks have been launched on the actor so far? int actor_counter = 0; - /* True if this is an actor checkpoint task and false otherwise. */ + // True if this is an actor checkpoint task and false otherwise. PyObject *is_actor_checkpoint_method_object = nullptr; - /* ID of the function this task executes. */ + // ID of the function this task executes. FunctionID function_id; - /* Arguments of the task (can be PyObjectIDs or Python values). */ + // Arguments of the task (can be PyObjectIDs or Python values). PyObject *arguments; - /* Number of return values of this task. */ + // Number of return values of this task. int num_returns; - /* The ID of the task that called this task. */ + // The ID of the task that called this task. TaskID parent_task_id; - /* The number of tasks that the parent task has called prior to this one. */ + // The number of tasks that the parent task has called prior to this one. int parent_counter; // The actor creation ID. ActorID actor_creation_id = ActorID::nil(); // The dummy object for the actor creation task (if this is an actor method). ObjectID actor_creation_dummy_object_id = ObjectID::nil(); - /* Arguments of the task that are execution-dependent. These must be - * PyObjectIDs). */ + // Arguments of the task that are execution-dependent. These must be + // PyObjectIDs). PyObject *execution_arguments = nullptr; - /* Dictionary of resource requirements for this task. */ + // Dictionary of resource requirements for this task. PyObject *resource_map = nullptr; + // Dictionary of required placement resources for this task. + PyObject *placement_resource_map = nullptr; // True if we should use the raylet code path and false otherwise. PyObject *use_raylet_object = nullptr; if (!PyArg_ParseTuple( - args, "O&O&OiO&i|O&O&O&O&iOOOO", &PyObjectToUniqueID, &driver_id, + args, "O&O&OiO&i|O&O&O&O&iOOOOO", &PyObjectToUniqueID, &driver_id, &PyObjectToUniqueID, &function_id, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, &parent_counter, &PyObjectToUniqueID, &actor_creation_id, &PyObjectToUniqueID, &actor_creation_dummy_object_id, &PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, &actor_handle_id, &actor_counter, &is_actor_checkpoint_method_object, &execution_arguments, - &resource_map, &use_raylet_object)) { + &resource_map, &placement_resource_map, &use_raylet_object)) { return -1; } @@ -349,48 +400,25 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { // Parse the resource map. std::unordered_map required_resources; + std::unordered_map required_placement_resources; - bool found_CPU_requirements = false; - PyObject *key, *value; - Py_ssize_t position = 0; if (resource_map != nullptr) { - if (!PyDict_Check(resource_map)) { - PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary"); + if (resource_map_from_python_dict(resource_map, required_resources) != 0) { return -1; } - while (PyDict_Next(resource_map, &position, &key, &value)) { - if (!(PyBytes_Check(key) || PyUnicode_Check(key))) { - PyErr_SetString(PyExc_TypeError, - "the keys in resource_map must be strings"); - return -1; - } - if (!(PyFloat_Check(value) || PyInt_Check(value) || - PyLong_Check(value))) { - PyErr_SetString(PyExc_TypeError, - "the values in resource_map must be floats"); - return -1; - } - // Handle the case where the key is a bytes object and the case where it - // is a unicode object. - std::string resource_name; - if (PyUnicode_Check(key)) { - PyObject *ascii_key = PyUnicode_AsASCIIString(key); - resource_name = - std::string(PyBytes_AsString(ascii_key), PyBytes_Size(ascii_key)); - Py_DECREF(ascii_key); - } else { - resource_name = std::string(PyBytes_AsString(key), PyBytes_Size(key)); - } - if (resource_name == std::string("CPU")) { - found_CPU_requirements = true; - } - required_resources[resource_name] = PyFloat_AsDouble(value); - } } - if (!found_CPU_requirements) { + + if (required_resources.count("CPU") == 0) { required_resources["CPU"] = 1.0; } + if (placement_resource_map != nullptr) { + if (resource_map_from_python_dict(placement_resource_map, + required_placement_resources) != 0) { + return -1; + } + } + Py_ssize_t num_args = PyList_Size(arguments); bool use_raylet = false; @@ -463,7 +491,7 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { driver_id, parent_task_id, parent_counter, actor_creation_id, actor_creation_dummy_object_id, actor_id, actor_handle_id, actor_counter, function_id, args, num_returns, required_resources, - Language::PYTHON); + required_placement_resources, Language::PYTHON); } /* Set the task's execution dependencies. */ diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2d0bcf861..c4e75dbb8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -945,17 +945,20 @@ void NodeManager::ScheduleTasks( // TODO(rkn): Define this constant somewhere else. std::string type = "infeasible_task"; std::ostringstream error_message; - error_message << "The task with ID " << task.GetTaskSpecification().TaskId() - << " is infeasible and cannot currently be executed. " - << "It requested " - << task.GetTaskSpecification().GetRequiredResources().ToString(); + error_message + << "The task with ID " << task.GetTaskSpecification().TaskId() + << " is infeasible and cannot currently be executed. It requires " + << task.GetTaskSpecification().GetRequiredResources().ToString() + << " for execution and " + << task.GetTaskSpecification().GetRequiredPlacementResources().ToString() + << " for placement. Check the client table to view node resources."; RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( task.GetTaskSpecification().DriverId(), type, error_message.str(), current_time_ms())); } // Assert that this placeable task is not feasible locally (necessary but not // sufficient). - RAY_CHECK(!task.GetTaskSpecification().GetRequiredResources().IsSubset( + RAY_CHECK(!task.GetTaskSpecification().GetRequiredPlacementResources().IsSubset( cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] .GetTotalResources())); } @@ -1009,7 +1012,7 @@ void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded) { - const TaskID task_id = task.GetTaskSpecification().TaskId(); + const TaskID &task_id = task.GetTaskSpecification().TaskId(); if (local_queues_.HasTask(task_id)) { RAY_LOG(WARNING) << "Submitted task " << task_id << " is already queued and will not be reconstructed. This is most " @@ -1287,6 +1290,8 @@ void NodeManager::AssignTask(Task &task) { this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); if (spec.IsActorCreationTask()) { + // Check that we are not placing an actor creation task on a node with 0 CPUs. + RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetNumCpus() != 0); worker->SetLifetimeResourceIds(acquired_resources); } else { worker->SetTaskResourceIds(acquired_resources); diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index 0ed4efb27..eb2d41632 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -35,12 +35,9 @@ std::unordered_map SchedulingPolicy::Schedule( // Iterate over running tasks, get their resource demand and try to schedule. for (const auto &t : scheduling_queue_.GetPlaceableTasks()) { // Get task's resource demand - const auto &resource_demand = t.GetTaskSpecification().GetRequiredResources(); - const TaskID &task_id = t.GetTaskSpecification().TaskId(); - RAY_LOG(DEBUG) << "[SchedulingPolicy]: task=" << task_id - << " numforwards=" << t.GetTaskExecutionSpec().NumForwards() - << " resources=" - << t.GetTaskSpecification().GetRequiredResources().ToString(); + const auto &spec = t.GetTaskSpecification(); + const auto &resource_demand = spec.GetRequiredPlacementResources(); + const TaskID &task_id = spec.TaskId(); // TODO(atumanov): try to place tasks locally first. // Construct a set of viable node candidates and randomly pick between them. @@ -97,7 +94,7 @@ std::unordered_map SchedulingPolicy::Schedule( std::uniform_int_distribution distribution(0, client_keys.size() - 1); int client_key_index = distribution(gen_); const ClientID &dst_client_id = client_keys[client_key_index]; - decision[t.GetTaskSpecification().TaskId()] = dst_client_id; + decision[task_id] = dst_client_id; // Update dst_client_id's load to keep track of remote task load until // the next heartbeat. ResourceSet new_load(cluster_resources[dst_client_id].GetLoadResources()); @@ -107,9 +104,11 @@ std::unordered_map SchedulingPolicy::Schedule( // There are no nodes that can feasibly execute this task. The task remains // placeable until cluster capacity becomes available. // TODO(rkn): Propagate a warning to the user. - RAY_LOG(INFO) << "This task requires " - << t.GetTaskSpecification().GetRequiredResources().ToString() - << ", but no nodes have the necessary resources."; + RAY_LOG(INFO) << "The task with ID " << task_id << " requires " + << spec.GetRequiredResources().ToString() << " for execution and " + << spec.GetRequiredPlacementResources().ToString() + << " for placement, but no nodes have the necessary resources. " + << "Check the client table to view node resources."; } } } @@ -126,19 +125,21 @@ std::vector SchedulingPolicy::SpillOver( // Check if we can accommodate an infeasible task. for (const auto &task : scheduling_queue_.GetInfeasibleTasks()) { - if (task.GetTaskSpecification().GetRequiredResources().IsSubset( + const auto &spec = task.GetTaskSpecification(); + if (spec.GetRequiredPlacementResources().IsSubset( remote_scheduling_resources.GetTotalResources())) { - decision.push_back(task.GetTaskSpecification().TaskId()); - new_load.AddResources(task.GetTaskSpecification().GetRequiredResources()); + decision.push_back(spec.TaskId()); + new_load.AddResources(spec.GetRequiredResources()); } } for (const auto &task : scheduling_queue_.GetReadyTasks()) { - if (!task.GetTaskSpecification().IsActorTask()) { - if (task.GetTaskSpecification().GetRequiredResources().IsSubset( + const auto &spec = task.GetTaskSpecification(); + if (!spec.IsActorTask()) { + if (spec.GetRequiredPlacementResources().IsSubset( remote_scheduling_resources.GetTotalResources())) { - decision.push_back(task.GetTaskSpecification().TaskId()); - new_load.AddResources(task.GetTaskSpecification().GetRequiredResources()); + decision.push_back(spec.TaskId()); + new_load.AddResources(spec.GetRequiredResources()); break; } } diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index 49519c493..e85f3eaa7 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -17,7 +17,7 @@ ResourceSet::ResourceSet(const std::vector &resource_labels, const std::vector resource_capacity) { RAY_CHECK(resource_labels.size() == resource_capacity.size()); for (uint i = 0; i < resource_labels.size(); i++) { - RAY_CHECK(this->AddResource(resource_labels[i], resource_capacity[i])); + RAY_CHECK(AddResource(resource_labels[i], resource_capacity[i])); } } @@ -119,11 +119,11 @@ bool ResourceSet::GetResource(const std::string &resource_name, double *value) c if (!value) { return false; } - if (this->resource_capacity_.count(resource_name) == 0) { + if (resource_capacity_.count(resource_name) == 0) { *value = std::nan(""); return false; } - *value = this->resource_capacity_.at(resource_name); + *value = resource_capacity_.at(resource_name); return true; } @@ -135,15 +135,25 @@ double ResourceSet::GetNumCpus() const { const std::string ResourceSet::ToString() const { std::string return_string = ""; - for (const auto &resource_pair : this->resource_capacity_) { - return_string += - "{" + resource_pair.first + "," + std::to_string(resource_pair.second) + "}, "; + + auto it = resource_capacity_.begin(); + + // Convert the first element to a string. + if (it != resource_capacity_.end()) { + return_string += "{" + it->first + "," + std::to_string(it->second) + "}"; } + it++; + + // Add the remaining elements to the string (along with a comma). + for (; it != resource_capacity_.end(); ++it) { + return_string += ",{" + it->first + "," + std::to_string(it->second) + "}"; + } + return return_string; } const std::unordered_map &ResourceSet::GetResourceMap() const { - return this->resource_capacity_; + return resource_capacity_; }; /// ResourceIds class implementation @@ -400,11 +410,20 @@ ResourceSet ResourceIdSet::ToResourceSet() const { std::string ResourceIdSet::ToString() const { std::string return_string = "AvailableResources: "; - for (auto const &resource_pair : available_resources_) { - return_string += resource_pair.first + ": {"; - return_string += resource_pair.second.ToString(); - return_string += "}, "; + + auto it = available_resources_.begin(); + + // Convert the first element to a string. + if (it != available_resources_.end()) { + return_string += (it->first + ": {" + it->second.ToString() + "}"); } + it++; + + // Add the remaining elements to the string (along with a comma). + for (; it != available_resources_.end(); ++it) { + return_string += (", " + it->first + ": {" + it->second.ToString() + "}"); + } + return return_string; } @@ -450,26 +469,26 @@ SchedulingResources::~SchedulingResources() {} ResourceAvailabilityStatus SchedulingResources::CheckResourcesSatisfied( ResourceSet &resources) const { - if (!resources.IsSubset(this->resources_total_)) { + if (!resources.IsSubset(resources_total_)) { return ResourceAvailabilityStatus::kInfeasible; } // Resource demand specified is feasible. Check if it's available. - if (!resources.IsSubset(this->resources_available_)) { + if (!resources.IsSubset(resources_available_)) { return ResourceAvailabilityStatus::kResourcesUnavailable; } return ResourceAvailabilityStatus::kFeasible; } const ResourceSet &SchedulingResources::GetAvailableResources() const { - return this->resources_available_; + return resources_available_; } void SchedulingResources::SetAvailableResources(ResourceSet &&newset) { - this->resources_available_ = newset; + resources_available_ = newset; } const ResourceSet &SchedulingResources::GetTotalResources() const { - return this->resources_total_; + return resources_total_; } void SchedulingResources::SetLoadResources(ResourceSet &&newset) { @@ -482,12 +501,12 @@ const ResourceSet &SchedulingResources::GetLoadResources() const { // Return specified resources back to SchedulingResources. bool SchedulingResources::Release(const ResourceSet &resources) { - return this->resources_available_.AddResourcesStrict(resources); + return resources_available_.AddResourcesStrict(resources); } // Take specified resources from SchedulingResources. bool SchedulingResources::Acquire(const ResourceSet &resources) { - return this->resources_available_.SubtractResourcesStrict(resources); + return resources_available_.SubtractResourcesStrict(resources); } } // namespace raylet diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index b9fd35f02..cab87a729 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -50,7 +50,7 @@ TaskSpecification::TaskSpecification( : TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::nil(), ObjectID::nil(), ActorID::nil(), ActorHandleID::nil(), -1, function_id, task_arguments, num_returns, required_resources, - language) {} + std::unordered_map(), language) {} TaskSpecification::TaskSpecification( const UniqueID &driver_id, const TaskID &parent_task_id, int64_t parent_counter, @@ -59,6 +59,7 @@ TaskSpecification::TaskSpecification( const FunctionID &function_id, const std::vector> &task_arguments, int64_t num_returns, const std::unordered_map &required_resources, + const std::unordered_map &required_placement_resources, const Language &language) : spec_() { flatbuffers::FlatBufferBuilder fbb; @@ -99,7 +100,8 @@ TaskSpecification::TaskSpecification( to_flatbuf(fbb, actor_creation_dummy_object_id), to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter, false, to_flatbuf(fbb, function_id), fbb.CreateVector(arguments), - fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources), task_language); + fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources), + map_to_flatbuf(fbb, required_placement_resources), task_language); fbb.Finish(spec); AssignSpecification(fbb.GetBufferPointer(), fbb.GetSize()); } @@ -179,6 +181,19 @@ const ResourceSet TaskSpecification::GetRequiredResources() const { return ResourceSet(required_resources); } +const ResourceSet TaskSpecification::GetRequiredPlacementResources() const { + auto message = flatbuffers::GetRoot(spec_.data()); + auto required_placement_resources = + map_from_flatbuf(*message->required_placement_resources()); + // If the required_placement_resources field is empty, then the placement + // resources default to the required resources. + if (required_placement_resources.size() == 0) { + required_placement_resources = map_from_flatbuf(*message->required_resources()); + } + + return ResourceSet(required_placement_resources); +} + bool TaskSpecification::IsDriverTask() const { // Driver tasks are empty tasks that have no function ID set. return FunctionId().is_nil(); diff --git a/src/ray/raylet/task_spec.h b/src/ray/raylet/task_spec.h index a4075ff6a..49bd02bd6 100644 --- a/src/ray/raylet/task_spec.h +++ b/src/ray/raylet/task_spec.h @@ -83,34 +83,61 @@ class TaskSpecification { TaskSpecification(const flatbuffers::String &string); // TODO(swang): Define an actor task constructor. - /// Create a task specification from the raw fields. + /// Create a task specification from the raw fields. This constructor omits + /// some values and sets them to sensible defaults. /// /// \param driver_id The driver ID, representing the job that this task is a - /// part of. + /// part of. /// \param parent_task_id The task ID of the task that spawned this task. /// \param parent_counter The number of tasks that this task's parent spawned - /// before this task. + /// before this task. /// \param function_id The ID of the function this task should execute. - /// \param arguments The list of task arguments. + /// \param task_arguments The list of task arguments. /// \param num_returns The number of values returned by the task. /// \param required_resources The task's resource demands. + /// \param language The language of the worker that must execute the function. TaskSpecification(const UniqueID &driver_id, const TaskID &parent_task_id, int64_t parent_counter, const FunctionID &function_id, - const std::vector> &arguments, - int64_t num_returns, - const std::unordered_map &required_resources, - const Language &language); - - TaskSpecification(const UniqueID &driver_id, const TaskID &parent_task_id, - int64_t parent_counter, const ActorID &actor_creation_id, - const ObjectID &actor_creation_dummy_object_id, - const ActorID &actor_id, const ActorHandleID &actor_handle_id, - int64_t actor_counter, const FunctionID &function_id, const std::vector> &task_arguments, int64_t num_returns, const std::unordered_map &required_resources, const Language &language); + // TODO(swang): Define an actor task constructor. + /// Create a task specification from the raw fields. + /// + /// \param driver_id The driver ID, representing the job that this task is a + /// part of. + /// \param parent_task_id The task ID of the task that spawned this task. + /// \param parent_counter The number of tasks that this task's parent spawned + /// before this task. + /// \param actor_creation_id If this is an actor task, then this is the ID of + /// the corresponding actor creation task. Otherwise, this is nil. + /// \param actor_id The ID of the actor for the task. If this is not an actor + /// task, then this is nil. + /// \param actor_handle_id The ID of the actor handle that submitted this + /// task. If this is not an actor task, then this is nil. + /// \param actor_counter The number of tasks submitted before this task from + /// the same actor handle. If this is not an actor task, then this is 0. + /// \param function_id The ID of the function this task should execute. + /// \param task_arguments The list of task arguments. + /// \param num_returns The number of values returned by the task. + /// \param required_resources The task's resource demands. + /// \param required_placement_resources The resources required to place this + /// task on a node. Typically, this should be an empty map in which case it + /// will default to be equal to the required_resources argument. + /// \param language The language of the worker that must execute the function. + TaskSpecification( + const UniqueID &driver_id, const TaskID &parent_task_id, int64_t parent_counter, + const ActorID &actor_creation_id, const ObjectID &actor_creation_dummy_object_id, + const ActorID &actor_id, const ActorHandleID &actor_handle_id, + int64_t actor_counter, const FunctionID &function_id, + const std::vector> &task_arguments, + int64_t num_returns, + const std::unordered_map &required_resources, + const std::unordered_map &required_placement_resources, + const Language &language); + /// Deserialize a task specification from a flatbuffer's string data. /// /// \param string The string data for a serialized task specification @@ -141,7 +168,22 @@ class TaskSpecification { const uint8_t *ArgVal(int64_t arg_index) const; size_t ArgValLength(int64_t arg_index) const; double GetRequiredResource(const std::string &resource_name) const; + /// Return the resources that are to be acquired during the execution of this + /// task. + /// + /// \return The resources that will be acquired during the execution of this + /// task. const ResourceSet GetRequiredResources() const; + /// Return the resources that are required for a task to be placed on a node. + /// This will typically be the same as the resources acquired during execution + /// and will always be a superset of those resources. However, they may + /// differ, e.g., actor creation tasks may require more resources to be + /// scheduled on a machine because the actor creation task may require no + /// resources itself, but subsequent actor methods may require resources, and + /// so the placement of the actor should take this into account. + /// + /// \return The resources that are required to place a task on a node. + const ResourceSet GetRequiredPlacementResources() const; bool IsDriverTask() const; Language GetLanguage() const; diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 2a7e32123..abaf675ff 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -65,7 +65,7 @@ static inline TaskSpecification ExampleTaskSpec( const Language &language = Language::PYTHON) { return TaskSpecification(UniqueID::nil(), UniqueID::nil(), 0, ActorID::nil(), ObjectID::nil(), actor_id, ActorHandleID::nil(), 0, - FunctionID::nil(), {}, 0, {{}}, language); + FunctionID::nil(), {}, 0, {{}}, {{}}, language); } TEST_F(WorkerPoolTest, HandleWorkerRegistration) { diff --git a/test/failure_test.py b/test/failure_test.py index 3f631ae59..9cd25962b 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -561,6 +561,25 @@ def test_warning_for_infeasible_tasks(ray_start_regular): wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 2) +@pytest.mark.skipif( + os.environ.get("RAY_USE_XRAY") != "1", + reason="This test only works with xray.") +def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): + # Check that we cannot place an actor on a 0 CPU machine and that we get an + # infeasibility warning (even though the actor creation task itself + # requires no CPUs). + + ray.init(num_cpus=0) + + @ray.remote + class Foo(object): + pass + + # The actor creation should be infeasible. + Foo.remote() + wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1) + + @pytest.fixture def ray_start_two_nodes(): # Start the Ray processes.