mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 09:05:47 +08:00
[minor] Reduce perf overhead of object ref tracking (#6041)
This commit is contained in:
@@ -337,6 +337,7 @@ cc_library(
|
||||
"@boost//:asio",
|
||||
"@com_github_jupp0r_prometheus_cpp//pull",
|
||||
"@com_google_absl//absl/base:core_headers",
|
||||
"@com_google_absl//absl/container:flat_hash_set",
|
||||
"@com_google_absl//absl/memory",
|
||||
"@com_google_absl//absl/strings",
|
||||
"@com_google_googletest//:gtest",
|
||||
@@ -369,6 +370,7 @@ cc_library(
|
||||
]),
|
||||
copts = COPTS,
|
||||
deps = [
|
||||
"@com_google_absl//absl/container:flat_hash_set",
|
||||
":core_worker_cc_proto",
|
||||
":ray_common",
|
||||
":ray_util",
|
||||
|
||||
@@ -1048,11 +1048,11 @@ cdef class CoreWorker:
|
||||
def add_active_object_id(self, ObjectID object_id):
|
||||
cdef:
|
||||
CObjectID c_object_id = object_id.native()
|
||||
with nogil:
|
||||
self.core_worker.get().AddActiveObjectID(c_object_id)
|
||||
# Note: faster to not release GIL for short-running op.
|
||||
self.core_worker.get().AddActiveObjectID(c_object_id)
|
||||
|
||||
def remove_active_object_id(self, ObjectID object_id):
|
||||
cdef:
|
||||
CObjectID c_object_id = object_id.native()
|
||||
with nogil:
|
||||
self.core_worker.get().RemoveActiveObjectID(c_object_id)
|
||||
# Note: faster to not release GIL for short-running op.
|
||||
self.core_worker.get().RemoveActiveObjectID(c_object_id)
|
||||
|
||||
@@ -218,23 +218,22 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
|
||||
}
|
||||
|
||||
void CoreWorker::AddActiveObjectID(const ObjectID &object_id) {
|
||||
io_service_.post([this, object_id]() -> void {
|
||||
active_object_ids_.insert(object_id);
|
||||
active_object_ids_updated_ = true;
|
||||
});
|
||||
absl::MutexLock lock(&object_ref_mu_);
|
||||
active_object_ids_.insert(object_id);
|
||||
active_object_ids_updated_ = true;
|
||||
}
|
||||
|
||||
void CoreWorker::RemoveActiveObjectID(const ObjectID &object_id) {
|
||||
io_service_.post([this, object_id]() -> void {
|
||||
if (active_object_ids_.erase(object_id)) {
|
||||
active_object_ids_updated_ = true;
|
||||
} else {
|
||||
RAY_LOG(WARNING) << "Tried to erase non-existent object ID" << object_id;
|
||||
}
|
||||
});
|
||||
absl::MutexLock lock(&object_ref_mu_);
|
||||
if (active_object_ids_.erase(object_id)) {
|
||||
active_object_ids_updated_ = true;
|
||||
} else {
|
||||
RAY_LOG(WARNING) << "Tried to erase non-existent object ID" << object_id;
|
||||
}
|
||||
}
|
||||
|
||||
void CoreWorker::ReportActiveObjectIDs() {
|
||||
absl::MutexLock lock(&object_ref_mu_);
|
||||
// Only send a heartbeat when the set of active object IDs has changed because the
|
||||
// raylet only modifies the set of IDs when it receives a heartbeat.
|
||||
if (active_object_ids_updated_) {
|
||||
@@ -245,7 +244,9 @@ void CoreWorker::ReportActiveObjectIDs() {
|
||||
<< "object IDs are currently in scope. "
|
||||
<< "This may lead to required objects being garbage collected.";
|
||||
}
|
||||
RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(active_object_ids_));
|
||||
std::unordered_set<ObjectID> copy;
|
||||
copy.insert(active_object_ids_.begin(), active_object_ids_.end());
|
||||
RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(copy));
|
||||
}
|
||||
|
||||
// Reset the timer from the previous expiration time to avoid drift.
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
#ifndef RAY_CORE_WORKER_CORE_WORKER_H
|
||||
#define RAY_CORE_WORKER_CORE_WORKER_H
|
||||
|
||||
#include "absl/base/thread_annotations.h"
|
||||
#include "absl/container/flat_hash_set.h"
|
||||
#include "absl/synchronization/mutex.h"
|
||||
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/core_worker/actor_handle.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
@@ -81,11 +85,11 @@ class CoreWorker {
|
||||
|
||||
// Add this object ID to the set of active object IDs that is sent to the raylet
|
||||
// in the heartbeat message.
|
||||
void AddActiveObjectID(const ObjectID &object_id);
|
||||
void AddActiveObjectID(const ObjectID &object_id) LOCKS_EXCLUDED(object_ref_mu_);
|
||||
|
||||
// Remove this object ID from the set of active object IDs that is sent to the raylet
|
||||
// in the heartbeat message.
|
||||
void RemoveActiveObjectID(const ObjectID &object_id);
|
||||
void RemoveActiveObjectID(const ObjectID &object_id) LOCKS_EXCLUDED(object_ref_mu_);
|
||||
|
||||
/* Public methods related to storing and retrieving objects. */
|
||||
|
||||
@@ -279,7 +283,7 @@ class CoreWorker {
|
||||
void RunIOService();
|
||||
|
||||
/// Send the list of active object IDs to the raylet.
|
||||
void ReportActiveObjectIDs();
|
||||
void ReportActiveObjectIDs() LOCKS_EXCLUDED(object_ref_mu_);
|
||||
|
||||
/* Private methods related to task submission. */
|
||||
|
||||
@@ -379,12 +383,19 @@ class CoreWorker {
|
||||
// Thread that runs a boost::asio service to process IO events.
|
||||
std::thread io_thread_;
|
||||
|
||||
/* Fields related to ref counting objects. */
|
||||
|
||||
/// Protects access to the set of active object ids. Since this set is updated
|
||||
/// very frequently, it is faster to lock around accesses rather than serialize
|
||||
/// accesses via the event loop.
|
||||
absl::Mutex object_ref_mu_;
|
||||
|
||||
/// Set of object IDs that are in scope in the language worker.
|
||||
std::unordered_set<ObjectID> active_object_ids_;
|
||||
absl::flat_hash_set<ObjectID> active_object_ids_ GUARDED_BY(object_ref_mu_);
|
||||
|
||||
/// Indicates whether or not the active_object_ids set has changed since the
|
||||
/// last time it was sent to the raylet.
|
||||
bool active_object_ids_updated_ = false;
|
||||
bool active_object_ids_updated_ GUARDED_BY(object_ref_mu_) = false;
|
||||
|
||||
/* Fields related to storing and retrieving objects. */
|
||||
|
||||
|
||||
Reference in New Issue
Block a user