From e556b729c2388e429e3598d84ddbf9889b0df134 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 19 Dec 2019 13:48:32 -0800 Subject: [PATCH] [direct call] Fix max_calls interaction with background tasks. (#6536) --- python/ray/_raylet.pyx | 15 ++++---- python/ray/includes/common.pxd | 5 ++- python/ray/tests/test_basic.py | 19 ++++++++++ src/ray/common/status.h | 19 +++++++--- src/ray/core_worker/core_worker.cc | 15 +++++++- src/ray/core_worker/future_resolver.cc | 4 +-- src/ray/core_worker/task_manager.cc | 24 +++++++++++++ src/ray/core_worker/task_manager.h | 11 ++++++ .../test/direct_task_transport_test.cc | 35 +++++++++++++++++-- .../transport/direct_actor_transport.cc | 9 +++-- .../transport/direct_actor_transport.h | 4 +-- .../transport/direct_task_transport.cc | 12 +++++-- .../core_worker/transport/raylet_transport.cc | 4 +-- .../core_worker/transport/raylet_transport.h | 4 +-- src/ray/protobuf/core_worker.proto | 2 ++ 15 files changed, 154 insertions(+), 28 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index dc964d94c..07ecb6f77 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -688,10 +688,9 @@ cdef execute_task( # If we've reached the max number of executions for this worker, exit. task_counter = manager.get_task_counter(job_id, function_descriptor) if task_counter == execution_info.max_calls: - # Intentionally disconnect so the raylet doesn't print an error. - # TODO(edoakes): we should handle max_calls in the core worker. - worker.core_worker.disconnect() - sys.exit(0) + exit = SystemExit(0) + exit.is_ray_terminate = True + raise exit cdef CRayStatus task_execution_handler( @@ -721,11 +720,13 @@ cdef CRayStatus task_execution_handler( job_id=None) sys.exit(1) except SystemExit as e: - if not hasattr(e, "is_ray_terminate"): - logger.exception("SystemExit was raised from the worker") # Tell the core worker to exit as soon as the result objects # are processed. - return CRayStatus.SystemExit() + if hasattr(e, "is_ray_terminate"): + return CRayStatus.IntentionalSystemExit() + else: + logger.exception("SystemExit was raised from the worker") + return CRayStatus.UnexpectedSystemExit() return CRayStatus.OK() diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index ade09ea2f..9b54f9178 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -80,7 +80,10 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: CRayStatus Interrupted(const c_string &msg) @staticmethod - CRayStatus SystemExit() + CRayStatus IntentionalSystemExit() + + @staticmethod + CRayStatus UnexpectedSystemExit() c_bool ok() c_bool IsOutOfMemory() diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 8a875209b..8130f37df 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -91,6 +91,25 @@ def test_simple_serialization(ray_start_regular): assert type(obj) == type(new_obj_2) +def test_background_tasks_with_max_calls(shutdown_only): + ray.init(num_cpus=2) + + @ray.remote + def g(): + time.sleep(.1) + return 0 + + @ray.remote(max_calls=1, max_retries=0) + def f(): + return [g.remote()] + + nested = ray.get([f.remote() for _ in range(10)]) + + # Should still be able to retrieve these objects, since f's workers will + # wait for g to finish before exiting. + ray.get([x[0] for x in nested]) + + def test_fair_queueing(shutdown_only): ray.init( num_cpus=1, _internal_config=json.dumps({ diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 1fb329f55..07b193d33 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -81,7 +81,8 @@ enum class StatusCode : char { RedisError = 11, TimedOut = 12, Interrupted = 13, - SystemExit = 14, + IntentionalSystemExit = 14, + UnexpectedSystemExit = 15, }; #if defined(__clang__) @@ -153,8 +154,12 @@ class RAY_EXPORT Status { return Status(StatusCode::Interrupted, msg); } - static Status SystemExit() { - return Status(StatusCode::SystemExit, "process requested exit"); + static Status IntentionalSystemExit() { + return Status(StatusCode::IntentionalSystemExit, "intentional system exit"); + } + + static Status UnexpectedSystemExit() { + return Status(StatusCode::UnexpectedSystemExit, "user code caused exit"); } // Returns true iff the status indicates success. @@ -172,7 +177,13 @@ class RAY_EXPORT Status { bool IsRedisError() const { return code() == StatusCode::RedisError; } bool IsTimedOut() const { return code() == StatusCode::TimedOut; } bool IsInterrupted() const { return code() == StatusCode::Interrupted; } - bool IsSystemExit() const { return code() == StatusCode::SystemExit; } + bool IsSystemExit() const { + return code() == StatusCode::IntentionalSystemExit || + code() == StatusCode::UnexpectedSystemExit; + } + bool IsIntentionalSystemExit() const { + return code() == StatusCode::IntentionalSystemExit; + } // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f48b2f6af..bd34b612e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -117,7 +117,20 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RAY_CHECK(task_execution_callback_ != nullptr); auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); - auto exit = std::bind(&CoreWorker::Shutdown, this); + auto exit = [this](bool intentional) { + // Release the resources early in case draining takes a long time. + RAY_CHECK_OK(local_raylet_client_->NotifyDirectCallTaskBlocked()); + task_manager_->DrainAndShutdown([this, intentional]() { + // To avoid problems, make sure shutdown is always called from the same + // event loop each time. + task_execution_service_.post([this, intentional]() { + if (intentional) { + Disconnect(); // Notify the raylet this is an intentional exit. + } + Shutdown(); + }); + }); + }; raylet_task_receiver_ = std::unique_ptr(new CoreWorkerRayletTaskReceiver( worker_context_.GetWorkerID(), local_raylet_client_, execute_task, exit)); diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index c31d15993..8b7a2cec8 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -20,8 +20,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID request, [this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) { if (!status.ok()) { - RAY_LOG(ERROR) << "Error retrieving the value of object ID " << object_id - << " that was deserialized: " << status.ToString(); + RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id + << " that was deserialized: " << status.ToString(); } // Either the owner is gone or the owner replied that the object has // been created. In both cases, we can now try to fetch the object via diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index f93439c36..806352979 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -17,6 +17,18 @@ void TaskManager::AddPendingTask(const TaskSpecification &spec, int max_retries) RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second); } +void TaskManager::DrainAndShutdown(std::function shutdown) { + absl::MutexLock lock(&mu_); + if (pending_tasks_.empty()) { + shutdown(); + } else { + RAY_LOG(WARNING) + << "This worker is still managing " << pending_tasks_.size() + << " in flight tasks, waiting for them to finish before shutting down."; + } + shutdown_hook_ = shutdown; +} + bool TaskManager::IsTaskPending(const TaskID &task_id) const { absl::MutexLock lock(&mu_); return pending_tasks_.count(task_id) > 0; @@ -68,6 +80,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, RAY_CHECK(actor_addr != nullptr); actor_manager_->PublishCreatedActor(spec, *actor_addr); } + + ShutdownIfNeeded(); } void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, @@ -122,6 +136,16 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } MarkPendingTaskFailed(task_id, spec, error_type); } + + ShutdownIfNeeded(); +} + +void TaskManager::ShutdownIfNeeded() { + absl::MutexLock lock(&mu_); + if (shutdown_hook_ && pending_tasks_.empty()) { + RAY_LOG(WARNING) << "All in flight tasks finished, shutting down worker."; + shutdown_hook_(); + } } void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index d3e09fc3f..0db630f38 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -43,6 +43,11 @@ class TaskManager : public TaskFinisherInterface { /// \return Void. void AddPendingTask(const TaskSpecification &spec, int max_retries = 0); + /// Wait for all pending tasks to finish, and then shutdown. + /// + /// \param shutdown The shutdown callback to call. + void DrainAndShutdown(std::function shutdown); + /// Return whether the task is pending. /// /// \param[in] task_id ID of the task to query. @@ -76,6 +81,9 @@ class TaskManager : public TaskFinisherInterface { void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_); + /// Shutdown if all tasks are finished and shutdown is scheduled. + void ShutdownIfNeeded() LOCKS_EXCLUDED(mu_); + /// Used to store task results. std::shared_ptr in_memory_store_; @@ -104,6 +112,9 @@ class TaskManager : public TaskFinisherInterface { /// storing a shared_ptr to a PushTaskRequest protobuf for all tasks. absl::flat_hash_map> pending_tasks_ GUARDED_BY(mu_); + + /// Optional shutdown hook to call when pending tasks all finish. + std::function shutdown_hook_ GUARDED_BY(mu_) = nullptr; }; } // namespace ray diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index abce6d4d1..d7ee98608 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -24,12 +24,16 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return Status::OK(); } - bool ReplyPushTask(Status status = Status::OK()) { + bool ReplyPushTask(Status status = Status::OK(), bool exit = false) { if (callbacks.size() == 0) { return false; } auto callback = callbacks.front(); - callback(status, rpc::PushTaskReply()); + auto reply = rpc::PushTaskReply(); + if (exit) { + reply.set_worker_exiting(true); + } + callback(status, reply); callbacks.pop_front(); return true; } @@ -412,6 +416,33 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) { ASSERT_EQ(task_finisher->num_tasks_failed, 1); } +TEST(DirectTaskTransportTest, TestWorkerNotReturnedOnExit) { + auto raylet_client = std::make_shared(); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); + auto factory = [&](const std::string &addr, int port) { return worker_client; }; + auto task_finisher = std::make_shared(); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + task_finisher, ClientID::Nil(), kLongTimeout); + std::unordered_map empty_resources; + std::vector empty_descriptor; + TaskSpecification task1 = BuildTaskSpec(empty_resources, empty_descriptor); + + ASSERT_TRUE(submitter.SubmitTask(task1).ok()); + ASSERT_EQ(raylet_client->num_workers_requested, 1); + + // Task 1 is pushed. + ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1000, ClientID::Nil())); + ASSERT_EQ(worker_client->callbacks.size(), 1); + + // Task 1 finishes with exit status; the worker is not returned. + ASSERT_TRUE(worker_client->ReplyPushTask(Status::OK(), /*exit=*/true)); + ASSERT_EQ(raylet_client->num_workers_returned, 0); + ASSERT_EQ(raylet_client->num_workers_disconnected, 0); + ASSERT_EQ(task_finisher->num_tasks_complete, 1); + ASSERT_EQ(task_finisher->num_tasks_failed, 0); +} + TEST(DirectTaskTransportTest, TestSpillback) { auto raylet_client = std::make_shared(); auto worker_client = std::make_shared(); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index bc6ce5647..bac8fc748 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -257,16 +257,21 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } } if (status.IsSystemExit()) { + // Don't allow the worker to be reused, even though the reply status is OK. + // The worker will be shutting down shortly. + reply->set_worker_exiting(true); // In Python, SystemExit can only be raised on the main thread. To // work around this when we are executing tasks on worker threads, // we re-post the exit event explicitly on the main thread. exiting_ = true; if (objects_valid) { + // This happens when max_calls is hit. We still need to return the objects. send_reply_callback(Status::OK(), nullptr, nullptr); } else { - send_reply_callback(Status::SystemExit(), nullptr, nullptr); + send_reply_callback(status, nullptr, nullptr); } - task_main_io_service_.post([this]() { exit_handler_(); }); + task_main_io_service_.post( + [this, status]() { exit_handler_(status.IsIntentionalSystemExit()); }); } else { RAY_CHECK(objects_valid) << return_objects.size() << " " << num_returns; send_reply_callback(status, nullptr, nullptr); diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 2831288be..1164b7cfc 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -423,7 +423,7 @@ class CoreWorkerDirectTaskReceiver { CoreWorkerDirectTaskReceiver(WorkerContext &worker_context, boost::asio::io_service &main_io_service, const TaskHandler &task_handler, - const std::function &exit_handler) + const std::function &exit_handler) : worker_context_(worker_context), task_handler_(task_handler), exit_handler_(exit_handler), @@ -470,7 +470,7 @@ class CoreWorkerDirectTaskReceiver { /// The callback function to process a task. TaskHandler task_handler_; /// The callback function to exit the worker. - std::function exit_handler_; + std::function exit_handler_; /// The IO event loop for running tasks on. boost::asio::io_service &task_main_io_service_; /// Factory for producing new core worker clients. diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index f2298a78d..a3cfe9ce0 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -175,10 +175,16 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( std::move(request), [this, task_id, is_actor, is_actor_creation, scheduling_key, addr, assigned_resources](Status status, const rpc::PushTaskReply &reply) { - // Successful actor creation leases the worker indefinitely from the raylet. - if (!status.ok() || !is_actor_creation) { + if (reply.worker_exiting()) { + // The worker is draining and will shutdown after it is done. Don't return + // it to the Raylet since that will kill it early. absl::MutexLock lock(&mu_); - OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources); + worker_to_lease_client_.erase(addr); + } else if (!status.ok() || !is_actor_creation) { + // Successful actor creation leases the worker indefinitely from the raylet. + absl::MutexLock lock(&mu_); + OnWorkerIdle(addr, scheduling_key, + /*error=*/!status.ok(), assigned_resources); } if (!status.ok()) { // TODO: It'd be nice to differentiate here between process vs node diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index 50c16a18f..c9b4b68e6 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -7,7 +7,7 @@ namespace ray { CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( const WorkerID &worker_id, std::shared_ptr &raylet_client, - const TaskHandler &task_handler, const std::function &exit_handler) + const TaskHandler &task_handler, const std::function &exit_handler) : worker_id_(worker_id), raylet_client_(raylet_client), task_handler_(task_handler), @@ -50,7 +50,7 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( std::vector> results; auto status = task_handler_(task_spec, resource_ids, &results); if (status.IsSystemExit()) { - exit_handler_(); + exit_handler_(status.IsIntentionalSystemExit()); return; } diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 3a392e573..0bcf7a761 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -19,7 +19,7 @@ class CoreWorkerRayletTaskReceiver { CoreWorkerRayletTaskReceiver(const WorkerID &worker_id, std::shared_ptr &raylet_client, const TaskHandler &task_handler, - const std::function &exit_handler); + const std::function &exit_handler); /// Handle a `AssignTask` request. /// The implementation can handle this request asynchronously. When handling is done, @@ -41,7 +41,7 @@ class CoreWorkerRayletTaskReceiver { /// The callback function to process a task. TaskHandler task_handler_; /// The callback function to exit the worker. - std::function exit_handler_; + std::function exit_handler_; /// The callback to process arg wait complete. std::function on_wait_complete_; }; diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 96e193989..593f5cdfb 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -86,6 +86,8 @@ message PushTaskRequest { message PushTaskReply { // The returned objects. repeated ReturnObject return_objects = 1; + // Set to true if the worker will be exiting. + bool worker_exiting = 2; } message DirectActorCallArgWaitCompleteRequest {