diff --git a/BUILD.bazel b/BUILD.bazel index d724e4dc8..f6da3a253 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -337,6 +337,7 @@ cc_library( "@boost//:asio", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/container:flat_hash_set", "@com_google_absl//absl/memory", "@com_google_absl//absl/strings", "@com_google_googletest//:gtest", @@ -369,6 +370,7 @@ cc_library( ]), copts = COPTS, deps = [ + "@com_google_absl//absl/container:flat_hash_set", ":core_worker_cc_proto", ":ray_common", ":ray_util", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 82f5b2eaa..a0f349839 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1048,11 +1048,11 @@ cdef class CoreWorker: def add_active_object_id(self, ObjectID object_id): cdef: CObjectID c_object_id = object_id.native() - with nogil: - self.core_worker.get().AddActiveObjectID(c_object_id) + # Note: faster to not release GIL for short-running op. + self.core_worker.get().AddActiveObjectID(c_object_id) def remove_active_object_id(self, ObjectID object_id): cdef: CObjectID c_object_id = object_id.native() - with nogil: - self.core_worker.get().RemoveActiveObjectID(c_object_id) + # Note: faster to not release GIL for short-running op. + self.core_worker.get().RemoveActiveObjectID(c_object_id) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 64df69ecf..76943bb20 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -218,23 +218,22 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { } void CoreWorker::AddActiveObjectID(const ObjectID &object_id) { - io_service_.post([this, object_id]() -> void { - active_object_ids_.insert(object_id); - active_object_ids_updated_ = true; - }); + absl::MutexLock lock(&object_ref_mu_); + active_object_ids_.insert(object_id); + active_object_ids_updated_ = true; } void CoreWorker::RemoveActiveObjectID(const ObjectID &object_id) { - io_service_.post([this, object_id]() -> void { - if (active_object_ids_.erase(object_id)) { - active_object_ids_updated_ = true; - } else { - RAY_LOG(WARNING) << "Tried to erase non-existent object ID" << object_id; - } - }); + absl::MutexLock lock(&object_ref_mu_); + if (active_object_ids_.erase(object_id)) { + active_object_ids_updated_ = true; + } else { + RAY_LOG(WARNING) << "Tried to erase non-existent object ID" << object_id; + } } void CoreWorker::ReportActiveObjectIDs() { + absl::MutexLock lock(&object_ref_mu_); // Only send a heartbeat when the set of active object IDs has changed because the // raylet only modifies the set of IDs when it receives a heartbeat. if (active_object_ids_updated_) { @@ -245,7 +244,9 @@ void CoreWorker::ReportActiveObjectIDs() { << "object IDs are currently in scope. " << "This may lead to required objects being garbage collected."; } - RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(active_object_ids_)); + std::unordered_set copy; + copy.insert(active_object_ids_.begin(), active_object_ids_.end()); + RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(copy)); } // Reset the timer from the previous expiration time to avoid drift. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a03b72060..10d1856c6 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1,6 +1,10 @@ #ifndef RAY_CORE_WORKER_CORE_WORKER_H #define RAY_CORE_WORKER_CORE_WORKER_H +#include "absl/base/thread_annotations.h" +#include "absl/container/flat_hash_set.h" +#include "absl/synchronization/mutex.h" + #include "ray/common/buffer.h" #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/common.h" @@ -81,11 +85,11 @@ class CoreWorker { // Add this object ID to the set of active object IDs that is sent to the raylet // in the heartbeat messsage. - void AddActiveObjectID(const ObjectID &object_id); + void AddActiveObjectID(const ObjectID &object_id) LOCKS_EXCLUDED(object_ref_mu_); // Remove this object ID from the set of active object IDs that is sent to the raylet // in the heartbeat messsage. - void RemoveActiveObjectID(const ObjectID &object_id); + void RemoveActiveObjectID(const ObjectID &object_id) LOCKS_EXCLUDED(object_ref_mu_); /* Public methods related to storing and retrieving objects. */ @@ -279,7 +283,7 @@ class CoreWorker { void RunIOService(); /// Send the list of active object IDs to the raylet. - void ReportActiveObjectIDs(); + void ReportActiveObjectIDs() LOCKS_EXCLUDED(object_ref_mu_); /* Private methods related to task submission. */ @@ -379,12 +383,19 @@ class CoreWorker { // Thread that runs a boost::asio service to process IO events. std::thread io_thread_; + /* Fields related to ref counting objects. */ + + /// Protects access to the set of active object ids. Since this set is updated + /// very frequently, it is faster to lock around accesses rather than serialize + /// accesses via the event loop. + absl::Mutex object_ref_mu_; + /// Set of object IDs that are in scope in the language worker. - std::unordered_set active_object_ids_; + absl::flat_hash_set active_object_ids_ GUARDED_BY(object_ref_mu_); /// Indicates whether or not the active_object_ids map has changed since the /// last time it was sent to the raylet. - bool active_object_ids_updated_ = false; + bool active_object_ids_updated_ GUARDED_BY(object_ref_mu_) = false; /* Fields related to storing and retrieving objects. */