diff --git a/BUILD.bazel b/BUILD.bazel index 7fc231602..d720049c4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -804,6 +804,16 @@ cc_test( ], ) +cc_test( + name = "event_test", + srcs = ["src/ray/util/event_test.cc"], + copts = COPTS, + deps = [ + ":ray_util", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "filesystem_test", srcs = ["src/ray/util/filesystem_test.cc"], @@ -1278,6 +1288,7 @@ cc_library( visibility = ["//visibility:public"], deps = [ ":sha256", + "//src/ray/protobuf:event_cc_proto", "@boost//:asio", "@com_github_google_glog//:glog", "@com_google_absl//absl/synchronization", diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index e58b645b7..1e7c3c982 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -119,3 +119,12 @@ cc_proto_library( deps = ["serialization_proto"], ) +proto_library( + name = "event_proto", + srcs = ["event.proto"], +) + +cc_proto_library( + name = "event_cc_proto", + deps = [":event_proto"], +) diff --git a/src/ray/protobuf/event.proto b/src/ray/protobuf/event.proto new file mode 100644 index 000000000..7dfd61c27 --- /dev/null +++ b/src/ray/protobuf/event.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package ray.rpc; + +message Event { + enum SourceType { + // The COMMON type is designed for processes that do not initialize their own + // source_type. + // More roles are expected in addition to GCS, RAYLET and CORE_WORKER. 
+ COMMON = 0; + CORE_WORKER = 1; + GCS = 2; + RAYLET = 3; + } + + enum Severity { + INFO = 0; + WARNING = 1; + ERROR = 2; + FATAL = 3; + } + + // event_id is the unique ID of this event + string event_id = 1; + // source_type is the type of the source + SourceType source_type = 2; + // source_hostname is the hostname of the source + string source_hostname = 3; + // source_pid is the process pid of the source + int32 source_pid = 4; + // severity is the event severity level + Severity severity = 5; + // label describes some important information about this event, always used for the + // search index and tag + string label = 6; + // message is the main information of this event + string message = 7; + // timestamp is the report time in microseconds since 00:00, Jan 1 1970 UTC + int64 timestamp = 8; + // custom_fields stores custom keys such as node_id, job_id, task_id + map custom_fields = 9; +} diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc new file mode 100644 index 000000000..2701804e6 --- /dev/null +++ b/src/ray/util/event.cc @@ -0,0 +1,113 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/util/event.h" + +namespace ray { +/// +/// EventManager: singleton that forwards each event to every registered reporter +/// +EventManager &EventManager::Instance() { + static EventManager instance_; + return instance_; +} + +bool EventManager::IsEmpty() { return reporter_map_.empty(); } + +void EventManager::Publish(const rpc::Event &event) { + for (const auto &element : reporter_map_) { + (element.second)->Report(event); + } +} + +void EventManager::AddReporter(std::shared_ptr reporter) { + reporter_map_.emplace(reporter->GetReporterKey(), reporter); +} + +void EventManager::ClearReporters() { reporter_map_.clear(); } +/// +/// RayEventContext: per-thread context (source type and custom fields) for events +/// +thread_local std::unique_ptr RayEventContext::context_ = nullptr; + +RayEventContext &RayEventContext::Instance() { + if (context_ == nullptr) { + context_ = std::unique_ptr(new RayEventContext()); + } + return *context_; +} + +void RayEventContext::SetEventContext( + rpc::Event_SourceType source_type, + const std::unordered_map &custom_fields) { + source_type_ = source_type; + custom_fields_ = custom_fields; +} + +void RayEventContext::ResetEventContext() { + source_type_ = rpc::Event_SourceType::Event_SourceType_COMMON; + custom_fields_.clear(); +} + +void RayEventContext::SetCustomFields(const std::string &key, const std::string &value) { + custom_fields_[key] = value; +} + +void RayEventContext::SetCustomFields( + const std::unordered_map &custom_fields) { + custom_fields_ = custom_fields; +} +/// +/// RayEvent: builds an rpc::Event and publishes it when the object is destroyed +/// +void RayEvent::ReportEvent(const std::string &severity, const std::string &label, + const std::string &message) { + rpc::Event_Severity severity_ele = + rpc::Event_Severity::Event_Severity_Event_Severity_INT_MIN_SENTINEL_DO_NOT_USE_; + RAY_CHECK(rpc::Event_Severity_Parse(severity, &severity_ele)); + RayEvent(severity_ele, label) << message; +} + +RayEvent::~RayEvent() { SendMessage(osstream_.str()); } + +void RayEvent::SendMessage(const std::string &message) { + RAY_CHECK(rpc::Event_SourceType_IsValid(RayEventContext::Instance().GetSourceType())); 
+ RAY_CHECK(rpc::Event_Severity_IsValid(severity_)); + + if (EventManager::Instance().IsEmpty()) { + return; + } + + rpc::Event event; + + std::string event_id_buffer = std::string(18, ' '); + FillRandom(&event_id_buffer); + event.set_event_id(StringToHex(event_id_buffer)); + + event.set_source_type(RayEventContext::Instance().GetSourceType()); + event.set_source_hostname(RayEventContext::Instance().GetSourceHostname()); + event.set_source_pid(RayEventContext::Instance().GetSourcePid()); + + event.set_severity(severity_); + event.set_label(label_); + event.set_message(message); + event.set_timestamp(current_sys_time_us()); + + auto mp = RayEventContext::Instance().GetCustomFields(); + event.mutable_custom_fields()->insert(mp.begin(), mp.end()); + + EventManager::Instance().Publish(event); +} + +} // namespace ray diff --git a/src/ray/util/event.h b/src/ray/util/event.h new file mode 100644 index 000000000..8a9243a6a --- /dev/null +++ b/src/ray/util/event.h @@ -0,0 +1,149 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include "ray/util/logging.h" +#include "ray/util/util.h" +#include "src/ray/protobuf/event.pb.h" + +namespace ray { + +#define RAY_EVENT(event_type, label) \ + ::ray::RayEvent(::ray::rpc::Event_Severity::Event_Severity_##event_type, label) + +// Interface implemented by every event reporter +class BaseEventReporter { + public: + virtual void Init() = 0; + + virtual void Report(const rpc::Event &event) = 0; + + virtual void Close() = 0; + + virtual std::string GetReporterKey() = 0; +}; + +// Stores the registered reporters; reporters can be added and cleared +class EventManager final { + public: + static EventManager &Instance(); + + bool IsEmpty(); + + void Publish(const rpc::Event &event); + + // NOTE(ruoqiu) AddReporter, ClearReporters (along with the Publish function) are + // not thread-safe. But we assume default initialization and shutdown are placed in + // the construction and destruction of a resident class, or at the beginning and end of + // a process. + void AddReporter(std::shared_ptr reporter); + + void ClearReporters(); + + private: + EventManager() = default; + + EventManager(const EventManager &manager) = delete; + + const EventManager &operator=(const EventManager &manager) = delete; + + protected: + std::unordered_map> reporter_map_; +}; + +// store the event context. 
Different workers of a process in core_worker have different +// contexts, so a thread_local singleton needs to be maintained +class RayEventContext final { + public: + static RayEventContext &Instance(); + + void SetEventContext(rpc::Event_SourceType source_type, + const std::unordered_map &custom_fields = + std::unordered_map()); + + void SetCustomFields(const std::string &key, const std::string &value); + + void SetCustomFields(const std::unordered_map &custom_fields); + + void ResetEventContext(); + + inline const rpc::Event_SourceType &GetSourceType() const { return source_type_; } + + inline const std::string &GetSourceHostname() const { return source_hostname_; } + + inline int32_t GetSourcePid() const { return source_pid_; } + + inline const std::unordered_map &GetCustomFields() const { + return custom_fields_; + } + + private: + RayEventContext() {} + + RayEventContext(const RayEventContext &event_context) = delete; + + const RayEventContext &operator=(const RayEventContext &event_context) = delete; + + private: + rpc::Event_SourceType source_type_ = rpc::Event_SourceType::Event_SourceType_COMMON; + std::string source_hostname_ = boost::asio::ip::host_name(); + int32_t source_pid_ = getpid(); + std::unordered_map custom_fields_; + + static thread_local std::unique_ptr context_; +}; + +// when the RayEvent is destroyed, the context information is obtained from the +// RayEventContext, then the Event structure is generated and pushed to the EventManager +// for sending +class RayEvent { + public: + RayEvent(rpc::Event_Severity severity, const std::string &label) + : severity_(severity), label_(label) {} + + template + RayEvent &operator<<(const T &t) { + osstream_ << t; + return *this; + } + + static void ReportEvent(const std::string &severity, const std::string &label, + const std::string &message); + + ~RayEvent(); + + private: + RayEvent() = default; + + void SendMessage(const std::string &message); + + RayEvent(const RayEvent &event) = delete; + 
+ const RayEvent &operator=(const RayEvent &event) = delete; + + private: + rpc::Event_Severity severity_; + std::string label_; + std::ostringstream osstream_; +}; + +} // namespace ray diff --git a/src/ray/util/event_test.cc b/src/ray/util/event_test.cc new file mode 100644 index 000000000..f9c356867 --- /dev/null +++ b/src/ray/util/event_test.cc @@ -0,0 +1,115 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/util/event.h" +#include +#include +#include +#include "gtest/gtest.h" + +namespace ray { + +class TestEventReporter : public BaseEventReporter { + public: + virtual void Init() override {} + virtual void Report(const rpc::Event &event) override { event_list.push_back(event); } + virtual void Close() override {} + virtual ~TestEventReporter() {} + virtual std::string GetReporterKey() override { return "test.event.reporter"; } + + public: + static std::vector event_list; +}; + +std::vector TestEventReporter::event_list = std::vector(); + +void CheckEventDetail(rpc::Event &event, std::string job_id, std::string node_id, + std::string task_id, std::string source_type, std::string severity, + std::string label, std::string message) { + int custom_key_num = 0; + auto mp = (*event.mutable_custom_fields()); + + if (job_id != "") { + EXPECT_EQ(mp["job_id"], job_id); + custom_key_num++; + } + if (node_id != "") { + EXPECT_EQ(mp["node_id"], node_id); + custom_key_num++; + } + if (task_id != "") { + 
EXPECT_EQ(mp["task_id"], task_id); + custom_key_num++; + } + EXPECT_EQ(mp.size(), custom_key_num); + EXPECT_EQ(rpc::Event_SourceType_Name(event.source_type()), source_type); + EXPECT_EQ(rpc::Event_Severity_Name(event.severity()), severity); + EXPECT_EQ(event.label(), label); + EXPECT_EQ(event.message(), message); + EXPECT_EQ(event.source_pid(), getpid()); +} + +TEST(EVENT_TEST, TEST_BASIC) { + TestEventReporter::event_list.clear(); + ray::EventManager::Instance().ClearReporters(); + + RAY_EVENT(WARNING, "label") << "test for empty reporters"; + + // With no reporters registered, the event must not be published + EXPECT_EQ(TestEventReporter::event_list.size(), 0); + + ray::EventManager::Instance().AddReporter(std::make_shared()); + + RAY_EVENT(WARNING, "label 0") << "send message 0"; + + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_CORE_WORKER, + std::unordered_map( + {{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}})); + + RAY_EVENT(INFO, "label 1") << "send message 1"; + + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_RAYLET, + std::unordered_map( + {{"node_id", "node 2"}, {"job_id", "job 2"}})); + RAY_EVENT(ERROR, "label 2") << "send message 2 " + << "send message again"; + + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_GCS); + RAY_EVENT(FATAL, "") << ""; + + std::vector &result = TestEventReporter::event_list; + + EXPECT_EQ(result.size(), 4); + + CheckEventDetail(result[0], "", "", "", "COMMON", "WARNING", "label 0", + "send message 0"); + + CheckEventDetail(result[1], "job 1", "node 1", "task 1", "CORE_WORKER", "INFO", + "label 1", "send message 1"); + + CheckEventDetail(result[2], "job 2", "node 2", "", "RAYLET", "ERROR", "label 2", + "send message 2 send message again"); + + CheckEventDetail(result[3], "", "", "", "GCS", "FATAL", "", ""); +} + +} // namespace ray + +int main(int argc, char **argv) { + 
::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 2f0546ee7..5b678d4f2 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -43,6 +43,18 @@ class stream_protocol; enum class CommandLineSyntax { System, POSIX, Windows }; +// Encode each byte of the string as two lowercase hex digits, for readability. +inline std::string StringToHex(const std::string &str) { + constexpr char hex[] = "0123456789abcdef"; + std::string result; + for (size_t i = 0; i < str.size(); i++) { + unsigned char val = str[i]; + result.push_back(hex[val >> 4]); + result.push_back(hex[val & 0xf]); + } + return result; +} + /// Return the number of milliseconds since the steady clock epoch. NOTE: The /// returned timestamp may be used for accurately measuring intervals but has /// no relation to wall clock time. It must not be used for synchronization @@ -66,6 +78,13 @@ inline int64_t current_sys_time_ms() { return ms_since_epoch.count(); } +inline int64_t current_sys_time_us() { + std::chrono::microseconds mu_since_epoch = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + return mu_since_epoch.count(); +} + /// A helper function to parse command-line arguments in a platform-compatible manner. /// /// \param cmdline The command-line to split.