[GCS]profile info getting implementation based gcs service (#8536)

This commit is contained in:
Tao Wang
2020-05-24 22:23:01 +08:00
committed by GitHub
parent 822de1b7f7
commit 92c2e41dfd
16 changed files with 134 additions and 58 deletions
@@ -10,4 +10,4 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
c_bool Connect()
void Disconnect()
c_vector[c_string] GetAllJobInfo()
c_vector[c_string] GetAllProfileInfo()
@@ -22,3 +22,6 @@ cdef class GlobalStateAccessor:
def get_job_table(self):
return self.inner.get().GetAllJobInfo()
def get_profile_table(self):
return self.inner.get().GetAllProfileInfo()
+15 -52
View File
@@ -428,71 +428,34 @@ class GlobalState:
return results
def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.
def profile_table(self):
self._check_connected()
Args:
batch_id: An identifier for a batch of profile events.
result = defaultdict(list)
profile_table = self.global_state_accessor.get_profile_table()
for i in range(len(profile_table)):
profile = gcs_utils.ProfileTableData.FromString(profile_table[i])
Returns:
A list of the profile events for the specified batch.
"""
# TODO(rkn): This method should support limiting the number of log
# events and should also support returning a window of events.
message = self._execute_command(batch_id, "RAY.TABLE_LOOKUP",
gcs_utils.TablePrefix.Value("PROFILE"),
"", batch_id.binary())
component_type = profile.component_type
component_id = binary_to_hex(profile.component_id)
node_ip_address = profile.node_ip_address
if message is None:
return []
gcs_entries = gcs_utils.GcsEntry.FromString(message)
profile_events = []
for entry in gcs_entries.entries:
profile_table_message = gcs_utils.ProfileTableData.FromString(
entry)
component_type = profile_table_message.component_type
component_id = binary_to_hex(profile_table_message.component_id)
node_ip_address = profile_table_message.node_ip_address
for profile_event_message in profile_table_message.profile_events:
for event in profile.profile_events:
try:
extra_data = json.loads(profile_event_message.extra_data)
extra_data = json.loads(event.extra_data)
except ValueError:
extra_data = {}
profile_event = {
"event_type": profile_event_message.event_type,
"event_type": event.event_type,
"component_id": component_id,
"node_ip_address": node_ip_address,
"component_type": component_type,
"start_time": profile_event_message.start_time,
"end_time": profile_event_message.end_time,
"start_time": event.start_time,
"end_time": event.end_time,
"extra_data": extra_data
}
profile_events.append(profile_event)
return profile_events
def profile_table(self):
self._check_connected()
profile_table_keys = self._keys(gcs_utils.TablePrefix_PROFILE_string +
"*")
batch_identifiers_binary = [
key[len(gcs_utils.TablePrefix_PROFILE_string):]
for key in profile_table_keys
]
result = defaultdict(list)
for batch_id in batch_identifiers_binary:
profile_data = self._profile_table(binary_to_object_id(batch_id))
# Note that if keys are being evicted from Redis, then it is
# possible that the batch will be evicted before we get it.
if len(profile_data) > 0:
component_id = profile_data[0]["component_id"]
result[component_id].extend(profile_data)
result[component_id].append(profile_event)
return dict(result)
+7
View File
@@ -570,6 +570,13 @@ class StatsInfoAccessor {
const std::shared_ptr<rpc::ProfileTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Get all profile info from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finished.
/// \return Status
virtual Status AsyncGetAll(
const MultiItemCallback<rpc::ProfileTableData> &callback) = 0;
protected:
StatsInfoAccessor() = default;
};
@@ -80,5 +80,22 @@ std::vector<std::string> GlobalStateAccessor::GetAllJobInfo() {
return job_table_data;
}
std::vector<std::string> GlobalStateAccessor::GetAllProfileInfo() {
std::vector<std::string> profile_table_data;
std::promise<bool> promise;
auto on_done = [&profile_table_data, &promise](
const Status &status,
const std::vector<rpc::ProfileTableData> &result) {
RAY_CHECK_OK(status);
for (auto &data : result) {
profile_table_data.push_back(data.SerializeAsString());
}
promise.set_value(true);
};
RAY_CHECK_OK(gcs_client_->Stats().AsyncGetAll(on_done));
promise.get_future().get();
return profile_table_data;
}
} // namespace gcs
} // namespace ray
@@ -51,6 +51,13 @@ class GlobalStateAccessor {
/// protobuf function.
std::vector<std::string> GetAllJobInfo();
/// Get information of all profiles from GCS Service.
///
/// \return All profile info. To support multi-language, we serialized each
/// ProfileTableData and returned the serialized string. Where used, it needs to be
/// deserialized with protobuf function.
std::vector<std::string> GetAllProfileInfo();
private:
/// Whether this client is connected to gcs server.
bool is_connected_{false};
@@ -1068,6 +1068,21 @@ Status ServiceBasedStatsInfoAccessor::AsyncAddProfileData(
return Status::OK();
}
Status ServiceBasedStatsInfoAccessor::AsyncGetAll(
const MultiItemCallback<rpc::ProfileTableData> &callback) {
RAY_LOG(DEBUG) << "Getting all profile info.";
RAY_CHECK(callback);
rpc::GetAllProfileInfoRequest request;
client_impl_->GetGcsRpcClient().GetAllProfileInfo(
request,
[callback](const Status &status, const rpc::GetAllProfileInfoReply &reply) {
auto result = VectorFromProtobuf(reply.profile_info_list());
callback(status, result);
RAY_LOG(DEBUG) << "Finished getting all job info.";
});
return Status::OK();
}
ServiceBasedErrorInfoAccessor::ServiceBasedErrorInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
@@ -281,6 +281,8 @@ class ServiceBasedStatsInfoAccessor : public StatsInfoAccessor {
Status AsyncAddProfileData(const std::shared_ptr<rpc::ProfileTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncGetAll(const MultiItemCallback<rpc::ProfileTableData> &callback) override;
private:
ServiceBasedGcsClient *client_impl_;
};
@@ -105,6 +105,21 @@ TEST_F(GlobalStateAccessorTest, TestJobTable) {
ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count);
}
TEST_F(GlobalStateAccessorTest, TestProfileTable) {
int profile_count = 100;
ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0);
for (int index = 0; index < profile_count; ++index) {
auto client_id = ClientID::FromRandom();
auto profile_table_data = Mocker::GenProfileTableData(client_id);
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(
profile_table_data,
[&promise](Status status) { promise.set_value(status.ok()); }));
WaitReady(promise.get_future(), timeout_ms_);
}
ASSERT_EQ(global_state_->GetAllProfileInfo().size(), profile_count);
}
} // namespace ray
int main(int argc, char **argv) {
+1 -1
View File
@@ -229,7 +229,7 @@ std::unique_ptr<rpc::TaskInfoHandler> GcsServer::InitTaskInfoHandler() {
std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() {
return std::unique_ptr<rpc::DefaultStatsHandler>(
new rpc::DefaultStatsHandler(*redis_gcs_client_));
new rpc::DefaultStatsHandler(gcs_table_storage_));
}
std::unique_ptr<rpc::ErrorInfoHandler> GcsServer::InitErrorInfoHandler() {
+21 -1
View File
@@ -34,7 +34,8 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Stats().AsyncAddProfileData(profile_table_data, on_done);
Status status = gcs_table_storage_->ProfileTable().Put(UniqueID::FromRandom(),
*profile_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
@@ -42,5 +43,24 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ
<< request.profile_data().component_type() << ", node id = " << node_id;
}
void DefaultStatsHandler::HandleGetAllProfileInfo(
const rpc::GetAllProfileInfoRequest &request, rpc::GetAllProfileInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(DEBUG) << "Getting all profile info.";
auto on_done = [reply, send_reply_callback](
const std::unordered_map<UniqueID, ProfileTableData> &result) {
for (auto &data : result) {
reply->add_profile_info_list()->CopyFrom(data.second);
}
RAY_LOG(DEBUG) << "Finished getting all profile info.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
Status status = gcs_table_storage_->ProfileTable().GetAll(on_done);
if (!status.ok()) {
on_done(std::unordered_map<UniqueID, ProfileTableData>());
}
}
} // namespace rpc
} // namespace ray
+8 -3
View File
@@ -15,6 +15,7 @@
#ifndef RAY_GCS_STATS_HANDLER_IMPL_H
#define RAY_GCS_STATS_HANDLER_IMPL_H
#include "gcs_table_storage.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
@@ -24,15 +25,19 @@ namespace rpc {
/// This implementation class of `StatsHandler`.
class DefaultStatsHandler : public rpc::StatsHandler {
public:
explicit DefaultStatsHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
explicit DefaultStatsHandler(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_table_storage_(std::move(gcs_table_storage)) {}
void HandleAddProfileData(const AddProfileDataRequest &request,
AddProfileDataReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleGetAllProfileInfo(const rpc::GetAllProfileInfoRequest &request,
rpc::GetAllProfileInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
private:
gcs::RedisGcsClient &gcs_client_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
};
} // namespace rpc
+4
View File
@@ -404,6 +404,10 @@ class RedisStatsInfoAccessor : public StatsInfoAccessor {
Status AsyncAddProfileData(const std::shared_ptr<ProfileTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncGetAll(const MultiItemCallback<rpc::ProfileTableData> &callback) override {
return Status::NotImplemented("AsyncGetAll not implemented");
}
private:
RedisGcsClient *client_impl_{nullptr};
};
+10
View File
@@ -368,10 +368,20 @@ message AddProfileDataReply {
GcsStatus status = 1;
}
message GetAllProfileInfoRequest {
}
message GetAllProfileInfoReply {
GcsStatus status = 1;
repeated ProfileTableData profile_info_list = 2;
}
// Service for stats access.
service StatsGcsService {
// Add profile data to GCS Service.
rpc AddProfileData(AddProfileDataRequest) returns (AddProfileDataReply);
// Get information of all profiles from GCS Service.
rpc GetAllProfileInfo(GetAllProfileInfoRequest) returns (GetAllProfileInfoReply);
}
message ReportJobErrorRequest {
+3
View File
@@ -191,6 +191,9 @@ class GcsRpcClient {
/// Add profile data to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, )
/// Get information of all profiles from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, GetAllProfileInfo, stats_grpc_client_, )
/// Report a job error to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError,
error_info_grpc_client_, )
+5
View File
@@ -345,6 +345,10 @@ class StatsGcsServiceHandler {
virtual void HandleAddProfileData(const AddProfileDataRequest &request,
AddProfileDataReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetAllProfileInfo(const GetAllProfileInfoRequest &request,
GetAllProfileInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `StatsGcsService`.
@@ -364,6 +368,7 @@ class StatsGrpcService : public GrpcService {
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
STATS_SERVICE_RPC_HANDLER(AddProfileData);
STATS_SERVICE_RPC_HANDLER(GetAllProfileInfo);
}
private: