mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 22:54:17 +08:00
Fix seed bug for generating object ids for put (#120)
* fix seed bug for generating object ids for put * fix clang-format
This commit is contained in:
committed by
Robert Nishihara
parent
24d2b42d86
commit
2152cd9f31
@@ -1,5 +1,5 @@
|
||||
#!/bin/bash
|
||||
set -e -x
|
||||
|
||||
if [ "$TRAVIS_PULL_REQUEST" == "false" ] ; then
|
||||
# Not in a pull request, so compare against parent commit
|
||||
base_commit="HEAD^"
|
||||
|
||||
@@ -862,6 +862,7 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker):
|
||||
if mode in [SCRIPT_MODE, SILENT_MODE]:
|
||||
worker.current_task_id = photon.ObjectID("".join(chr(i) for i in range(20)))
|
||||
worker.task_index = 0
|
||||
worker.put_index = 0
|
||||
# If this is a worker, then start a thread to import exports from the driver.
|
||||
if mode == WORKER_MODE:
|
||||
t = threading.Thread(target=import_thread, args=(worker,))
|
||||
@@ -998,9 +999,10 @@ def put(value, worker=global_worker):
|
||||
check_connected(worker)
|
||||
if worker.mode == PYTHON_MODE:
|
||||
return value # In PYTHON_MODE, ray.put is the identity operation
|
||||
objectid = random_object_id()
|
||||
worker.put_object(objectid, value)
|
||||
return objectid
|
||||
object_id = photon.compute_put_id(worker.current_task_id, worker.put_index)
|
||||
worker.put_object(object_id, value)
|
||||
worker.put_index += 1
|
||||
return object_id
|
||||
|
||||
def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
|
||||
"""Return a list of IDs that are ready and a list of IDs that are not ready.
|
||||
@@ -1075,6 +1077,7 @@ def main_loop(worker=global_worker):
|
||||
"""
|
||||
worker.current_task_id = task.task_id()
|
||||
worker.task_index = 0
|
||||
worker.put_index = 0
|
||||
function_id = task.function_id()
|
||||
args = task.arguments()
|
||||
return_object_ids = task.returns()
|
||||
|
||||
@@ -258,9 +258,9 @@ static PyObject *PyTask_task_id(PyObject *self) {
|
||||
}
|
||||
|
||||
static PyObject *PyTask_arguments(PyObject *self) {
|
||||
int64_t num_args = task_num_args(((PyTask *) self)->spec);
|
||||
PyObject *arg_list = PyList_New((Py_ssize_t) num_args);
|
||||
task_spec *task = ((PyTask *) self)->spec;
|
||||
int64_t num_args = task_num_args(task);
|
||||
PyObject *arg_list = PyList_New((Py_ssize_t) num_args);
|
||||
for (int i = 0; i < num_args; ++i) {
|
||||
if (task_arg_type(task, i) == ARG_BY_REF) {
|
||||
object_id object_id = task_arg_id(task, i);
|
||||
@@ -281,9 +281,9 @@ static PyObject *PyTask_arguments(PyObject *self) {
|
||||
}
|
||||
|
||||
static PyObject *PyTask_returns(PyObject *self) {
|
||||
int64_t num_returns = task_num_returns(((PyTask *) self)->spec);
|
||||
PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns);
|
||||
task_spec *task = ((PyTask *) self)->spec;
|
||||
int64_t num_returns = task_num_returns(task);
|
||||
PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns);
|
||||
for (int i = 0; i < num_returns; ++i) {
|
||||
object_id object_id = task_return(task, i);
|
||||
PyList_SetItem(return_id_list, i, PyObjectID_make(object_id));
|
||||
@@ -431,3 +431,14 @@ PyObject *check_simple_value(PyObject *self, PyObject *args) {
|
||||
}
|
||||
Py_RETURN_FALSE;
|
||||
}
|
||||
|
||||
PyObject *compute_put_id(PyObject *self, PyObject *args) {
|
||||
int put_index;
|
||||
task_id task_id;
|
||||
if (!PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &task_id,
|
||||
&put_index)) {
|
||||
return NULL;
|
||||
}
|
||||
object_id put_id = task_compute_put_id(task_id, put_index);
|
||||
return PyObjectID_make(put_id);
|
||||
}
|
||||
|
||||
@@ -39,6 +39,8 @@ PyObject *PyObjectID_make(object_id object_id);
|
||||
|
||||
PyObject *check_simple_value(PyObject *self, PyObject *args);
|
||||
|
||||
PyObject *compute_put_id(PyObject *self, PyObject *args);
|
||||
|
||||
PyObject *PyTask_make(task_spec *task_spec);
|
||||
|
||||
#endif /* COMMON_EXTENSION_H */
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
static PyMethodDef common_methods[] = {
|
||||
{"check_simple_value", check_simple_value, METH_VARARGS,
|
||||
"Should the object be passed by value?"},
|
||||
{"compute_put_id", compute_put_id, METH_VARARGS,
|
||||
"Return the object ID for a put call within a task."},
|
||||
{NULL} /* Sentinel */
|
||||
};
|
||||
|
||||
|
||||
+16
-2
@@ -112,7 +112,10 @@ task_id compute_task_id(task_spec *spec) {
|
||||
return task_id;
|
||||
}
|
||||
|
||||
object_id compute_return_id(task_id task_id, int64_t return_index) {
|
||||
object_id task_compute_return_id(task_id task_id, int64_t return_index) {
|
||||
/* Here, return_indices need to be >= 0, so we can use negative
|
||||
* indices for put. */
|
||||
DCHECK(return_index >= 0);
|
||||
/* TODO(rkn): This line requires object and task IDs to be the same size. */
|
||||
object_id return_id = task_id;
|
||||
int64_t *first_bytes = (int64_t *) &return_id;
|
||||
@@ -122,6 +125,17 @@ object_id compute_return_id(task_id task_id, int64_t return_index) {
|
||||
return return_id;
|
||||
}
|
||||
|
||||
object_id task_compute_put_id(task_id task_id, int64_t put_index) {
|
||||
DCHECK(put_index >= 0);
|
||||
/* TODO(pcm): This line requires object and task IDs to be the same size. */
|
||||
object_id put_id = task_id;
|
||||
int64_t *first_bytes = (int64_t *) &put_id;
|
||||
/* XOR the first bytes of the object ID with the return index. We add one so
|
||||
* the first return ID is not the same as the task ID. */
|
||||
*first_bytes = *first_bytes ^ (-put_index - 1);
|
||||
return put_id;
|
||||
}
|
||||
|
||||
task_spec *start_construct_task_spec(task_id parent_task_id,
|
||||
int64_t parent_counter,
|
||||
function_id function_id,
|
||||
@@ -151,7 +165,7 @@ void finish_construct_task_spec(task_spec *spec) {
|
||||
spec->task_id = compute_task_id(spec);
|
||||
/* Set the object IDs for the return values. */
|
||||
for (int64_t i = 0; i < spec->num_returns; ++i) {
|
||||
*task_return_ptr(spec, i) = compute_return_id(spec->task_id, i);
|
||||
*task_return_ptr(spec, i) = task_compute_return_id(spec->task_id, i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -220,6 +220,15 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length);
|
||||
*/
|
||||
object_id task_return(task_spec *spec, int64_t return_index);
|
||||
|
||||
/**
|
||||
* Compute the object id associated to a put call.
|
||||
*
|
||||
* @param task_id The task id of the parent task that called the put.
|
||||
* @param put_index The number of put calls in this task so far.
|
||||
* @return The object ID for the object that was put.
|
||||
*/
|
||||
object_id task_compute_put_id(task_id task_id, int64_t put_index);
|
||||
|
||||
/**
|
||||
* Free a task_spec.
|
||||
*
|
||||
|
||||
@@ -116,6 +116,8 @@ static PyTypeObject PyPhotonClientType = {
|
||||
static PyMethodDef photon_methods[] = {
|
||||
{"check_simple_value", check_simple_value, METH_VARARGS,
|
||||
"Should the object be passed by value?"},
|
||||
{"compute_put_id", compute_put_id, METH_VARARGS,
|
||||
"Return the object ID for a put call within a task."},
|
||||
{NULL} /* Sentinel */
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user