diff --git a/python/ray/worker.py b/python/ray/worker.py index ee4ecbedf..1bc4a507b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -615,7 +615,8 @@ class Worker(object): actor_counter, is_actor_checkpoint_method, execution_dependencies, - resources) + resources, + self.use_raylet) # Increment the worker's task index to track how many tasks have # been submitted by the current task so far. self.task_index += 1 @@ -794,7 +795,7 @@ class Worker(object): self.current_task_id = task.task_id() self.current_function_id = task.function_id().id() self.task_index = 0 - self.put_index = 0 + self.put_index = 1 function_id = task.function_id() args = task.arguments() return_object_ids = task.returns() @@ -1908,6 +1909,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.connected = True worker.set_mode(mode) worker.use_raylet = use_raylet + # The worker.events field is used to aggregate logging information and # display it in the web UI. Note that Python lists protected by the GIL, # which is important because we will append to this field from multiple @@ -2041,7 +2043,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, np.random.set_state(numpy_state) # Set other fields needed for computing task IDs. worker.task_index = 0 - worker.put_index = 0 + worker.put_index = 1 # Create an entry for the driver task in the task table. This task is # added immediately with status RUNNING. This allows us to push errors @@ -2050,6 +2052,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, # user that we're unable to reconstruct the object, since we cannot # rerun the driver. nil_actor_counter = 0 + driver_task = ray.local_scheduler.Task( worker.task_driver_id, ray.local_scheduler.ObjectID(NIL_FUNCTION_ID), @@ -2064,7 +2067,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, nil_actor_counter, False, [], - {"CPU": 0}) + {"CPU": 0}, + worker.use_raylet) global_state._execute_command( driver_task.task_id(), "RAY.TASK_TABLE_ADD", @@ -2417,7 +2421,7 @@ def put(value, worker=global_worker): # In PYTHON_MODE, ray.put is the identity operation. return value object_id = worker.local_scheduler_client.compute_put_id( - worker.current_task_id, worker.put_index) + worker.current_task_id, worker.put_index, worker.use_raylet) worker.put_object(object_id, value) worker.put_index += 1 return object_id diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 645e20621..0ef0bb00f 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -5,6 +5,7 @@ #include "common.h" #include "common_extension.h" #include "common_protocol.h" +#include "ray/raylet/task_spec.h" #include "task.h" #include @@ -63,6 +64,10 @@ int PyObjectToUniqueID(PyObject *object, ObjectID *objectid) { } } +bool use_raylet(PyTask *task) { + return task->spec == nullptr; +} + static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) { const char *data; int size; @@ -278,7 +283,7 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { /* How many tasks have been launched on the actor so far? */ int actor_counter = 0; /* True if this is an actor checkpoint task and false otherwise. */ - PyObject *is_actor_checkpoint_method_object = NULL; + PyObject *is_actor_checkpoint_method_object = nullptr; /* ID of the function this task executes. */ FunctionID function_id; /* Arguments of the task (can be PyObjectIDs or Python values). */ @@ -295,54 +300,36 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { ObjectID actor_creation_dummy_object_id = ObjectID::nil(); /* Arguments of the task that are execution-dependent. These must be * PyObjectIDs). */ - PyObject *execution_arguments = NULL; + PyObject *execution_arguments = nullptr; /* Dictionary of resource requirements for this task. */ - PyObject *resource_map = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&O&O&iOOO", &PyObjectToUniqueID, - &driver_id, &PyObjectToUniqueID, &function_id, - &arguments, &num_returns, &PyObjectToUniqueID, - &parent_task_id, &parent_counter, &PyObjectToUniqueID, - &actor_creation_id, &PyObjectToUniqueID, - &actor_creation_dummy_object_id, &PyObjectToUniqueID, - &actor_id, &PyObjectToUniqueID, &actor_handle_id, - &actor_counter, &is_actor_checkpoint_method_object, - &execution_arguments, &resource_map)) { + PyObject *resource_map = nullptr; + // True if we should use the raylet code path and false otherwise. + PyObject *use_raylet_object = nullptr; + if (!PyArg_ParseTuple( + args, "O&O&OiO&i|O&O&O&O&iOOOO", &PyObjectToUniqueID, &driver_id, + &PyObjectToUniqueID, &function_id, &arguments, &num_returns, + &PyObjectToUniqueID, &parent_task_id, &parent_counter, + &PyObjectToUniqueID, &actor_creation_id, &PyObjectToUniqueID, + &actor_creation_dummy_object_id, &PyObjectToUniqueID, &actor_id, + &PyObjectToUniqueID, &actor_handle_id, &actor_counter, + &is_actor_checkpoint_method_object, &execution_arguments, + &resource_map, &use_raylet_object)) { return -1; } bool is_actor_checkpoint_method = false; - if (is_actor_checkpoint_method_object != NULL && + if (is_actor_checkpoint_method_object != nullptr && PyObject_IsTrue(is_actor_checkpoint_method_object) == 1) { is_actor_checkpoint_method = true; } - Py_ssize_t size = PyList_Size(arguments); - /* Construct the task specification. */ - TaskSpec_start_construct( - g_task_builder, driver_id, parent_task_id, parent_counter, - actor_creation_id, actor_creation_dummy_object_id, actor_id, - actor_handle_id, actor_counter, is_actor_checkpoint_method, function_id, - num_returns); - /* Add the task arguments. */ - for (Py_ssize_t i = 0; i < size; ++i) { - PyObject *arg = PyList_GetItem(arguments, i); - if (PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) { - TaskSpec_args_add_ref(g_task_builder, &((PyObjectID *) arg)->object_id, - 1); - } else { - /* We do this check because we cast a signed int to an unsigned int. */ - PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, - arg, pickle_protocol, NULL); - TaskSpec_args_add_val(g_task_builder, (uint8_t *) PyBytes_AsString(data), - PyBytes_Size(data)); - Py_DECREF(data); - } - } - /* Set the resource requirements for the task. */ + // Parse the resource map. + std::unordered_map required_resources; + bool found_CPU_requirements = false; PyObject *key, *value; Py_ssize_t position = 0; - if (resource_map != NULL) { + if (resource_map != nullptr) { if (!PyDict_Check(resource_map)) { PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary"); return -1; @@ -373,22 +360,92 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { if (resource_name == std::string("CPU")) { found_CPU_requirements = true; } - TaskSpec_set_required_resource(g_task_builder, resource_name, - PyFloat_AsDouble(value)); + required_resources[resource_name] = PyFloat_AsDouble(value); } } if (!found_CPU_requirements) { - TaskSpec_set_required_resource(g_task_builder, "CPU", 1.0); + required_resources["CPU"] = 1.0; } - /* Compute the task ID and the return object IDs. */ - self->spec = TaskSpec_finish_construct(g_task_builder, &self->size); + Py_ssize_t num_args = PyList_Size(arguments); + + bool use_raylet = false; + if (use_raylet_object != nullptr && PyObject_IsTrue(use_raylet_object) == 1) { + use_raylet = true; + } + self->spec = nullptr; + self->task_spec = nullptr; + + // Create the task spec. + if (!use_raylet) { + // The non-raylet code path. + + // Construct the task specification. + TaskSpec_start_construct( + g_task_builder, driver_id, parent_task_id, parent_counter, + actor_creation_id, actor_creation_dummy_object_id, actor_id, + actor_handle_id, actor_counter, is_actor_checkpoint_method, function_id, + num_returns); + // Add the task arguments. + for (Py_ssize_t i = 0; i < num_args; ++i) { + PyObject *arg = PyList_GetItem(arguments, i); + if (PyObject_IsInstance(arg, + reinterpret_cast(&PyObjectIDType))) { + TaskSpec_args_add_ref(g_task_builder, + &(reinterpret_cast(arg))->object_id, + 1); + } else { + PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, + arg, pickle_protocol, NULL); + TaskSpec_args_add_val( + g_task_builder, reinterpret_cast(PyBytes_AsString(data)), + PyBytes_Size(data)); + Py_DECREF(data); + } + } + // Set the resource requirements for the task. + for (auto const &resource_pair : required_resources) { + TaskSpec_set_required_resource(g_task_builder, resource_pair.first, + resource_pair.second); + } + + // Compute the task ID and the return object IDs. + self->spec = TaskSpec_finish_construct(g_task_builder, &self->size); + + } else { + // The raylet code path. + + // Parse the arguments from the list. + std::vector> args; + for (Py_ssize_t i = 0; i < num_args; ++i) { + PyObject *arg = PyList_GetItem(arguments, i); + if (PyObject_IsInstance(arg, + reinterpret_cast(&PyObjectIDType))) { + std::vector references = { + reinterpret_cast(arg)->object_id}; + args.push_back( + std::make_shared(references)); + } else { + PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, + arg, pickle_protocol, NULL); + args.push_back(std::make_shared( + reinterpret_cast(PyBytes_AsString(data)), + PyBytes_Size(data))); + Py_DECREF(data); + } + } + + self->task_spec = new ray::raylet::TaskSpecification( + driver_id, parent_task_id, parent_counter, actor_creation_id, + actor_creation_dummy_object_id, actor_id, actor_handle_id, + actor_counter, function_id, args, num_returns, required_resources); + } /* Set the task's execution dependencies. */ self->execution_dependencies = new std::vector(); if (execution_arguments != NULL) { - size = PyList_Size(execution_arguments); - for (Py_ssize_t i = 0; i < size; ++i) { + Py_ssize_t num_execution_args = PyList_Size(execution_arguments); + for (Py_ssize_t i = 0; i < num_execution_args; ++i) { PyObject *execution_arg = PyList_GetItem(execution_arguments, i); if (!PyObject_IsInstance(execution_arg, (PyObject *) &PyObjectIDType)) { PyErr_SetString(PyExc_TypeError, @@ -404,63 +461,133 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { } static void PyTask_dealloc(PyTask *self) { - if (self->spec != NULL) { + if (!use_raylet(self)) { TaskSpec_free(self->spec); + } else { + delete self->task_spec; } delete self->execution_dependencies; - Py_TYPE(self)->tp_free((PyObject *) self); + Py_TYPE(self)->tp_free(reinterpret_cast(self)); } -static PyObject *PyTask_function_id(PyObject *self) { - FunctionID function_id = TaskSpec_function(((PyTask *) self)->spec); +static PyObject *PyTask_function_id(PyTask *self) { + FunctionID function_id; + if (!use_raylet(self)) { + function_id = TaskSpec_function(self->spec); + } else { + function_id = self->task_spec->FunctionId(); + } return PyObjectID_make(function_id); } -static PyObject *PyTask_actor_id(PyObject *self) { - ActorID actor_id = TaskSpec_actor_id(((PyTask *) self)->spec); +static PyObject *PyTask_actor_id(PyTask *self) { + ActorID actor_id; + if (!use_raylet(self)) { + actor_id = TaskSpec_actor_id(self->spec); + } else { + actor_id = self->task_spec->ActorId(); + } return PyObjectID_make(actor_id); } -static PyObject *PyTask_actor_counter(PyObject *self) { - int64_t actor_counter = TaskSpec_actor_counter(((PyTask *) self)->spec); +static PyObject *PyTask_actor_counter(PyTask *self) { + int64_t actor_counter; + if (!use_raylet(self)) { + actor_counter = TaskSpec_actor_counter(self->spec); + } else { + actor_counter = self->task_spec->ActorCounter(); + } return PyLong_FromLongLong(actor_counter); } -static PyObject *PyTask_driver_id(PyObject *self) { - UniqueID driver_id = TaskSpec_driver_id(((PyTask *) self)->spec); +static PyObject *PyTask_driver_id(PyTask *self) { + UniqueID driver_id; + if (!use_raylet(self)) { + driver_id = TaskSpec_driver_id(self->spec); + } else { + driver_id = self->task_spec->DriverId(); + } return PyObjectID_make(driver_id); } -static PyObject *PyTask_task_id(PyObject *self) { - TaskID task_id = TaskSpec_task_id(((PyTask *) self)->spec); +static PyObject *PyTask_task_id(PyTask *self) { + TaskID task_id; + if (!use_raylet(self)) { + task_id = TaskSpec_task_id(self->spec); + } else { + task_id = self->task_spec->TaskId(); + } return PyObjectID_make(task_id); } -static PyObject *PyTask_parent_task_id(PyObject *self) { - TaskID task_id = TaskSpec_parent_task_id(((PyTask *) self)->spec); +static PyObject *PyTask_parent_task_id(PyTask *self) { + TaskID task_id; + if (!use_raylet(self)) { + task_id = TaskSpec_parent_task_id(self->spec); + } else { + task_id = self->task_spec->ParentTaskId(); + } return PyObjectID_make(task_id); } -static PyObject *PyTask_parent_counter(PyObject *self) { - int64_t parent_counter = TaskSpec_parent_counter(((PyTask *) self)->spec); +static PyObject *PyTask_parent_counter(PyTask *self) { + int64_t parent_counter; + if (!use_raylet(self)) { + parent_counter = TaskSpec_parent_counter(self->spec); + } else { + parent_counter = self->task_spec->ParentCounter(); + } return PyLong_FromLongLong(parent_counter); } -static PyObject *PyTask_arguments(PyObject *self) { - TaskSpec *task = ((PyTask *) self)->spec; - int64_t num_args = TaskSpec_num_args(task); +static PyObject *PyTask_arguments(PyTask *self) { + TaskSpec *task = self->spec; + ray::raylet::TaskSpecification *task_spec = self->task_spec; + + int64_t num_args; + if (!use_raylet(self)) { + num_args = TaskSpec_num_args(task); + } else { + num_args = self->task_spec->NumArgs(); + } + PyObject *arg_list = PyList_New((Py_ssize_t) num_args); for (int i = 0; i < num_args; ++i) { - int count = TaskSpec_arg_id_count(task, i); + int count; + if (!use_raylet(self)) { + count = TaskSpec_arg_id_count(task, i); + } else { + count = task_spec->ArgIdCount(i); + } + if (count > 0) { assert(count == 1); - PyList_SetItem(arg_list, i, PyObjectID_make(TaskSpec_arg_id(task, i, 0))); + + ObjectID object_id; + if (!use_raylet(self)) { + object_id = TaskSpec_arg_id(task, i, 0); + } else { + object_id = task_spec->ArgId(i, 0); + } + + PyList_SetItem(arg_list, i, PyObjectID_make(object_id)); } else { RAY_CHECK(pickle_module != NULL); RAY_CHECK(pickle_loads != NULL); + + const uint8_t *arg_val; + int64_t arg_length; + if (!use_raylet(self)) { + arg_val = TaskSpec_arg_val(task, i); + arg_length = TaskSpec_arg_length(task, i); + } else { + arg_val = task_spec->ArgVal(i); + arg_length = task_spec->ArgValLength(i); + } + PyObject *str = - PyBytes_FromStringAndSize((char *) TaskSpec_arg_val(task, i), - (Py_ssize_t) TaskSpec_arg_length(task, i)); + PyBytes_FromStringAndSize(reinterpret_cast(arg_val), + static_cast(arg_length)); PyObject *val = PyObject_CallMethodObjArgs(pickle_module, pickle_loads, str, NULL); Py_XDECREF(str); @@ -470,25 +597,43 @@ static PyObject *PyTask_arguments(PyObject *self) { return arg_list; } -static PyObject *PyTask_actor_creation_id(PyObject *self) { - ActorID actor_creation_id = - TaskSpec_actor_creation_id(((PyTask *) self)->spec); +static PyObject *PyTask_actor_creation_id(PyTask *self) { + ActorID actor_creation_id; + if (!use_raylet(self)) { + actor_creation_id = TaskSpec_actor_creation_id(self->spec); + } else { + actor_creation_id = self->task_spec->ActorCreationId(); + } return PyObjectID_make(actor_creation_id); } -static PyObject *PyTask_actor_creation_dummy_object_id(PyObject *self) { - ActorID actor_creation_dummy_object_id = ActorID::nil(); - if (TaskSpec_is_actor_task(((PyTask *) self)->spec)) { +static PyObject *PyTask_actor_creation_dummy_object_id(PyTask *self) { + ObjectID actor_creation_dummy_object_id; + if (!use_raylet(self)) { + if (TaskSpec_is_actor_task(self->spec)) { + actor_creation_dummy_object_id = + TaskSpec_actor_creation_dummy_object_id(self->spec); + } else { + actor_creation_dummy_object_id = ObjectID::nil(); + } + } else { actor_creation_dummy_object_id = - TaskSpec_actor_creation_dummy_object_id(((PyTask *) self)->spec); + self->task_spec->ActorCreationDummyObjectId(); } return PyObjectID_make(actor_creation_dummy_object_id); } -static PyObject *PyTask_required_resources(PyObject *self) { - TaskSpec *task = ((PyTask *) self)->spec; +static PyObject *PyTask_required_resources(PyTask *self) { PyObject *required_resources = PyDict_New(); - for (auto const &resource_pair : TaskSpec_get_required_resources(task)) { + + std::unordered_map resource_map; + if (!use_raylet(self)) { + resource_map = TaskSpec_get_required_resources(self->spec); + } else { + resource_map = self->task_spec->GetRequiredResources().GetResourceMap(); + } + + for (auto const &resource_pair : resource_map) { std::string resource_name = resource_pair.first; #if PY_MAJOR_VERSION >= 3 PyObject *key = @@ -505,12 +650,25 @@ static PyObject *PyTask_required_resources(PyObject *self) { return required_resources; } -static PyObject *PyTask_returns(PyObject *self) { - TaskSpec *task = ((PyTask *) self)->spec; - int64_t num_returns = TaskSpec_num_returns(task); +static PyObject *PyTask_returns(PyTask *self) { + TaskSpec *task = self->spec; + ray::raylet::TaskSpecification *task_spec = self->task_spec; + + int64_t num_returns; + if (!use_raylet(self)) { + num_returns = TaskSpec_num_returns(task); + } else { + num_returns = task_spec->NumReturns(); + } + PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns); for (int i = 0; i < num_returns; ++i) { - ObjectID object_id = TaskSpec_return(task, i); + ObjectID object_id; + if (!use_raylet(self)) { + object_id = TaskSpec_return(task, i); + } else { + object_id = task_spec->ReturnId(i); + } PyList_SetItem(return_id_list, i, PyObjectID_make(object_id)); } return return_id_list; diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index c72f3e8ea..81a9f0f48 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -8,6 +8,7 @@ #include "structmember.h" #include "common.h" +#include "ray/raylet/task_spec.h" typedef char TaskSpec; class TaskBuilder; @@ -23,7 +24,10 @@ typedef struct { typedef struct { PyObject_HEAD int64_t size; + // The task spec to use in the non-raylet case. TaskSpec *spec; + // The task spec to use in the raylet case. + ray::raylet::TaskSpecification *task_spec; std::vector *execution_dependencies; } PyTask; // clang-format on @@ -32,6 +36,8 @@ extern PyTypeObject PyObjectIDType; extern PyTypeObject PyTaskType; +bool use_raylet(PyTask *task); + /* Python module for pickling. */ extern PyObject *pickle_module; extern PyObject *pickle_dumps; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index eda4f2ff7..2d134d9a9 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -65,8 +65,9 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, flatbuffers::FlatBufferBuilder fbb; auto execution_dependencies = to_flatbuf(fbb, execution_spec.ExecutionDependencies()); - auto task_spec = fbb.CreateString((char *) execution_spec.Spec(), - execution_spec.SpecSize()); + auto task_spec = + fbb.CreateString(reinterpret_cast(execution_spec.Spec()), + execution_spec.SpecSize()); auto message = CreateSubmitTaskRequest(fbb, execution_dependencies, task_spec); fbb.Finish(message); @@ -74,6 +75,19 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, fbb.GetBufferPointer()); } +void local_scheduler_submit_raylet( + LocalSchedulerConnection *conn, + const std::vector &execution_dependencies, + ray::raylet::TaskSpecification task_spec) { + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); + auto message = CreateSubmitTaskRequest(fbb, execution_dependencies_message, + task_spec.ToFlatbuffer(fbb)); + fbb.Finish(message); + write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(), + fbb.GetBufferPointer()); +} + TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, int64_t *task_size) { write_message(conn->conn, MessageType_GetTask, 0, NULL); diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index f9bffc18d..7b834a09c 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -3,6 +3,7 @@ #include "common/task.h" #include "local_scheduler_shared.h" +#include "ray/raylet/task_spec.h" struct LocalSchedulerConnection { /** File descriptor of the Unix domain socket that connects to local @@ -45,6 +46,17 @@ void LocalSchedulerConnection_free(LocalSchedulerConnection *conn); void local_scheduler_submit(LocalSchedulerConnection *conn, TaskExecutionSpec &execution_spec); +/// Submit a task using the raylet code path. +/// +/// \param The connection information. +/// \param The execution dependencies. +/// \param The task specification. +/// \return Void. +void local_scheduler_submit_raylet( + LocalSchedulerConnection *conn, + const std::vector &execution_dependencies, + ray::raylet::TaskSpecification task_spec); + /** * Notify the local scheduler that this client is disconnecting gracefully. This * is used by actors to exit gracefully so that the local scheduler doesn't diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index ae91c1a8e..2bf8e8c31 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -49,12 +49,20 @@ static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { if (!PyArg_ParseTuple(args, "O", &py_task)) { return NULL; } - PyTask *task = (PyTask *) py_task; - TaskExecutionSpec execution_spec = - TaskExecutionSpec(*task->execution_dependencies, task->spec, task->size); - local_scheduler_submit( - ((PyLocalSchedulerClient *) self)->local_scheduler_connection, - execution_spec); + LocalSchedulerConnection *connection = + reinterpret_cast(self) + ->local_scheduler_connection; + PyTask *task = reinterpret_cast(py_task); + + if (!use_raylet(task)) { + TaskExecutionSpec execution_spec = TaskExecutionSpec( + *task->execution_dependencies, task->spec, task->size); + local_scheduler_submit(connection, execution_spec); + } else { + local_scheduler_submit_raylet(connection, *task->execution_dependencies, + *task->task_spec); + } + Py_RETURN_NONE; } @@ -111,14 +119,22 @@ static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, PyObject *args) { int put_index; TaskID task_id; - if (!PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &task_id, - &put_index)) { + PyObject *use_raylet; + if (!PyArg_ParseTuple(args, "O&iO", &PyObjectToUniqueID, &task_id, &put_index, + &use_raylet)) { return NULL; } - ObjectID put_id = task_compute_put_id(task_id, put_index); - local_scheduler_put_object( - ((PyLocalSchedulerClient *) self)->local_scheduler_connection, task_id, - put_id); + ObjectID put_id; + if (!PyObject_IsTrue(use_raylet)) { + put_id = task_compute_put_id(task_id, put_index); + local_scheduler_put_object( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection, task_id, + put_id); + } else { + // TODO(rkn): Raise an exception if the put index is not a valid value + // instead of crashing in ComputePutId. + put_id = ray::ComputePutId(task_id, put_index); + } return PyObjectID_make(put_id); } diff --git a/src/ray/id.cc b/src/ray/id.cc index f3c2b3212..e872aa294 100644 --- a/src/ray/id.cc +++ b/src/ray/id.cc @@ -86,7 +86,7 @@ std::ostream &operator<<(std::ostream &os, const UniqueID &id) { return os; } -const ObjectID ComputeObjectId(TaskID task_id, int64_t object_index) { +const ObjectID ComputeObjectId(const TaskID &task_id, int64_t object_index) { RAY_CHECK(object_index <= kMaxTaskReturns && object_index >= -kMaxTaskPuts); ObjectID return_id = task_id; int64_t *first_bytes = reinterpret_cast(&return_id); @@ -101,12 +101,12 @@ const ObjectID ComputeObjectId(TaskID task_id, int64_t object_index) { const TaskID FinishTaskId(const TaskID &task_id) { return ComputeObjectId(task_id, 0); } -const ObjectID ComputeReturnId(TaskID task_id, int64_t return_index) { +const ObjectID ComputeReturnId(const TaskID &task_id, int64_t return_index) { RAY_CHECK(return_index >= 1 && return_index <= kMaxTaskReturns); return ComputeObjectId(task_id, return_index); } -const ObjectID ComputePutId(TaskID task_id, int64_t put_index) { +const ObjectID ComputePutId(const TaskID &task_id, int64_t put_index) { RAY_CHECK(put_index >= 1 && put_index <= kMaxTaskPuts); return ComputeObjectId(task_id, -1 * put_index); } diff --git a/src/ray/id.h b/src/ray/id.h index db8958cc8..51aa52f93 100644 --- a/src/ray/id.h +++ b/src/ray/id.h @@ -73,14 +73,14 @@ const TaskID FinishTaskId(const TaskID &task_id); /// \param task_id The task ID of the task that created the object. /// \param put_index What number return value this object is in the task. /// \return The computed object ID. -const ObjectID ComputeReturnId(TaskID task_id, int64_t return_index); +const ObjectID ComputeReturnId(const TaskID &task_id, int64_t return_index); /// Compute the object ID of an object put by the task. /// /// \param task_id The task ID of the task that created the object. /// \param put_index What number put this object was created by in the task. /// \return The computed object ID. -const ObjectID ComputePutId(TaskID task_id, int64_t put_index); +const ObjectID ComputePutId(const TaskID &task_id, int64_t put_index); /// Compute the task ID of the task that created the object. /// diff --git a/test/xray_test.py b/test/xray_test.py index 034099db6..a1fdcfbaf 100644 --- a/test/xray_test.py +++ b/test/xray_test.py @@ -5,6 +5,8 @@ from __future__ import print_function import pytest import ray +test_values = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}] + @pytest.fixture def ray_start(): @@ -40,8 +42,7 @@ def test_basic_task_api(ray_start): def f_args_by_value(x): return x - args = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}] - for arg in args: + for arg in test_values: assert ray.get(f_args_by_value.remote(arg)) == arg # Test arguments passed by ID. @@ -49,6 +50,17 @@ def test_basic_task_api(ray_start): # Test keyword arguments. +def test_put_api(ray_start): + + for obj in test_values: + assert ray.get(ray.put(obj)) == obj + + # Test putting object IDs. + x_id = ray.put(0) + for obj in [[x_id], (x_id,), {x_id: x_id}]: + assert ray.get(ray.put(obj)) == obj + + def test_actor_api(ray_start): @ray.remote