From f21d783e6d40bfb408a4fde4e107b0c785b7dc84 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 3 Jul 2018 11:48:50 -0700 Subject: [PATCH] Remove new gcs code from legacy Ray codepath (#2329) --- src/common/state/redis.cc | 6 +- src/global_scheduler/global_scheduler.cc | 8 --- src/local_scheduler/local_scheduler.cc | 71 ------------------- .../local_scheduler_algorithm.cc | 35 --------- .../test/local_scheduler_tests.cc | 12 ---- src/plasma/plasma_manager.cc | 19 +---- 6 files changed, 2 insertions(+), 149 deletions(-) diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index 56f2a905f..17a3c8ce2 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -1162,13 +1162,9 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) { /* TASK_CHANNEL_PREFIX is defined in ray_redis_module.cc and must be kept in * sync with that file. */ const char *TASK_CHANNEL_PREFIX = "TT:"; -#if !RAY_USE_NEW_GCS - for (auto subscribe_context : db->subscribe_contexts) { -#else /* In the new code path, subscriptions currently go through the * primary redis shard. */ - for (auto subscribe_context : {db->subscribe_context}) { -#endif + for (auto subscribe_context : db->subscribe_contexts) { int status; if (data->local_scheduler_id.is_nil()) { /* TODO(swang): Implement the state_filter by translating the bitmask into diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index 448947cc7..cd2477420 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -41,7 +41,6 @@ void assign_task_to_local_scheduler_retry(UniqueID id, return; } -#if !RAY_USE_NEW_GCS // The local scheduler is still alive. The failure is most likely due to the // task assignment getting published before the local scheduler subscribed to // the channel. Retry the assignment. @@ -51,9 +50,6 @@ void assign_task_to_local_scheduler_retry(UniqueID id, .fail_callback = assign_task_to_local_scheduler_retry, }; task_table_update(state->db, Task_copy(task), &retryInfo, NULL, user_context); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); -#endif } /** @@ -76,16 +72,12 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state, RAY_LOG(DEBUG) << "Issuing a task table update for task = " << Task_task_id(task); -#if !RAY_USE_NEW_GCS auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. .fail_callback = assign_task_to_local_scheduler_retry, }; task_table_update(state->db, Task_copy(task), &retryInfo, NULL, state); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); -#endif /* Update the object table info to reflect the fact that the results of this * task will be created on the machine that the task was assigned to. This can diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 0362a12ad..27f14dad8 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -144,12 +144,7 @@ void kill_worker(LocalSchedulerState *state, /* Update the task table to reflect that the task failed to complete. */ if (state->db != NULL) { Task_set_state(worker->task_in_progress, TaskStatus::LOST); -#if !RAY_USE_NEW_GCS task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress)); - Task_free(worker->task_in_progress); -#endif } else { Task_free(worker->task_in_progress); } @@ -558,12 +553,7 @@ void assign_task_to_worker(LocalSchedulerState *state, worker->task_in_progress = Task_copy(task); /* Update the global task table. */ if (state->db != NULL) { -#if !RAY_USE_NEW_GCS task_table_update(state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } else { Task_free(task); } @@ -630,7 +620,6 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { /* Update control state tables. */ TaskStatus task_state = TaskStatus::DONE; Task_set_state(worker->task_in_progress, task_state); -#if !RAY_USE_NEW_GCS auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. @@ -641,10 +630,6 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { // task table entries have already been cleaned up by the monitor. task_table_update(state->db, worker->task_in_progress, &retryInfo, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress)); - Task_free(worker->task_in_progress); -#endif } else { Task_free(worker->task_in_progress); } @@ -692,22 +677,10 @@ void reconstruct_task_update_callback(Task *task, /* (2) The current local scheduler for the task is dead. The task is * lost, but the task table hasn't received the update yet. Retry the * test-and-set. */ -#if !RAY_USE_NEW_GCS task_table_test_and_update(state->db, Task_task_id(task), current_local_scheduler_id, Task_state(task), TaskStatus::RECONSTRUCTING, NULL, reconstruct_task_update_callback, state); -#else - RAY_CHECK_OK(gcs::TaskTableTestAndUpdate( - &state->gcs_client, Task_task_id(task), current_local_scheduler_id, - static_cast(Task_state(task)), - SchedulingState::RECONSTRUCTING, - [task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &, - const TaskTableDataT &t, bool updated) { - reconstruct_task_update_callback(task, user_context, updated); - })); - Task_free(task); -#endif } } /* The test-and-set failed, so it is not safe to resubmit the task for @@ -751,22 +724,10 @@ void reconstruct_put_task_update_callback(Task *task, /* (2) The current local scheduler for the task is dead. The task is * lost, but the task table hasn't received the update yet. Retry the * test-and-set. */ -#if !RAY_USE_NEW_GCS task_table_test_and_update(state->db, Task_task_id(task), current_local_scheduler_id, Task_state(task), TaskStatus::RECONSTRUCTING, NULL, reconstruct_put_task_update_callback, state); -#else - RAY_CHECK_OK(gcs::TaskTableTestAndUpdate( - &state->gcs_client, Task_task_id(task), current_local_scheduler_id, - static_cast(Task_state(task)), - SchedulingState::RECONSTRUCTING, - [task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &, - const TaskTableDataT &, bool updated) { - reconstruct_put_task_update_callback(task, user_context, updated); - })); - Task_free(task); -#endif } else if (Task_state(task) == TaskStatus::RUNNING) { /* (1) The task is still executing on a live node. The object created * by `ray.put` was not able to be reconstructed, and the workload will @@ -821,27 +782,10 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id, } /* If there are no other instances of the task running, it's safe for us to * claim responsibility for reconstruction. */ -#if !RAY_USE_NEW_GCS task_table_test_and_update(state->db, task_id, DBClientID::nil(), (TaskStatus::DONE | TaskStatus::LOST), TaskStatus::RECONSTRUCTING, NULL, done_callback, state); -#else - RAY_CHECK_OK(gcs::TaskTableTestAndUpdate( - &state->gcs_client, task_id, DBClientID::nil(), - static_cast(static_cast(SchedulingState::DONE) | - static_cast(SchedulingState::LOST)), - SchedulingState::RECONSTRUCTING, - [done_callback, state](gcs::AsyncGcsClient *, const ray::TaskID &, - const TaskTableDataT &t, bool updated) { - Task *task = Task_alloc(t.task_info.data(), t.task_info.size(), - static_cast(t.scheduling_state), - DBClientID::from_binary(t.scheduler_id), - std::vector()); - done_callback(task, state, updated); - Task_free(task); - })); -#endif } void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, @@ -860,24 +804,9 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* If the task failed to finish, it's safe for us to claim responsibility for * reconstruction. */ -#if !RAY_USE_NEW_GCS task_table_test_and_update(state->db, task_id, DBClientID::nil(), TaskStatus::LOST, TaskStatus::RECONSTRUCTING, NULL, reconstruct_task_update_callback, state); -#else - RAY_CHECK_OK(gcs::TaskTableTestAndUpdate( - &state->gcs_client, task_id, DBClientID::nil(), SchedulingState::LOST, - SchedulingState::RECONSTRUCTING, - [state](gcs::AsyncGcsClient *, const ray::TaskID &, - const TaskTableDataT &t, bool updated) { - Task *task = Task_alloc(t.task_info.data(), t.task_info.size(), - static_cast(t.scheduling_state), - DBClientID::from_binary(t.scheduler_id), - std::vector()); - reconstruct_task_update_callback(task, state, updated); - Task_free(task); - })); -#endif } void reconstruct_object_lookup_callback( diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index ded56f582..ebaac4162 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -389,16 +389,11 @@ void finish_killed_task(LocalSchedulerState *state, if (state->db != NULL) { Task *task = Task_alloc(execution_spec, TaskStatus::DONE, get_db_client_id(state->db)); -#if !RAY_USE_NEW_GCS // In most cases, task_table_update would be appropriate, however, it is // possible in some cases that the task has not yet been added to the task // table (e.g., if it is an actor task that is queued locally because the // actor has not been created yet). task_table_add_task(state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } } @@ -507,22 +502,12 @@ void queue_actor_task(LocalSchedulerState *state, if (from_global_scheduler) { /* If the task is from the global scheduler, it's already been added to * the task table, so just update the entry. */ -#if !RAY_USE_NEW_GCS task_table_update(state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } else { /* Otherwise, this is the first time the task has been seen in the * system (unless it's a resubmission of a previous task), so add the * entry. */ -#if !RAY_USE_NEW_GCS task_table_add_task(state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } } @@ -1002,7 +987,6 @@ std::list::iterator queue_task( if (state->db != NULL) { Task *task = Task_alloc(task_entry, TaskStatus::QUEUED, get_db_client_id(state->db)); -#if !RAY_USE_NEW_GCS if (from_global_scheduler) { /* If the task is from the global scheduler, it's already been added to * the task table, so just update the entry. */ @@ -1012,10 +996,6 @@ std::list::iterator queue_task( * (unless it's a resubmission of a previous task), so add the entry. */ task_table_add_task(state->db, task, NULL, NULL, NULL); } -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } /* Copy the spec and add it to the task queue. The allocated spec will be @@ -1204,7 +1184,6 @@ void give_task_to_local_scheduler(LocalSchedulerState *state, RAY_CHECK(state->config.global_scheduler_exists); Task *task = Task_alloc(execution_spec, TaskStatus::SCHEDULED, local_scheduler_id); -#if !RAY_USE_NEW_GCS auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. @@ -1212,10 +1191,6 @@ void give_task_to_local_scheduler(LocalSchedulerState *state, }; task_table_add_task(state->db, task, &retryInfo, NULL, state); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } void give_task_to_global_scheduler_retry(UniqueID id, @@ -1252,7 +1227,6 @@ void give_task_to_global_scheduler(LocalSchedulerState *state, RAY_CHECK(state->config.global_scheduler_exists); Task *task = Task_alloc(execution_spec, TaskStatus::WAITING, get_db_client_id(state->db)); -#if !RAY_USE_NEW_GCS RAY_CHECK(state->db != NULL); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. @@ -1260,10 +1234,6 @@ void give_task_to_global_scheduler(LocalSchedulerState *state, .fail_callback = give_task_to_global_scheduler_retry, }; task_table_add_task(state->db, task, &retryInfo, NULL, state); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif } bool resource_constraints_satisfied(LocalSchedulerState *state, @@ -1338,16 +1308,11 @@ void handle_actor_task_submitted(LocalSchedulerState *state, algorithm_state->cached_submitted_actor_tasks.push_back( std::move(task_entry)); -#if !RAY_USE_NEW_GCS // Even if the task can't be assigned to a worker yet, we should still write // it to the task table. TODO(rkn): There's no need to do this more than // once, and we could run into problems if we have very large numbers of // tasks in this cache. task_table_add_task(state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); - Task_free(task); -#endif return; } diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index f75bcb750..a9ff6d22f 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -233,14 +233,8 @@ TEST object_reconstruction_test(void) { Task *task = Task_alloc( execution_spec, TaskStatus::DONE, get_db_client_id(local_scheduler->local_scheduler_state->db)); -#if !RAY_USE_NEW_GCS task_table_add_task(local_scheduler->local_scheduler_state->db, task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd( - &local_scheduler->local_scheduler_state->gcs_client, task)); - Task_free(task); -#endif /* Trigger reconstruction, and run the event loop again. */ ObjectID return_id = TaskSpec_return(spec, 0); @@ -355,14 +349,8 @@ TEST object_reconstruction_recursive_test(void) { Task *last_task = Task_alloc( specs[NUM_TASKS - 1], TaskStatus::DONE, get_db_client_id(local_scheduler->local_scheduler_state->db)); -#if !RAY_USE_NEW_GCS task_table_add_task(local_scheduler->local_scheduler_state->db, last_task, NULL, NULL, NULL); -#else - RAY_CHECK_OK(TaskTableAdd( - &local_scheduler->local_scheduler_state->gcs_client, last_task)); - Task_free(last_task); -#endif /* Simulate eviction of the objects, so that reconstruction is required. */ for (int i = 0; i < NUM_TASKS; ++i) { ObjectID return_id = TaskSpec_return(specs[i].Spec(), 0); diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index 9107910a6..a55cff177 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -1321,26 +1321,9 @@ void log_object_hash_mismatch_error_result_callback(ObjectID object_id, void *user_context) { RAY_CHECK(!task_id.is_nil()); PlasmaManagerState *state = (PlasmaManagerState *) user_context; -/* Get the specification for the nondeterministic task. */ -#if !RAY_USE_NEW_GCS + /* Get the specification for the nondeterministic task. */ task_table_get_task(state->db, task_id, NULL, log_object_hash_mismatch_error_task_callback, state); -#else - RAY_CHECK_OK(state->gcs_client.task_table().Lookup( - ray::JobID::nil(), task_id, - [user_context](gcs::AsyncGcsClient *, const TaskID &, - const TaskTableDataT &t) { - Task *task = Task_alloc(t.task_info.data(), t.task_info.size(), - static_cast(t.scheduling_state), - DBClientID::from_binary(t.scheduler_id), - std::vector()); - log_object_hash_mismatch_error_task_callback(task, user_context); - Task_free(task); - }, - [](gcs::AsyncGcsClient *, const TaskID &) { - // TODO(pcmoritz): Handle failure. - })); -#endif } void log_object_hash_mismatch_error_object_callback(ObjectID object_id,