[EVENT] Basic Function and Definition (#9657)

This commit is contained in:
Basasuya
2020-08-11 17:36:07 +08:00
committed by GitHub
parent 65848201c3
commit 0400a88bf1
7 changed files with 458 additions and 0 deletions
+11
View File
@@ -804,6 +804,16 @@ cc_test(
],
)
cc_test(
name = "event_test",
srcs = ["src/ray/util/event_test.cc"],
copts = COPTS,
deps = [
":ray_util",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "filesystem_test",
srcs = ["src/ray/util/filesystem_test.cc"],
@@ -1278,6 +1288,7 @@ cc_library(
visibility = ["//visibility:public"],
deps = [
":sha256",
"//src/ray/protobuf:event_cc_proto",
"@boost//:asio",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/synchronization",
+9
View File
@@ -119,3 +119,12 @@ cc_proto_library(
deps = ["serialization_proto"],
)
proto_library(
name = "event_proto",
srcs = ["event.proto"],
)
cc_proto_library(
name = "event_cc_proto",
deps = [":event_proto"],
)
+42
View File
@@ -0,0 +1,42 @@
syntax = "proto3";
package ray.rpc;
message Event {
enum SourceType {
// The COMMON type is designed for some process that does not init there own
// source_type.
// We suppose that there will be some roles in addition to GCS, RAYLET, CORE_WORKER.
COMMON = 0;
CORE_WORKER = 1;
GCS = 2;
RAYLET = 3;
}
enum Severity {
INFO = 0;
WARNING = 1;
ERROR = 2;
FATAL = 3;
}
// event_id is the unique ID of this event
string event_id = 1;
// source type is the type of the source
SourceType source_type = 2;
// source_hostname is the hostname of the source
string source_hostname = 3;
// source_pid is the process pid of the source
int32 source_pid = 4;
// severity is the event severity level
Severity severity = 5;
// label describes some important information about this event, always use for the
// searching index and tag
string label = 6;
// message is main information of this event
string message = 7;
// timestamp is the report millionseconds since 00:00, Jan 1 1970 UTC
int64 timestamp = 8;
// store custom key such as node_id, job_id, task_id
map<string, string> custom_fields = 9;
}
+113
View File
@@ -0,0 +1,113 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/util/event.h"
namespace ray {
///
/// EventManager
///
EventManager &EventManager::Instance() {
static EventManager instance_;
return instance_;
}
bool EventManager::IsEmpty() { return reporter_map_.empty(); }
void EventManager::Publish(const rpc::Event &event) {
for (const auto &element : reporter_map_) {
(element.second)->Report(event);
}
}
void EventManager::AddReporter(std::shared_ptr<BaseEventReporter> reporter) {
reporter_map_.emplace(reporter->GetReporterKey(), reporter);
}
void EventManager::ClearReporters() { reporter_map_.clear(); }
///
/// RayEventContext
///
thread_local std::unique_ptr<RayEventContext> RayEventContext::context_ = nullptr;
RayEventContext &RayEventContext::Instance() {
if (context_ == nullptr) {
context_ = std::unique_ptr<RayEventContext>(new RayEventContext());
}
return *context_;
}
void RayEventContext::SetEventContext(
rpc::Event_SourceType source_type,
const std::unordered_map<std::string, std::string> &custom_fields) {
source_type_ = source_type;
custom_fields_ = custom_fields;
}
void RayEventContext::ResetEventContext() {
source_type_ = rpc::Event_SourceType::Event_SourceType_COMMON;
custom_fields_.clear();
}
void RayEventContext::SetCustomFields(const std::string &key, const std::string &value) {
custom_fields_[key] = value;
}
void RayEventContext::SetCustomFields(
const std::unordered_map<std::string, std::string> &custom_fields) {
custom_fields_ = custom_fields;
}
///
/// RayEvent
///
void RayEvent::ReportEvent(const std::string &severity, const std::string &label,
const std::string &message) {
rpc::Event_Severity severity_ele =
rpc::Event_Severity::Event_Severity_Event_Severity_INT_MIN_SENTINEL_DO_NOT_USE_;
RAY_CHECK(rpc::Event_Severity_Parse(severity, &severity_ele));
RayEvent(severity_ele, label) << message;
}
RayEvent::~RayEvent() { SendMessage(osstream_.str()); }
void RayEvent::SendMessage(const std::string &message) {
RAY_CHECK(rpc::Event_SourceType_IsValid(RayEventContext::Instance().GetSourceType()));
RAY_CHECK(rpc::Event_Severity_IsValid(severity_));
if (EventManager::Instance().IsEmpty()) {
return;
}
rpc::Event event;
std::string event_id_buffer = std::string(18, ' ');
FillRandom(&event_id_buffer);
event.set_event_id(StringToHex(event_id_buffer));
event.set_source_type(RayEventContext::Instance().GetSourceType());
event.set_source_hostname(RayEventContext::Instance().GetSourceHostname());
event.set_source_pid(RayEventContext::Instance().GetSourcePid());
event.set_severity(severity_);
event.set_label(label_);
event.set_message(message);
event.set_timestamp(current_sys_time_us());
auto mp = RayEventContext::Instance().GetCustomFields();
event.mutable_custom_fields()->insert(mp.begin(), mp.end());
EventManager::Instance().Publish(event);
}
} // namespace ray
+149
View File
@@ -0,0 +1,149 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <boost/asio/ip/host_name.hpp>
#include <cmath>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <vector>
#include "ray/util/logging.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/event.pb.h"
namespace ray {
#define RAY_EVENT(event_type, label) \
::ray::RayEvent(::ray::rpc::Event_Severity::Event_Severity_##event_type, label)
// interface of event reporter
class BaseEventReporter {
public:
virtual void Init() = 0;
virtual void Report(const rpc::Event &event) = 0;
virtual void Close() = 0;
virtual std::string GetReporterKey() = 0;
};
// store the reporters, add reporters and clean reporters
class EventManager final {
public:
static EventManager &Instance();
bool IsEmpty();
void Publish(const rpc::Event &event);
// NOTE(ruoqiu) AddReporters, ClearPeporters (along with the Pushlish function) would
// not be thread-safe. But we assume default initialization and shutdown are placed in
// the construction and destruction of a resident class, or at the beginning and end of
// a process.
void AddReporter(std::shared_ptr<BaseEventReporter> reporter);
void ClearReporters();
private:
EventManager() = default;
EventManager(const EventManager &manager) = delete;
const EventManager &operator=(const EventManager &manager) = delete;
protected:
std::unordered_map<std::string, std::shared_ptr<BaseEventReporter>> reporter_map_;
};
// store the event context. Different workers of a process in core_worker have different
// contexts, so a singleton of thread_local needs to be maintained
class RayEventContext final {
public:
static RayEventContext &Instance();
void SetEventContext(rpc::Event_SourceType source_type,
const std::unordered_map<std::string, std::string> &custom_fields =
std::unordered_map<std::string, std::string>());
void SetCustomFields(const std::string &key, const std::string &value);
void SetCustomFields(const std::unordered_map<std::string, std::string> &custom_fields);
void ResetEventContext();
inline const rpc::Event_SourceType &GetSourceType() const { return source_type_; }
inline const std::string &GetSourceHostname() const { return source_hostname_; }
inline int32_t GetSourcePid() const { return source_pid_; }
inline const std::unordered_map<std::string, std::string> &GetCustomFields() const {
return custom_fields_;
}
private:
RayEventContext() {}
RayEventContext(const RayEventContext &event_context) = delete;
const RayEventContext &operator=(const RayEventContext &event_context) = delete;
private:
rpc::Event_SourceType source_type_ = rpc::Event_SourceType::Event_SourceType_COMMON;
std::string source_hostname_ = boost::asio::ip::host_name();
int32_t source_pid_ = getpid();
std::unordered_map<std::string, std::string> custom_fields_;
static thread_local std::unique_ptr<RayEventContext> context_;
};
// when the RayEvent is deconstructed, the context information is obtained from the
// RayEventContext, then the Event structure is generated and pushed to the EventManager
// for sending
class RayEvent {
public:
RayEvent(rpc::Event_Severity severity, const std::string &label)
: severity_(severity), label_(label) {}
template <typename T>
RayEvent &operator<<(const T &t) {
osstream_ << t;
return *this;
}
static void ReportEvent(const std::string &severity, const std::string &label,
const std::string &message);
~RayEvent();
private:
RayEvent() = default;
void SendMessage(const std::string &message);
RayEvent(const RayEvent &event) = delete;
const RayEvent &operator=(const RayEvent &event) = delete;
private:
rpc::Event_Severity severity_;
std::string label_;
std::ostringstream osstream_;
};
} // namespace ray
+115
View File
@@ -0,0 +1,115 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/util/event.h"
#include <fstream>
#include <set>
#include <thread>
#include "gtest/gtest.h"
namespace ray {
class TestEventReporter : public BaseEventReporter {
public:
virtual void Init() override {}
virtual void Report(const rpc::Event &event) override { event_list.push_back(event); }
virtual void Close() override {}
virtual ~TestEventReporter() {}
virtual std::string GetReporterKey() override { return "test.event.reporter"; }
public:
static std::vector<rpc::Event> event_list;
};
std::vector<rpc::Event> TestEventReporter::event_list = std::vector<rpc::Event>();
void CheckEventDetail(rpc::Event &event, std::string job_id, std::string node_id,
std::string task_id, std::string source_type, std::string severity,
std::string label, std::string message) {
int custom_key_num = 0;
auto mp = (*event.mutable_custom_fields());
if (job_id != "") {
EXPECT_EQ(mp["job_id"], job_id);
custom_key_num++;
}
if (node_id != "") {
EXPECT_EQ(mp["node_id"], node_id);
custom_key_num++;
}
if (task_id != "") {
EXPECT_EQ(mp["task_id"], task_id);
custom_key_num++;
}
EXPECT_EQ(mp.size(), custom_key_num);
EXPECT_EQ(rpc::Event_SourceType_Name(event.source_type()), source_type);
EXPECT_EQ(rpc::Event_Severity_Name(event.severity()), severity);
EXPECT_EQ(event.label(), label);
EXPECT_EQ(event.message(), message);
EXPECT_EQ(event.source_pid(), getpid());
}
TEST(EVENT_TEST, TEST_BASIC) {
TestEventReporter::event_list.clear();
ray::EventManager::Instance().ClearReporters();
RAY_EVENT(WARNING, "label") << "test for empty reporters";
// If there are no reporters, it would not Publish event
EXPECT_EQ(TestEventReporter::event_list.size(), 0);
ray::EventManager::Instance().AddReporter(std::make_shared<TestEventReporter>());
RAY_EVENT(WARNING, "label 0") << "send message 0";
ray::RayEventContext::Instance().SetEventContext(
rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
std::unordered_map<std::string, std::string>(
{{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}}));
RAY_EVENT(INFO, "label 1") << "send message 1";
ray::RayEventContext::Instance().SetEventContext(
rpc::Event_SourceType::Event_SourceType_RAYLET,
std::unordered_map<std::string, std::string>(
{{"node_id", "node 2"}, {"job_id", "job 2"}}));
RAY_EVENT(ERROR, "label 2") << "send message 2 "
<< "send message again";
ray::RayEventContext::Instance().SetEventContext(
rpc::Event_SourceType::Event_SourceType_GCS);
RAY_EVENT(FATAL, "") << "";
std::vector<rpc::Event> &result = TestEventReporter::event_list;
EXPECT_EQ(result.size(), 4);
CheckEventDetail(result[0], "", "", "", "COMMON", "WARNING", "label 0",
"send message 0");
CheckEventDetail(result[1], "job 1", "node 1", "task 1", "CORE_WORKER", "INFO",
"label 1", "send message 1");
CheckEventDetail(result[2], "job 2", "node 2", "", "RAYLET", "ERROR", "label 2",
"send message 2 send message again");
CheckEventDetail(result[3], "", "", "", "GCS", "FATAL", "", "");
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+19
View File
@@ -43,6 +43,18 @@ class stream_protocol;
enum class CommandLineSyntax { System, POSIX, Windows };
// Transfer the string to the Hex format. It can be more readable than the ANSI mode
inline std::string StringToHex(const std::string &str) {
constexpr char hex[] = "0123456789abcdef";
std::string result;
for (size_t i = 0; i < str.size(); i++) {
unsigned char val = str[i];
result.push_back(hex[val >> 4]);
result.push_back(hex[val & 0xf]);
}
return result;
}
/// Return the number of milliseconds since the steady clock epoch. NOTE: The
/// returned timestamp may be used for accurately measuring intervals but has
/// no relation to wall clock time. It must not be used for synchronization
@@ -66,6 +78,13 @@ inline int64_t current_sys_time_ms() {
return ms_since_epoch.count();
}
inline int64_t current_sys_time_us() {
std::chrono::microseconds mu_since_epoch =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch());
return mu_since_epoch.count();
}
/// A helper function to parse command-line arguments in a platform-compatible manner.
///
/// \param cmdline The command-line to split.