diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index e22526229..a5aac7b87 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -10,4 +10,4 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: c_bool Connect() void Disconnect() c_vector[c_string] GetAllJobInfo() - + c_vector[c_string] GetAllProfileInfo() diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 81ba5fdfd..7400829eb 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -22,3 +22,6 @@ cdef class GlobalStateAccessor: def get_job_table(self): return self.inner.get().GetAllJobInfo() + + def get_profile_table(self): + return self.inner.get().GetAllProfileInfo() diff --git a/python/ray/state.py b/python/ray/state.py index b22c163c6..8687f7b02 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -428,71 +428,34 @@ class GlobalState: return results - def _profile_table(self, batch_id): - """Get the profile events for a given batch of profile events. + def profile_table(self): + self._check_connected() - Args: - batch_id: An identifier for a batch of profile events. + result = defaultdict(list) + profile_table = self.global_state_accessor.get_profile_table() + for i in range(len(profile_table)): + profile = gcs_utils.ProfileTableData.FromString(profile_table[i]) - Returns: - A list of the profile events for the specified batch. - """ - # TODO(rkn): This method should support limiting the number of log - # events and should also support returning a window of events. - message = self._execute_command(batch_id, "RAY.TABLE_LOOKUP", - gcs_utils.TablePrefix.Value("PROFILE"), - "", batch_id.binary()) + component_type = profile.component_type + component_id = binary_to_hex(profile.component_id) + node_ip_address = profile.node_ip_address - if message is None: - return [] - - gcs_entries = gcs_utils.GcsEntry.FromString(message) - - profile_events = [] - for entry in gcs_entries.entries: - profile_table_message = gcs_utils.ProfileTableData.FromString( - entry) - - component_type = profile_table_message.component_type - component_id = binary_to_hex(profile_table_message.component_id) - node_ip_address = profile_table_message.node_ip_address - - for profile_event_message in profile_table_message.profile_events: + for event in profile.profile_events: try: - extra_data = json.loads(profile_event_message.extra_data) + extra_data = json.loads(event.extra_data) except ValueError: extra_data = {} profile_event = { - "event_type": profile_event_message.event_type, + "event_type": event.event_type, "component_id": component_id, "node_ip_address": node_ip_address, "component_type": component_type, - "start_time": profile_event_message.start_time, - "end_time": profile_event_message.end_time, + "start_time": event.start_time, + "end_time": event.end_time, "extra_data": extra_data } - profile_events.append(profile_event) - - return profile_events - - def profile_table(self): - self._check_connected() - profile_table_keys = self._keys(gcs_utils.TablePrefix_PROFILE_string + - "*") - batch_identifiers_binary = [ - key[len(gcs_utils.TablePrefix_PROFILE_string):] - for key in profile_table_keys - ] - - result = defaultdict(list) - for batch_id in batch_identifiers_binary: - profile_data = self._profile_table(binary_to_object_id(batch_id)) - # Note that if keys are being evicted from Redis, then it is - # possible that the batch will be evicted before we get it. - if len(profile_data) > 0: - component_id = profile_data[0]["component_id"] - result[component_id].extend(profile_data) + result[component_id].append(profile_event) return dict(result) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 1fc18c669..17cd5cd60 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -570,6 +570,13 @@ class StatsInfoAccessor { const std::shared_ptr &data_ptr, const StatusCallback &callback) = 0; + /// Get all profile info from GCS asynchronously. + /// + /// \param callback Callback that will be called after lookup finished. + /// \return Status + virtual Status AsyncGetAll( + const MultiItemCallback &callback) = 0; + protected: StatsInfoAccessor() = default; }; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 98a08493e..977e3081f 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -80,5 +80,22 @@ std::vector GlobalStateAccessor::GetAllJobInfo() { return job_table_data; } +std::vector GlobalStateAccessor::GetAllProfileInfo() { + std::vector profile_table_data; + std::promise promise; + auto on_done = [&profile_table_data, &promise]( + const Status &status, + const std::vector &result) { + RAY_CHECK_OK(status); + for (auto &data : result) { + profile_table_data.push_back(data.SerializeAsString()); + } + promise.set_value(true); + }; + RAY_CHECK_OK(gcs_client_->Stats().AsyncGetAll(on_done)); + promise.get_future().get(); + return profile_table_data; +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index c80972173..0191a8261 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -51,6 +51,13 @@ class GlobalStateAccessor { /// protobuf function. std::vector GetAllJobInfo(); + /// Get information of all profiles from GCS Service. + /// + /// \return All profile info. To support multi-language, we serialized each + /// ProfileTableData and returned the serialized string. Where used, it needs to be + /// deserialized with protobuf function. + std::vector GetAllProfileInfo(); + private: /// Whether this client is connected to gcs server. bool is_connected_{false}; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index ec560c927..1420dfa7d 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1068,6 +1068,21 @@ Status ServiceBasedStatsInfoAccessor::AsyncAddProfileData( return Status::OK(); } +Status ServiceBasedStatsInfoAccessor::AsyncGetAll( + const MultiItemCallback &callback) { + RAY_LOG(DEBUG) << "Getting all profile info."; + RAY_CHECK(callback); + rpc::GetAllProfileInfoRequest request; + client_impl_->GetGcsRpcClient().GetAllProfileInfo( + request, + [callback](const Status &status, const rpc::GetAllProfileInfoReply &reply) { + auto result = VectorFromProtobuf(reply.profile_info_list()); + callback(status, result); + RAY_LOG(DEBUG) << "Finished getting all job info."; + }); + return Status::OK(); +} + ServiceBasedErrorInfoAccessor::ServiceBasedErrorInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 66edf2416..af0edba4f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -281,6 +281,8 @@ class ServiceBasedStatsInfoAccessor : public StatsInfoAccessor { Status AsyncAddProfileData(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; + Status AsyncGetAll(const MultiItemCallback &callback) override; + private: ServiceBasedGcsClient *client_impl_; }; diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 308bc8ce6..34f479ab7 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -105,6 +105,21 @@ TEST_F(GlobalStateAccessorTest, TestJobTable) { ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count); } +TEST_F(GlobalStateAccessorTest, TestProfileTable) { + int profile_count = 100; + ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0); + for (int index = 0; index < profile_count; ++index) { + auto client_id = ClientID::FromRandom(); + auto profile_table_data = Mocker::GenProfileTableData(client_id); + std::promise promise; + RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData( + profile_table_data, + [&promise](Status status) { promise.set_value(status.ok()); })); + WaitReady(promise.get_future(), timeout_ms_); + } + ASSERT_EQ(global_state_->GetAllProfileInfo().size(), profile_count); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 6fd1f3b0d..9f5bad70f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -229,7 +229,7 @@ std::unique_ptr GcsServer::InitTaskInfoHandler() { std::unique_ptr GcsServer::InitStatsHandler() { return std::unique_ptr( - new rpc::DefaultStatsHandler(*redis_gcs_client_)); + new rpc::DefaultStatsHandler(gcs_table_storage_)); } std::unique_ptr GcsServer::InitErrorInfoHandler() { diff --git a/src/ray/gcs/gcs_server/stats_handler_impl.cc b/src/ray/gcs/gcs_server/stats_handler_impl.cc index e04560142..7725f1081 100644 --- a/src/ray/gcs/gcs_server/stats_handler_impl.cc +++ b/src/ray/gcs/gcs_server/stats_handler_impl.cc @@ -34,7 +34,8 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_client_.Stats().AsyncAddProfileData(profile_table_data, on_done); + Status status = gcs_table_storage_->ProfileTable().Put(UniqueID::FromRandom(), + *profile_table_data, on_done); if (!status.ok()) { on_done(status); } @@ -42,5 +43,24 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ << request.profile_data().component_type() << ", node id = " << node_id; } +void DefaultStatsHandler::HandleGetAllProfileInfo( + const rpc::GetAllProfileInfoRequest &request, rpc::GetAllProfileInfoReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_LOG(DEBUG) << "Getting all profile info."; + auto on_done = [reply, send_reply_callback]( + const std::unordered_map &result) { + for (auto &data : result) { + reply->add_profile_info_list()->CopyFrom(data.second); + } + RAY_LOG(DEBUG) << "Finished getting all profile info."; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }; + + Status status = gcs_table_storage_->ProfileTable().GetAll(on_done); + if (!status.ok()) { + on_done(std::unordered_map()); + } +} + } // namespace rpc } // namespace ray diff --git a/src/ray/gcs/gcs_server/stats_handler_impl.h b/src/ray/gcs/gcs_server/stats_handler_impl.h index 9868b704d..6ee6a8a26 100644 --- a/src/ray/gcs/gcs_server/stats_handler_impl.h +++ b/src/ray/gcs/gcs_server/stats_handler_impl.h @@ -15,6 +15,7 @@ #ifndef RAY_GCS_STATS_HANDLER_IMPL_H #define RAY_GCS_STATS_HANDLER_IMPL_H +#include "gcs_table_storage.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" @@ -24,15 +25,19 @@ namespace rpc { /// This implementation class of `StatsHandler`. class DefaultStatsHandler : public rpc::StatsHandler { public: - explicit DefaultStatsHandler(gcs::RedisGcsClient &gcs_client) - : gcs_client_(gcs_client) {} + explicit DefaultStatsHandler(std::shared_ptr gcs_table_storage) + : gcs_table_storage_(std::move(gcs_table_storage)) {} void HandleAddProfileData(const AddProfileDataRequest &request, AddProfileDataReply *reply, SendReplyCallback send_reply_callback) override; + void HandleGetAllProfileInfo(const rpc::GetAllProfileInfoRequest &request, + rpc::GetAllProfileInfoReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + private: - gcs::RedisGcsClient &gcs_client_; + std::shared_ptr gcs_table_storage_; }; } // namespace rpc diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 799fa4e63..71992cb51 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -404,6 +404,10 @@ class RedisStatsInfoAccessor : public StatsInfoAccessor { Status AsyncAddProfileData(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; + Status AsyncGetAll(const MultiItemCallback &callback) override { + return Status::NotImplemented("AsyncGetAll not implemented"); + } + private: RedisGcsClient *client_impl_{nullptr}; }; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 4b68c2d84..70669f19a 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -368,10 +368,20 @@ message AddProfileDataReply { GcsStatus status = 1; } +message GetAllProfileInfoRequest { +} + +message GetAllProfileInfoReply { + GcsStatus status = 1; + repeated ProfileTableData profile_info_list = 2; +} + // Service for stats access. service StatsGcsService { // Add profile data to GCS Service. rpc AddProfileData(AddProfileDataRequest) returns (AddProfileDataReply); + // Get information of all profiles from GCS Service. + rpc GetAllProfileInfo(GetAllProfileInfoRequest) returns (GetAllProfileInfoReply); } message ReportJobErrorRequest { diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 07d9c89ce..0c3dfa007 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -191,6 +191,9 @@ class GcsRpcClient { /// Add profile data to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, ) + /// Get information of all profiles from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, GetAllProfileInfo, stats_grpc_client_, ) + /// Report a job error to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, error_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index a449881c9..dfd4cbc18 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -345,6 +345,10 @@ class StatsGcsServiceHandler { virtual void HandleAddProfileData(const AddProfileDataRequest &request, AddProfileDataReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetAllProfileInfo(const GetAllProfileInfoRequest &request, + GetAllProfileInfoReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `StatsGcsService`. @@ -364,6 +368,7 @@ class StatsGrpcService : public GrpcService { const std::unique_ptr &cq, std::vector> *server_call_factories) override { STATS_SERVICE_RPC_HANDLER(AddProfileData); + STATS_SERVICE_RPC_HANDLER(GetAllProfileInfo); } private: