diff --git a/python/ray/actor.py b/python/ray/actor.py index 74d4f3853..f1675f0a2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -113,12 +113,6 @@ def put_dummy_object(worker, dummy_object_id): worker.actor_pinned_objects[dummy_object_id] = dummy_object -def is_checkpoint_task(task_counter, checkpoint_interval): - if checkpoint_interval <= 0: - return False - return (task_counter % checkpoint_interval == 0) - - def make_actor_method_executor(worker, method_name, method): """Make an executor that wraps a user-defined actor method. @@ -333,6 +327,8 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): + if checkpoint_interval == 0: + raise Exception("checkpoint_interval must be greater than 0.") # Add one to the checkpoint interval since we will insert a mock task for # every checkpoint. checkpoint_interval += 1 @@ -621,23 +617,21 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): # task. args.append(dependency) - actor_counter = self._ray_actor_counter - # Mark checkpoint methods with a negative task counter. - if is_checkpoint_task(actor_counter, checkpoint_interval): - actor_counter = self._ray_actor_counter * -1 + is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") function_id = get_actor_method_function_id(method_name) object_ids = ray.worker.global_worker.submit_task( function_id, args, actor_id=self._ray_actor_id, - actor_counter=actor_counter) + actor_counter=self._ray_actor_counter, + is_actor_checkpoint_method=is_actor_checkpoint_method) # Update the actor counter and cursor to reflect the most recent # invocation. self._ray_actor_counter += 1 self._ray_actor_cursor = object_ids.pop() - # Submit a checkpoint task if necessary. - if is_checkpoint_task(self._ray_actor_counter, - checkpoint_interval): + # Submit a checkpoint task if it is time to do so. 
+ if (checkpoint_interval > 1 and + self._ray_actor_counter % checkpoint_interval == 0): self.__ray_checkpoint__.remote() # The last object returned is the dummy object that should be diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index ba898140d..f1a738060 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -170,7 +170,7 @@ class TestGlobalScheduler(unittest.TestCase): task2 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), - 0, [1.0, 2.0, 0.0]) + 0, 0, [1.0, 2.0, 0.0]) self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0]) def test_redis_only_single_task(self): diff --git a/python/ray/worker.py b/python/ray/worker.py index 6139e3c54..08ad0bb08 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -454,7 +454,8 @@ class Worker(object): assert len(final_results) == len(object_ids) return final_results - def submit_task(self, function_id, args, actor_id=None, actor_counter=0): + def submit_task(self, function_id, args, actor_id=None, actor_counter=0, + is_actor_checkpoint_method=False): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with ID @@ -462,9 +463,14 @@ class Worker(object): the function from the scheduler and immediately return them. Args: - args (List[Any]): The arguments to pass into the function. - Arguments can be object IDs or they can be values. If they are - values, they must be serializable objecs. + function_id: The ID of the function to execute. + args: The arguments to pass into the function. Arguments can be + object IDs or they can be values. If they are values, they must + be serializable objects. + actor_id: The ID of the actor that this task is for. + actor_counter: The counter of the actor task. 
+ is_actor_checkpoint_method: True if this is an actor checkpoint + task and false otherwise. """ with log_span("ray:submit_task", worker=self): check_main_thread() @@ -495,6 +501,7 @@ class Worker(object): self.task_index, actor_id, actor_counter, + is_actor_checkpoint_method, [function_properties.num_cpus, function_properties.num_gpus, function_properties.num_custom_resource]) # Increment the worker's task index to track how many tasks have @@ -1834,6 +1841,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.task_index, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, + False, [0, 0, 0]) global_state._execute_command( driver_task.task_id(), diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index ac188c519..81b126231 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -37,6 +37,8 @@ table TaskInfo { actor_id: string; // Number of tasks that have been submitted to this actor so far. actor_counter: int; + // True if this task is an actor checkpoint task and false otherwise. + is_actor_checkpoint_method: bool; // Function ID of the task. function_id: string; // Task arguments. diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index d4ef3f9d9..7d7c4b4fc 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -273,6 +273,8 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { UniqueID actor_id = NIL_ACTOR_ID; /* How many tasks have been launched on the actor so far? */ int actor_counter = 0; + /* True if this is an actor checkpoint task and false otherwise. */ + PyObject *is_actor_checkpoint_method_object = NULL; /* ID of the function this task executes. */ FunctionID function_id; /* Arguments of the task (can be PyObjectIDs or Python values). 
*/ @@ -285,18 +287,26 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { int parent_counter; /* Resource vector of the required resources to execute this task. */ PyObject *resource_vector = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iO", &PyObjectToUniqueID, &driver_id, - &PyObjectToUniqueID, &function_id, &arguments, - &num_returns, &PyObjectToUniqueID, &parent_task_id, - &parent_counter, &PyObjectToUniqueID, &actor_id, - &actor_counter, &resource_vector)) { + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iOO", &PyObjectToUniqueID, + &driver_id, &PyObjectToUniqueID, &function_id, + &arguments, &num_returns, &PyObjectToUniqueID, + &parent_task_id, &parent_counter, &PyObjectToUniqueID, + &actor_id, &actor_counter, + &is_actor_checkpoint_method_object, &resource_vector)) { return -1; } + + bool is_actor_checkpoint_method = false; + if (is_actor_checkpoint_method_object != NULL && + PyObject_IsTrue(is_actor_checkpoint_method_object) == 1) { + is_actor_checkpoint_method = true; + } + Py_ssize_t size = PyList_Size(arguments); /* Construct the task specification. */ - TaskSpec_start_construct(g_task_builder, driver_id, parent_task_id, - parent_counter, actor_id, actor_counter, function_id, - num_returns); + TaskSpec_start_construct( + g_task_builder, driver_id, parent_task_id, parent_counter, actor_id, + actor_counter, is_actor_checkpoint_method, function_id, num_returns); /* Add the task arguments. 
*/ for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); diff --git a/src/common/task.cc b/src/common/task.cc index 7d90a72e0..d8074bbe8 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -39,6 +39,7 @@ class TaskBuilder { int64_t parent_counter, ActorID actor_id, int64_t actor_counter, + bool is_actor_checkpoint_method, FunctionID function_id, int64_t num_returns) { driver_id_ = driver_id; @@ -46,6 +47,7 @@ class TaskBuilder { parent_counter_ = parent_counter; actor_id_ = actor_id; actor_counter_ = actor_counter; + is_actor_checkpoint_method_ = is_actor_checkpoint_method; function_id_ = function_id; num_returns_ = num_returns; @@ -56,6 +58,8 @@ class TaskBuilder { sha256_update(&ctx, (BYTE *) &parent_counter, sizeof(parent_counter)); sha256_update(&ctx, (BYTE *) &actor_id, sizeof(actor_id)); sha256_update(&ctx, (BYTE *) &actor_counter, sizeof(actor_counter)); + sha256_update(&ctx, (BYTE *) &is_actor_checkpoint_method, + sizeof(is_actor_checkpoint_method)); sha256_update(&ctx, (BYTE *) &function_id, sizeof(function_id)); } @@ -103,7 +107,7 @@ class TaskBuilder { auto message = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id), to_flatbuf(fbb, parent_task_id_), parent_counter_, - to_flatbuf(fbb, actor_id_), actor_counter_, + to_flatbuf(fbb, actor_id_), actor_counter_, is_actor_checkpoint_method_, to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns), fbb.CreateVector(resource_vector_)); /* Finish the TaskInfo. 
*/ @@ -127,6 +131,7 @@ class TaskBuilder { int64_t parent_counter_; ActorID actor_id_; int64_t actor_counter_; + bool is_actor_checkpoint_method_; FunctionID function_id_; int64_t num_returns_; std::vector resource_vector_; @@ -168,10 +173,12 @@ void TaskSpec_start_construct(TaskBuilder *builder, int64_t parent_counter, ActorID actor_id, int64_t actor_counter, + bool is_actor_checkpoint_method, FunctionID function_id, int64_t num_returns) { builder->Start(driver_id, parent_task_id, parent_counter, actor_id, - actor_counter, function_id, num_returns); + actor_counter, is_actor_checkpoint_method, function_id, + num_returns); } uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { @@ -224,18 +231,17 @@ int64_t TaskSpec_actor_counter(TaskSpec *spec) { return std::abs(message->actor_counter()); } -bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) { +bool TaskSpec_is_actor_checkpoint_method(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - int64_t actor_counter = message->actor_counter(); - return actor_counter < 0; + return message->is_actor_checkpoint_method(); } bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) { if (TaskSpec_actor_counter(spec) == 0) { /* The first task does not have any dependencies. */ return false; - } else if (TaskSpec_actor_is_checkpoint_method(spec)) { + } else if (TaskSpec_is_actor_checkpoint_method(spec)) { /* Checkpoint tasks do not have any dependencies. */ return false; } else { diff --git a/src/common/task.h b/src/common/task.h index ee06de2ed..eee474ed4 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -84,6 +84,12 @@ void free_task_builder(TaskBuilder *builder); * @param parent_task_id The task ID of the task that submitted this task. * @param parent_counter A counter indicating how many tasks were submitted by * the parent task prior to this one. + * @param actor_id The ID of the actor that this task is for. 
If it is not an + * actor task, then this is NIL_ACTOR_ID. + * @param actor_counter A counter indicating how many tasks have been submitted + * to the same actor before this one. + * @param is_actor_checkpoint_method True if this is an actor checkpoint method + * and false otherwise. * @param function_id The function ID of the function to execute in this task. * @param num_args The number of arguments that this task has. * @param num_returns The number of return values that this task has. @@ -97,6 +103,7 @@ void TaskSpec_start_construct(TaskBuilder *B, int64_t parent_counter, UniqueID actor_id, int64_t actor_counter, + bool is_actor_checkpoint_method, FunctionID function_id, int64_t num_returns); @@ -149,7 +156,7 @@ int64_t TaskSpec_actor_counter(TaskSpec *spec); * @param spec The task_spec in question. * @return Whether the task is a checkpoint method. */ -bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec); +bool TaskSpec_is_actor_checkpoint_method(TaskSpec *spec); /** * Return whether the task's argument is a dummy object. 
Dummy objects are used diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index 4c1eea6e6..ab0e40cfe 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -14,7 +14,7 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args, TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0, - NIL_ACTOR_ID, 0, func_id, num_returns); + NIL_ACTOR_ID, 0, false, func_id, num_returns); for (int64_t i = 0; i < num_args; ++i) { ObjectID arg_id; if (arg_ids == NULL) { diff --git a/src/common/test/task_tests.cc b/src/common/test/task_tests.cc index f348fe882..1cc806d1f 100644 --- a/src/common/test/task_tests.cc +++ b/src/common/test/task_tests.cc @@ -16,7 +16,7 @@ TEST task_test(void) { FunctionID func_id = globally_unique_id(); TaskBuilder *builder = make_task_builder(); TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 2); + false, func_id, 2); UniqueID arg1 = globally_unique_id(); TaskSpec_args_add_ref(builder, arg1); @@ -55,7 +55,7 @@ TEST deterministic_ids_test(void) { /* Construct a first task. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 3); + false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size1; @@ -63,7 +63,7 @@ TEST deterministic_ids_test(void) { /* Construct a second identical task. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 3); + false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size2; @@ -83,7 +83,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent task ID. 
*/ TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0, - NIL_ACTOR_ID, 0, func_id, 3); + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size3; @@ -91,7 +91,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent counter. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, 0, - func_id, 3); + false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size4; @@ -99,7 +99,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different function ID. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - globally_unique_id(), 3); + false, globally_unique_id(), 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size5; @@ -107,7 +107,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different object ID argument. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 3); + false, func_id, 3); TaskSpec_args_add_ref(builder, globally_unique_id()); TaskSpec_args_add_val(builder, arg2, 11); int64_t size6; @@ -115,7 +115,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different value argument. 
*/ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 3); + false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11); int64_t size7; @@ -160,7 +160,7 @@ TEST send_task(void) { TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - func_id, 2); + false, func_id, 2); TaskSpec_args_add_ref(builder, globally_unique_id()); TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5); TaskSpec_args_add_val(builder, (uint8_t *) "World", 5); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 57965ba8d..d6fed501a 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -344,7 +344,7 @@ bool dispatch_actor_task(LocalSchedulerState *state, * matches task_counter (the first task), or a checkpoint task. */ if (next_task_counter != entry.task_counter) { /* No other task should be first in the queue. */ - CHECK(TaskSpec_actor_is_checkpoint_method(task->spec)); + CHECK(TaskSpec_is_actor_checkpoint_method(task->spec)); } }