diff --git a/.travis/check-git-clang-format-output.sh b/.travis/check-git-clang-format-output.sh index 8a83b1a53..5f468a6c4 100755 --- a/.travis/check-git-clang-format-output.sh +++ b/.travis/check-git-clang-format-output.sh @@ -1,5 +1,5 @@ #!/bin/bash -set -e -x + if [ "$TRAVIS_PULL_REQUEST" == "false" ] ; then # Not in a pull request, so compare against parent commit base_commit="HEAD^" diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index befad89fe..4ccd84d81 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -862,6 +862,7 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker): if mode in [SCRIPT_MODE, SILENT_MODE]: worker.current_task_id = photon.ObjectID("".join(chr(i) for i in range(20))) worker.task_index = 0 + worker.put_index = 0 # If this is a worker, then start a thread to import exports from the driver. if mode == WORKER_MODE: t = threading.Thread(target=import_thread, args=(worker,)) @@ -998,9 +999,10 @@ def put(value, worker=global_worker): check_connected(worker) if worker.mode == PYTHON_MODE: return value # In PYTHON_MODE, ray.put is the identity operation - objectid = random_object_id() - worker.put_object(objectid, value) - return objectid + object_id = photon.compute_put_id(worker.current_task_id, worker.put_index) + worker.put_object(object_id, value) + worker.put_index += 1 + return object_id def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): """Return a list of IDs that are ready and a list of IDs that are not ready. @@ -1075,6 +1077,7 @@ def main_loop(worker=global_worker): """ worker.current_task_id = task.task_id() worker.task_index = 0 + worker.put_index = 0 function_id = task.function_id() args = task.arguments() return_object_ids = task.returns() diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index 5aebc6283..1dd8726f3 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -258,9 +258,9 @@ static PyObject *PyTask_task_id(PyObject *self) { } static PyObject *PyTask_arguments(PyObject *self) { - int64_t num_args = task_num_args(((PyTask *) self)->spec); - PyObject *arg_list = PyList_New((Py_ssize_t) num_args); task_spec *task = ((PyTask *) self)->spec; + int64_t num_args = task_num_args(task); + PyObject *arg_list = PyList_New((Py_ssize_t) num_args); for (int i = 0; i < num_args; ++i) { if (task_arg_type(task, i) == ARG_BY_REF) { object_id object_id = task_arg_id(task, i); @@ -281,9 +281,9 @@ static PyObject *PyTask_arguments(PyObject *self) { } static PyObject *PyTask_returns(PyObject *self) { - int64_t num_returns = task_num_returns(((PyTask *) self)->spec); - PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns); task_spec *task = ((PyTask *) self)->spec; + int64_t num_returns = task_num_returns(task); + PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns); for (int i = 0; i < num_returns; ++i) { object_id object_id = task_return(task, i); PyList_SetItem(return_id_list, i, PyObjectID_make(object_id)); @@ -431,3 +431,14 @@ PyObject *check_simple_value(PyObject *self, PyObject *args) { } Py_RETURN_FALSE; } + +PyObject *compute_put_id(PyObject *self, PyObject *args) { + int put_index; + task_id task_id; + if (!PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &task_id, + &put_index)) { + return NULL; + } + object_id put_id = task_compute_put_id(task_id, put_index); + return PyObjectID_make(put_id); +} diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index a34fca5ff..3fc03f78c 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -39,6 +39,8 @@ PyObject *PyObjectID_make(object_id object_id); PyObject *check_simple_value(PyObject *self, PyObject *args); +PyObject *compute_put_id(PyObject *self, PyObject *args); + PyObject *PyTask_make(task_spec *task_spec); #endif /* COMMON_EXTENSION_H */ diff --git a/src/common/lib/python/common_module.c b/src/common/lib/python/common_module.c index 6ae8b2eba..a1e6aea6c 100644 --- a/src/common/lib/python/common_module.c +++ b/src/common/lib/python/common_module.c @@ -5,6 +5,8 @@ static PyMethodDef common_methods[] = { {"check_simple_value", check_simple_value, METH_VARARGS, "Should the object be passed by value?"}, + {"compute_put_id", compute_put_id, METH_VARARGS, + "Return the object ID for a put call within a task."}, {NULL} /* Sentinel */ }; diff --git a/src/common/task.c b/src/common/task.c index 9d8a8a501..fa514d7da 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -112,7 +112,10 @@ task_id compute_task_id(task_spec *spec) { return task_id; } -object_id compute_return_id(task_id task_id, int64_t return_index) { +object_id task_compute_return_id(task_id task_id, int64_t return_index) { + /* Here, return_indices need to be >= 0, so we can use negative + * indices for put. */ + DCHECK(return_index >= 0); /* TODO(rkn): This line requires object and task IDs to be the same size. */ object_id return_id = task_id; int64_t *first_bytes = (int64_t *) &return_id; @@ -122,6 +125,17 @@ object_id compute_return_id(task_id task_id, int64_t return_index) { return return_id; } +object_id task_compute_put_id(task_id task_id, int64_t put_index) { + DCHECK(put_index >= 0); + /* TODO(pcm): This line requires object and task IDs to be the same size. */ + object_id put_id = task_id; + int64_t *first_bytes = (int64_t *) &put_id; + /* XOR the first bytes of the object ID with the return index. We add one so + * the first return ID is not the same as the task ID. */ + *first_bytes = *first_bytes ^ (-put_index - 1); + return put_id; +} + task_spec *start_construct_task_spec(task_id parent_task_id, int64_t parent_counter, function_id function_id, @@ -151,7 +165,7 @@ void finish_construct_task_spec(task_spec *spec) { spec->task_id = compute_task_id(spec); /* Set the object IDs for the return values. */ for (int64_t i = 0; i < spec->num_returns; ++i) { - *task_return_ptr(spec, i) = compute_return_id(spec->task_id, i); + *task_return_ptr(spec, i) = task_compute_return_id(spec->task_id, i); } } diff --git a/src/common/task.h b/src/common/task.h index 757cb43a3..d9a2ca41f 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -220,6 +220,15 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length); */ object_id task_return(task_spec *spec, int64_t return_index); +/** + * Compute the object id associated to a put call. + * + * @param task_id The task id of the parent task that called the put. + * @param put_index The number of put calls in this task so far. + * @return The object ID for the object that was put. + */ +object_id task_compute_put_id(task_id task_id, int64_t put_index); + /** * Free a task_spec. * diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 0e874750d..599743fba 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -116,6 +116,8 @@ static PyTypeObject PyPhotonClientType = { static PyMethodDef photon_methods[] = { {"check_simple_value", check_simple_value, METH_VARARGS, "Should the object be passed by value?"}, + {"compute_put_id", compute_put_id, METH_VARARGS, + "Return the object ID for a put call within a task."}, {NULL} /* Sentinel */ };