diff --git a/python/ray/worker.py b/python/ray/worker.py index a54fc4277..4299956ff 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1729,7 +1729,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): return ready_ids, remaining_ids -def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker): +def wait_for_function(function_id, driver_id, timeout=10, + worker=global_worker): """Wait until the function to be executed is present on this worker. This method will simply loop until the import thread has imported the diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 384dab91e..633be8e46 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -99,8 +99,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { PyTask *result = PyObject_New(PyTask, &PyTaskType); result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); result->size = size; - result->spec = (TaskSpec *) malloc(size); - memcpy(result->spec, data, size); + result->spec = TaskSpec_copy((TaskSpec *) data, size); /* TODO(pcm): Use flatbuffers validation here. */ return (PyObject *) result; } diff --git a/src/common/task.cc b/src/common/task.cc index 7b1b7426d..1f3a1e5fb 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -288,6 +288,12 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) { return false; } +TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) { + TaskSpec *copy = (TaskSpec *) malloc(task_spec_size); + memcpy(copy, spec, task_spec_size); + return copy; +} + void TaskSpec_free(TaskSpec *spec) { free(spec); } diff --git a/src/common/task.h b/src/common/task.h index 986717860..5bb4f3fee 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -291,6 +291,15 @@ ObjectID task_compute_put_id(TaskID task_id, int64_t put_index); */ void TaskSpec_print(TaskSpec *spec, UT_string *output); +/** + * Create a copy of the task spec. Must be freed with TaskSpec_free after use. + * + * @param spec The task specification that will be copied. + * @param task_spec_size The size of the task specification in bytes. + * @returns Pointer to the copy of the task specification. + */ +TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size); + /** * Free a task_spec. * @@ -355,7 +364,7 @@ Task *Task_alloc(TaskSpec *spec, DBClientID local_scheduler_id); /** - * Create a copy of the task. Must be freed with free_task after use. + * Create a copy of the task. Must be freed with Task_free after use. * * @param other The task that will be copied. * @returns Pointer to the copy of the task. diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 874bef5c7..a78ab6df8 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -112,8 +112,7 @@ struct SchedulingAlgorithmState { TaskQueueEntry TaskQueueEntry_init(TaskSpec *spec, int64_t task_spec_size) { TaskQueueEntry elt; - elt.spec = (TaskSpec *) malloc(task_spec_size); - memcpy(elt.spec, spec, task_spec_size); + elt.spec = TaskSpec_copy(spec, task_spec_size); elt.task_spec_size = task_spec_size; return elt; } @@ -833,8 +832,9 @@ void handle_task_submitted(LocalSchedulerState *state, void handle_actor_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, + TaskSpec *task_spec, int64_t task_spec_size) { + TaskSpec *spec = TaskSpec_copy(task_spec, task_spec_size); ActorID actor_id = TaskSpec_actor_id(spec); CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); @@ -865,6 +865,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state, state, algorithm_state, spec, task_spec_size, state->actor_mapping[actor_id].local_scheduler_id); } + TaskSpec_free(spec); } void handle_actor_creation_notification( diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index ca8955bfd..c2605eb3c 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -92,8 +92,8 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, auto reply_message = flatbuffers::GetRoot(message); /* Create a copy of the task spec so we can free the reply. */ *task_size = reply_message->task_spec()->size(); - TaskSpec *spec = (TaskSpec *) malloc(*task_size); - memcpy(spec, reply_message->task_spec()->data(), *task_size); + TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data(); + TaskSpec *spec = TaskSpec_copy(data, *task_size); /* Free the original message from the local scheduler. */ free(message); /* Return the copy of the task spec and pass ownership to the caller. */