diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 76943bb20..3ce84d331 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -176,14 +176,21 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, } CoreWorker::~CoreWorker() { - io_service_.stop(); + Shutdown(); io_thread_.join(); - if (worker_type_ == WorkerType::WORKER) { - task_execution_service_.stop(); - } - if (log_dir_ != "") { - RayLog::ShutDownRayLog(); +} + +void CoreWorker::Shutdown() { + if (!shutdown_) { + io_service_.stop(); + if (worker_type_ == WorkerType::WORKER) { + task_execution_service_.stop(); + } + if (log_dir_ != "") { + RayLog::ShutDownRayLog(); + } } + shutdown_ = true; } void CoreWorker::Disconnect() { @@ -236,18 +243,21 @@ void CoreWorker::ReportActiveObjectIDs() { absl::MutexLock lock(&object_ref_mu_); // Only send a heartbeat when the set of active object IDs has changed because the // raylet only modifies the set of IDs when it receives a heartbeat. - if (active_object_ids_updated_) { - RAY_LOG(DEBUG) << "Sending " << active_object_ids_.size() << " object IDs to raylet."; - if (active_object_ids_.size() > - RayConfig::instance().raylet_max_active_object_ids()) { - RAY_LOG(WARNING) << active_object_ids_.size() - << "object IDs are currently in scope. " - << "This may lead to required objects being garbage collected."; - } - std::unordered_set copy; - copy.insert(active_object_ids_.begin(), active_object_ids_.end()); - RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(copy)); + // TODO(edoakes): this is currently commented out because this heartbeat causes the + // workers to die when the raylet crashes unexpectedly. Without this, they could + // hang idle forever because they wait for the raylet to push tasks via gRPC. + // if (active_object_ids_updated_) { + RAY_LOG(DEBUG) << "Sending " << active_object_ids_.size() << " object IDs to raylet."; + if (active_object_ids_.size() > RayConfig::instance().raylet_max_active_object_ids()) { + RAY_LOG(WARNING) << active_object_ids_.size() << "object IDs are currently in scope. " + << "This may lead to required objects being garbage collected."; } + std::unordered_set copy(active_object_ids_.begin(), active_object_ids_.end()); + if (!raylet_client_->ReportActiveObjectIDs(copy).ok()) { + RAY_LOG(ERROR) << "Raylet connection failed. Shutting down."; + Shutdown(); + } + // } // Reset the timer from the previous expiration time to avoid drift. heartbeat_timer_.expires_at( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 10d1856c6..bf78f8e38 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -282,6 +282,10 @@ class CoreWorker { /// Run the io_service_ event loop. This should be called in a background thread. void RunIOService(); + /// Shut down the worker completely. + /// \return void. + void Shutdown(); + /// Send the list of active object IDs to the raylet. void ReportActiveObjectIDs() LOCKS_EXCLUDED(object_ref_mu_); @@ -361,6 +365,9 @@ class CoreWorker { /// worker context. TaskID main_thread_task_id_; + // Flag indicating whether this worker has been shut down. + bool shutdown_ = false; + /// Event loop where the IO events are handled. e.g. async GCS operations. boost::asio::io_service io_service_;