From ce8170db992e175694c2f0f08dd3d5072f1ed1d2 Mon Sep 17 00:00:00 2001 From: micafan <550435771@qq.com> Date: Fri, 10 Jan 2020 13:55:10 +0800 Subject: [PATCH] [GCS] Add StatsInfoAccessor to GCS Client (#6748) --- src/ray/core_worker/profiling.cc | 33 +++++++++++++++--------- src/ray/core_worker/profiling.h | 2 +- src/ray/gcs/accessor.h | 16 ++++++++++++ src/ray/gcs/gcs_client.h | 8 ++++++ src/ray/gcs/redis_accessor.cc | 15 +++++++++++ src/ray/gcs/redis_accessor.h | 16 ++++++++++++ src/ray/gcs/redis_gcs_client.cc | 1 + src/ray/gcs/redis_gcs_client.h | 6 +++-- src/ray/gcs/tables.cc | 8 ------ src/ray/gcs/tables.h | 8 +----- src/ray/object_manager/object_manager.cc | 10 +++---- src/ray/object_manager/object_manager.h | 2 +- src/ray/raylet/node_manager.cc | 10 +++---- 13 files changed, 94 insertions(+), 41 deletions(-) diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index acdc7c024..4c8cbede2 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -18,31 +18,40 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add const std::shared_ptr &gcs_client) : io_service_(io_service), timer_(io_service_, boost::asio::chrono::seconds(1)), + rpc_profile_data_(new rpc::ProfileTableData()), gcs_client_(gcs_client) { - rpc_profile_data_.set_component_type(WorkerTypeString(worker_context.GetWorkerType())); - rpc_profile_data_.set_component_id(worker_context.GetWorkerID().Binary()); - rpc_profile_data_.set_node_ip_address(node_ip_address); + rpc_profile_data_->set_component_type(WorkerTypeString(worker_context.GetWorkerType())); + rpc_profile_data_->set_component_id(worker_context.GetWorkerID().Binary()); + rpc_profile_data_->set_node_ip_address(node_ip_address); timer_.async_wait(boost::bind(&Profiler::FlushEvents, this)); } void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) { absl::MutexLock lock(&mutex_); - rpc_profile_data_.add_profile_events()->CopyFrom(event); + rpc_profile_data_->add_profile_events()->CopyFrom(event); } void Profiler::FlushEvents() { - absl::MutexLock lock(&mutex_); - if (rpc_profile_data_.profile_events_size() != 0) { - // TODO(edoakes): this should be migrated to use the new GCS client interface - // instead of the raw table interface once it's ready. - if (!gcs_client_->profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) { + auto cur_profile_data = std::make_shared(); + { + absl::MutexLock lock(&mutex_); + if (rpc_profile_data_->profile_events_size() != 0) { + cur_profile_data->set_component_type(rpc_profile_data_->component_type()); + cur_profile_data->set_component_id(rpc_profile_data_->component_id()); + cur_profile_data->set_node_ip_address(rpc_profile_data_->node_ip_address()); + rpc_profile_data_.swap(cur_profile_data); + } + } + + if (cur_profile_data->profile_events_size() != 0) { + if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, nullptr).ok()) { RAY_LOG(WARNING) << "Failed to push profile events to GCS."; } else { - RAY_LOG(DEBUG) << "Pushed " << rpc_profile_data_.profile_events_size() - << "events to GCS."; + RAY_LOG(DEBUG) << "Pushed " << cur_profile_data->profile_events_size() + << " events to GCS."; } - rpc_profile_data_.clear_profile_events(); } + // Reset the timer to 1 second from the previous expiration time to avoid drift. timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1)); timer_.async_wait(boost::bind(&Profiler::FlushEvents, this)); diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index dc0c92a4f..01ed41b5b 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -35,7 +35,7 @@ class Profiler { // RPC message containing profiling data. Holds the queue of profile events // until they are flushed. - rpc::ProfileTableData rpc_profile_data_ GUARDED_BY(mutex_); + std::shared_ptr rpc_profile_data_ GUARDED_BY(mutex_); // Client to the GCS used to push profile events to it. std::shared_ptr gcs_client_; diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 4f578989f..bdf36f50c 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -473,6 +473,22 @@ class NodeInfoAccessor { NodeInfoAccessor() = default; }; +/// \class StatsInfoAccessor +/// `StatsInfoAccessor` is a sub-interface of `GcsClient`. +/// This class includes all the methods that are related to accessing +/// stats in the GCS. +class StatsInfoAccessor { + public: + virtual ~StatsInfoAccessor() = default; + + virtual Status AsyncAddProfileData( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + protected: + StatsInfoAccessor() = default; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index 5f47ce1fc..d28027c77 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -95,6 +95,13 @@ class GcsClient : public std::enable_shared_from_this { return *task_accessor_; } + /// Get the sub-interface for accessing stats in GCS. + /// This function is thread safe. + StatsInfoAccessor &Stats() { + RAY_CHECK(stats_accessor_ != nullptr); + return *stats_accessor_; + } + protected: /// Constructor of GcsClient. /// @@ -111,6 +118,7 @@ class GcsClient : public std::enable_shared_from_this { std::unique_ptr object_accessor_; std::unique_ptr node_accessor_; std::unique_ptr task_accessor_; + std::unique_ptr stats_accessor_; }; } // namespace gcs diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index b31005621..06e681c1b 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -589,6 +589,21 @@ Status RedisNodeInfoAccessor::AsyncSubscribeToResources( return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done); } +RedisStatsInfoAccessor::RedisStatsInfoAccessor(RedisGcsClient *client_impl) + : client_impl_(client_impl) {} + +Status RedisStatsInfoAccessor::AsyncAddProfileData( + const std::shared_ptr &data_ptr, const StatusCallback &callback) { + ProfileTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const UniqueID &id, + const ProfileTableData &data) { callback(Status::OK()); }; + } + + ProfileTable &profile_table = client_impl_->profile_table(); + return profile_table.Append(JobID::Nil(), UniqueID::FromRandom(), data_ptr, on_done); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index f4583f2c8..70eb6f532 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -290,6 +290,22 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_; }; +/// \class RedisStatsInfoAccessor +/// RedisStatsInfoAccessor is an implementation of `StatsInfoAccessor` +/// that uses Redis as the backend storage. +class RedisStatsInfoAccessor : public StatsInfoAccessor { + public: + explicit RedisStatsInfoAccessor(RedisGcsClient *client_impl); + + virtual ~RedisStatsInfoAccessor() = default; + + Status AsyncAddProfileData(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + private: + RedisGcsClient *client_impl_{nullptr}; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 4df428f1c..b169d207b 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -146,6 +146,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { object_accessor_.reset(new RedisObjectInfoAccessor(this)); node_accessor_.reset(new RedisNodeInfoAccessor(this)); task_accessor_.reset(new RedisTaskInfoAccessor(this)); + stats_accessor_.reset(new RedisStatsInfoAccessor(this)); is_connected_ = true; diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 573d563a9..e41934943 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -25,6 +25,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { friend class RedisTaskInfoAccessor; friend class RedisNodeInfoAccessor; friend class RedisObjectInfoAccessor; + friend class RedisStatsInfoAccessor; friend class SubscriptionExecutorTest; friend class LogSubscribeTestHelper; friend class LogLookupTestHelper; @@ -65,7 +66,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { // TODO: Some API for getting the error on the driver ErrorTable &error_table(); - ProfileTable &profile_table(); // We also need something to export generic code to run on workers from the // driver (to set the PYTHONPATH) @@ -94,7 +94,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { ActorCheckpointIdTable &actor_checkpoint_id_table(); /// This method will be deprecated, use method Jobs() instead. JobTable &job_table(); - /// This method will be deprecated, use method Objects() instead + /// This method will be deprecated, use method Objects() instead. ObjectTable &object_table(); /// The following four methods will be deprecated, use method Nodes() instead. ClientTable &client_table(); @@ -105,6 +105,8 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { raylet::TaskTable &raylet_task_table(); TaskLeaseTable &task_lease_table(); TaskReconstructionLog &task_reconstruction_log(); + /// This method will be deprecated, use method Stats() instead. + ProfileTable &profile_table(); // GCS command type. If CommandType::kChain, chain-replicated versions of the tables // might be used, if available. diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index d9c3d40bf..b267b026f 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -529,14 +529,6 @@ std::string ErrorTable::DebugString() const { return Log::DebugString(); } -Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events) { - // TODO(hchen): Change the parameter to shared_ptr to avoid copying data. - auto data = std::make_shared(); - data->CopyFrom(profile_events); - return Append(JobID::Nil(), UniqueID::FromRandom(), data, - /*done_callback=*/nullptr); -} - std::string ProfileTable::DebugString() const { return Log::DebugString(); } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 05c9f35a0..ca4f023f5 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -842,7 +842,7 @@ class ErrorTable : private Log { std::string DebugString() const; }; -class ProfileTable : private Log { +class ProfileTable : public Log { public: ProfileTable(const std::vector> &contexts, RedisGcsClient *client) @@ -850,12 +850,6 @@ class ProfileTable : private Log { prefix_ = TablePrefix::PROFILE; }; - /// Add a batch of profiling events to the profile table. - /// - /// \param profile_events The profile events to record. - /// \return Status. - Status AddProfileEventBatch(const ProfileTableData &profile_events); - /// Returns debug string for class. /// /// \return string. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 3a5c7276a..66a305d1d 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -804,15 +804,15 @@ std::shared_ptr ObjectManager::GetRpcClient( return it->second; } -rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() { - rpc::ProfileTableData profile_info; - profile_info.set_component_type("object_manager"); - profile_info.set_component_id(self_node_id_.Binary()); +std::shared_ptr ObjectManager::GetAndResetProfilingInfo() { + auto profile_info = std::make_shared(); + profile_info->set_component_type("object_manager"); + profile_info->set_component_id(self_node_id_.Binary()); { std::lock_guard lock(profile_mutex_); for (auto const &profile_event : profile_events_) { - profile_info.add_profile_events()->CopyFrom(profile_event); + profile_info->add_profile_events()->CopyFrom(profile_event); } profile_events_.clear(); } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 8a1f1ca5d..7d5597c1c 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -235,7 +235,7 @@ class ObjectManager : public ObjectManagerInterface, /// /// \return All profiling information that has accumulated since the last call /// to this method. - rpc::ProfileTableData GetAndResetProfilingInfo(); + std::shared_ptr GetAndResetProfilingInfo(); /// Returns debug string for class. /// diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c6aab817e..340085acc 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -402,8 +402,8 @@ void NodeManager::GetObjectManagerProfileInfo() { auto profile_info = object_manager_.GetAndResetProfilingInfo(); - if (profile_info.profile_events_size() > 0) { - RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(profile_info)); + if (profile_info->profile_events_size() > 0) { + RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr)); } // Reset the timer. @@ -905,10 +905,10 @@ void NodeManager::ProcessClientMessage( } break; case protocol::MessageType::PushProfileEventsRequest: { auto fbs_message = flatbuffers::GetRoot(message_data); - rpc::ProfileTableData profile_table_data; + auto profile_table_data = std::make_shared(); RAY_CHECK( - profile_table_data.ParseFromArray(fbs_message->data(), fbs_message->size())); - RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(profile_table_data)); + profile_table_data->ParseFromArray(fbs_message->data(), fbs_message->size())); + RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_table_data, nullptr)); } break; case protocol::MessageType::FreeObjectsInObjectStoreRequest: { auto message = flatbuffers::GetRoot(message_data);