From 1dd5d92789fee798473d0daaf37d237748684c78 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 7 Nov 2018 12:45:59 -0800 Subject: [PATCH] Enable timeline visualizations of object transfers. (#3255) * Plot object transfers. * Linting --- python/ray/experimental/state.py | 208 ++++++++++++++++++----- src/ray/common/client_connection.cc | 6 +- src/ray/object_manager/object_manager.cc | 111 ++++++++++-- src/ray/object_manager/object_manager.h | 61 ++++++- src/ray/ray_config.h | 4 +- src/ray/raylet/node_manager.cc | 32 ++++ src/ray/raylet/node_manager.h | 10 ++ src/ray/util/util.h | 12 ++ test/runtest.py | 54 ++++++ 9 files changed, 431 insertions(+), 67 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 723d4b9fb..c9d4ae8ce 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -584,24 +584,78 @@ class GlobalState(object): for component_id in component_identifiers_binary } - def chrome_tracing_dump(self, - include_task_data=False, - filename=None, - open_browser=False): + def _seconds_to_microseconds(self, time_in_seconds): + """A helper function for converting seconds to microseconds.""" + time_in_microseconds = 10**6 * time_in_seconds + return time_in_microseconds + + # Colors are specified at + # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. 
# noqa: E501 + _default_color_mapping = defaultdict( + lambda: "generic_work", { + "worker_idle": "cq_build_abandoned", + "task": "rail_response", + "task:deserialize_arguments": "rail_load", + "task:execute": "rail_animation", + "task:store_outputs": "rail_idle", + "wait_for_function": "detailed_memory_dump", + "ray.get": "good", + "ray.put": "terrible", + "ray.wait": "vsync_highlight_color", + "submit_task": "background_memory_dump", + "fetch_and_run_function": "detailed_memory_dump", + "register_remote_function": "detailed_memory_dump", + }) + + # These colors are for use in Chrome tracing. + _chrome_tracing_colors = [ + "thread_state_uninterruptible", + "thread_state_iowait", + "thread_state_running", + "thread_state_runnable", + "thread_state_sleeping", + "thread_state_unknown", + "background_memory_dump", + "light_memory_dump", + "detailed_memory_dump", + "vsync_highlight_color", + "generic_work", + "good", + "bad", + "terrible", + # "black", + # "grey", + # "white", + "yellow", + "olive", + "rail_response", + "rail_animation", + "rail_idle", + "rail_load", + "startup", + "heap_dump_stack_frame", + "heap_dump_object_type", + "heap_dump_child_node_arrow", + "cq_build_running", + "cq_build_passed", + "cq_build_failed", + "cq_build_abandoned", + "cq_build_attempt_runnig", + "cq_build_attempt_passed", + "cq_build_attempt_failed", + ] + + def chrome_tracing_dump(self, filename=None): """Return a list of profiling events that can viewed as a timeline. To view this information as a timeline, simply dump it as a json file - using json.dumps, and then load go to chrome://tracing in the Chrome - web browser and load the dumped file. Make sure to enable "Flow events" - in the "View Options" menu. + by passing in "filename" or using json.dump, and then go to + chrome://tracing in the Chrome web browser and load the dumped file. + Make sure to enable "Flow events" in the "View Options" menu. 
Args: - include_task_data: If true, we will include more task metadata such - as the task specifications in the json. filename: If a filename is provided, the timeline is dumped to that file. - open_browser: If true, we will attempt to automatically open the - timeline visualization in Chrome. Returns: If filename is not provided, this returns a list of profiling @@ -612,38 +666,15 @@ class GlobalState(object): # TODO(rkn): This should support viewing just a window of time or a # limited number of events. - if include_task_data: - raise NotImplementedError("This flag has not been implented yet.") - - if open_browser: - raise NotImplementedError("This flag has not been implented yet.") - profile_table = self.profile_table() all_events = [] - # Colors are specified at - # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. # noqa: E501 - default_color_mapping = defaultdict( - lambda: "generic_work", { - "worker_idle": "cq_build_abandoned", - "task": "rail_response", - "task:deserialize_arguments": "rail_load", - "task:execute": "rail_animation", - "task:store_outputs": "rail_idle", - "wait_for_function": "detailed_memory_dump", - "ray.get": "good", - "ray.put": "terrible", - "ray.wait": "vsync_highlight_color", - "submit_task": "background_memory_dump", - "fetch_and_run_function": "detailed_memory_dump", - "register_remote_function": "detailed_memory_dump", - }) - - def seconds_to_microseconds(time_in_seconds): - time_in_microseconds = 10**6 * time_in_seconds - return time_in_microseconds - for component_id_hex, component_events in profile_table.items(): + # Only consider workers and drivers. + component_type = component_events[0]["component_type"] + if component_type not in ["worker", "driver"]: + continue + for event in component_events: new_event = { # The category of the event. 
@@ -657,14 +688,14 @@ class GlobalState(object): "tid": event["component_type"] + ":" + event["component_id"], # The start time in microseconds. - "ts": seconds_to_microseconds(event["start_time"]), + "ts": self._seconds_to_microseconds(event["start_time"]), # The duration in microseconds. - "dur": seconds_to_microseconds(event["end_time"] - - event["start_time"]), + "dur": self._seconds_to_microseconds(event["end_time"] - + event["start_time"]), # What is this? "ph": "X", # This is the name of the color to display the box in. - "cname": default_color_mapping[event["event_type"]], + "cname": self._default_color_mapping[event["event_type"]], # The extra user-defined data. "args": event["extra_data"], } @@ -684,6 +715,97 @@ class GlobalState(object): else: return all_events + def chrome_tracing_object_transfer_dump(self, filename=None): + """Return a list of transfer events that can be viewed as a timeline. + + To view this information as a timeline, simply dump it as a json file + by passing in "filename" or using json.dump, and then go to + chrome://tracing in the Chrome web browser and load the dumped file. + Make sure to enable "Flow events" in the "View Options" menu. + + Args: + filename: If a filename is provided, the timeline is dumped to that + file. + + Returns: + If filename is not provided, this returns a list of profiling + events. Each profile event is a dictionary. + """ + client_id_to_address = {} + for client_info in ray.global_state.client_table(): + client_id_to_address[client_info["ClientID"]] = "{}:{}".format( + client_info["NodeManagerAddress"], + client_info["ObjectManagerPort"]) + + all_events = [] + + for key, items in self.profile_table().items(): + # Only consider object manager events. 
+ if items[0]["component_type"] != "object_manager": + continue + + for event in items: + if event["event_type"] == "transfer_send": + object_id, remote_client_id, _, _ = event["extra_data"] + + elif event["event_type"] == "transfer_receive": + object_id, remote_client_id, _, _ = event["extra_data"] + + elif event["event_type"] == "receive_pull_request": + object_id, remote_client_id = event["extra_data"] + + else: + assert False, "This should be unreachable." + + # Choose a color by reading the first couple of hex digits of + # the object ID as an integer and turning that into a color. + object_id_int = int(object_id[:2], 16) + color = self._chrome_tracing_colors[object_id_int % len( + self._chrome_tracing_colors)] + + new_event = { + # The category of the event. + "cat": event["event_type"], + # The string displayed on the event. + "name": event["event_type"], + # The identifier for the group of rows that the event + # appears in. + "pid": client_id_to_address[key], + # The identifier for the row that the event appears in. + "tid": client_id_to_address[remote_client_id], + # The start time in microseconds. + "ts": self._seconds_to_microseconds(event["start_time"]), + # The duration in microseconds. + "dur": self._seconds_to_microseconds(event["end_time"] - + event["start_time"]), + # The event phase ("X" is a complete event with a duration). + "ph": "X", + # This is the name of the color to display the box in. + "cname": color, + # The extra user-defined data. + "args": event["extra_data"], + } + all_events.append(new_event) + + # Add another box with a color indicating whether it was a send + # or a receive event. 
+ if event["event_type"] == "transfer_send": + additional_event = new_event.copy() + additional_event["cname"] = "black" + all_events.append(additional_event) + elif event["event_type"] == "transfer_receive": + additional_event = new_event.copy() + additional_event["cname"] = "grey" + all_events.append(additional_event) + else: + pass + + if filename is not None: + with open(filename, "w") as outfile: + json.dump(all_events, outfile) + else: + return all_events + def dump_catapult_trace(self, path, task_info, diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index dfefb3b86..2098b8c8d 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -233,12 +233,12 @@ void ClientConnection::ProcessMessage(const boost::system::error_code &error) read_type_ = static_cast(protocol::MessageType::DisconnectClient); } - uint64_t start_ms = current_time_ms(); + int64_t start_ms = current_time_ms(); message_handler_(shared_ClientConnection_from_this(), read_type_, read_message_.data()); - uint64_t interval = current_time_ms() - start_ms; + int64_t interval = current_time_ms() - start_ms; if (interval > RayConfig::instance().handler_warning_timeout_ms()) { RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_ - << " took " << interval << " ms "; + << " took " << interval << " ms."; } } diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 745270587..d19e81e0b 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -283,6 +283,9 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id, void ObjectManager::PullSendRequest(const ObjectID &object_id, std::shared_ptr &conn) { + // TODO(rkn): This would be a natural place to record a profile event + // indicating that a pull request was sent. 
+ flatbuffers::FlatBufferBuilder fbb; auto message = object_manager_protocol::CreatePullRequestMessage( fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary())); @@ -311,6 +314,46 @@ void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id, } } +void ObjectManager::HandleSendFinished(const ObjectID &object_id, + const ClientID &client_id, uint64_t chunk_index, + double start_time, double end_time, + ray::Status status) { + if (!status.ok()) { + // TODO(rkn): What do we want to do if the send failed? + } + + ProfileEventT profile_event; + profile_event.event_type = "transfer_send"; + profile_event.start_time = start_time; + profile_event.end_time = end_time; + // Encode the object ID, client ID, chunk index, and status as a json list, + // which will be parsed by the reader of the profile table. + profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," + + std::to_string(chunk_index) + ",\"" + status.ToString() + + "\"]"; + profile_events_.push_back(profile_event); +} + +void ObjectManager::HandleReceiveFinished(const ObjectID &object_id, + const ClientID &client_id, uint64_t chunk_index, + double start_time, double end_time, + ray::Status status) { + if (!status.ok()) { + // TODO(rkn): What do we want to do if the receive failed? + } + + ProfileEventT profile_event; + profile_event.event_type = "transfer_receive"; + profile_event.start_time = start_time; + profile_event.end_time = end_time; + // Encode the object ID, client ID, chunk index, and status as a json list, + // which will be parsed by the reader of the profile table. 
+ profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," + + std::to_string(chunk_index) + ",\"" + status.ToString() + + "\"]"; + profile_events_.push_back(profile_event); +} + void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { if (local_objects_.count(object_id) == 0) { // Avoid setting duplicated timer for the same object and client pair. @@ -355,12 +398,21 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { send_service_.post([this, client_id, object_id, data_size, metadata_size, chunk_index, connection_info]() { + double start_time = current_sys_time_seconds(); // NOTE: When this callback executes, it's possible that the object // will have already been evicted. It's also possible that the // object could be in the process of being transferred to this // object manager from another object manager. - ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, - connection_info); + ray::Status status = ExecuteSendObject( + client_id, object_id, data_size, metadata_size, chunk_index, connection_info); + + // Notify the main thread that we have finished sending the chunk. 
+ main_service_->post( + [this, object_id, client_id, chunk_index, start_time, status]() { + double end_time = current_sys_time_seconds(); + HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, + status); + }); }); } } else { @@ -370,10 +422,10 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { } } -void ObjectManager::ExecuteSendObject(const ClientID &client_id, - const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &connection_info) { +ray::Status ObjectManager::ExecuteSendObject( + const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + const RemoteConnectionInfo &connection_info) { RAY_LOG(DEBUG) << "ExecuteSendObject " << client_id << " " << object_id << " " << chunk_index; ray::Status status; @@ -390,6 +442,7 @@ void ObjectManager::ExecuteSendObject(const ClientID &client_id, CheckIOError(status, "Push"); } } + return status; } ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, @@ -709,6 +762,14 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr &con auto pr = flatbuffers::GetRoot(message); ObjectID object_id = ObjectID::from_binary(pr->object_id()->str()); ClientID client_id = ClientID::from_binary(pr->client_id()->str()); + + ProfileEventT profile_event; + profile_event.event_type = "receive_pull_request"; + profile_event.start_time = current_sys_time_seconds(); + profile_event.end_time = profile_event.start_time; + profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"]"; + profile_events_.push_back(profile_event); + Push(object_id, client_id); conn->ProcessMessages(); } @@ -718,20 +779,28 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr &con // Serialize. 
auto object_header = flatbuffers::GetRoot(message); - ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str()); + const ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str()); uint64_t chunk_index = object_header->chunk_index(); uint64_t data_size = object_header->data_size(); uint64_t metadata_size = object_header->metadata_size(); receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() { - ExecuteReceiveObject(conn->GetClientID(), object_id, data_size, metadata_size, - chunk_index, *conn); + double start_time = current_sys_time_seconds(); + const ClientID client_id = conn->GetClientID(); + auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size, + chunk_index, *conn); + // Notify the main thread that we have finished receiving the object. + main_service_->post([this, object_id, client_id, chunk_index, start_time, status]() { + double end_time = current_sys_time_seconds(); + HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, + status); + }); + }); } -void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, - const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - TcpClientConnection &conn) { +ray::Status ObjectManager::ExecuteReceiveObject( + const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, TcpClientConnection &conn) { RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " " << chunk_index; @@ -769,6 +838,8 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, conn.ProcessMessages(); RAY_LOG(DEBUG) << "ReceiveCompleted " << client_id_ << " " << object_id << " " << "/" << config_.max_receives; + + return chunk_status.second; } void ObjectManager::ReceiveFreeRequest(std::shared_ptr &conn, @@ -820,4 +891,18 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector 
&object_ } } +ProfileTableDataT ObjectManager::GetAndResetProfilingInfo() { + ProfileTableDataT profile_info; + profile_info.component_type = "object_manager"; + profile_info.component_id = client_id_.binary(); + + for (auto const &profile_event : profile_events_) { + profile_info.profile_events.emplace_back(new ProfileEventT(profile_event)); + } + + profile_events_.clear(); + + return profile_info; +} + } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 243e29d5b..59b6bda63 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -175,6 +175,12 @@ class ObjectManager : public ObjectManagerInterface { /// or send it to all the object stores. void FreeObjects(const std::vector &object_ids, bool local_only); + /// Return profiling information and reset the profiling information. + /// + /// \return All profiling information that has accumulated since the last call + /// to this method. + ProfileTableDataT GetAndResetProfilingInfo(); + private: friend class TestObjectManager; @@ -252,17 +258,55 @@ class ObjectManager : public ObjectManagerInterface { /// Asynchronously send a pull request via remote object manager connection. /// Executes on main_service_ thread. + /// + /// \param object_id The ID of the object request. + /// \param conn The connection to the remote object manager. + /// \return Void. void PullSendRequest(const ObjectID &object_id, std::shared_ptr &conn); std::shared_ptr CreateSenderConnection( ConnectionPool::ConnectionType type, RemoteConnectionInfo info); + /// This is used to notify the main thread that the sending of a chunk has + /// completed. + /// + /// \param object_id The ID of the object that was sent. + /// \param client_id The ID of the client that the chunk was sent to. + /// \param chunk_index The index of the chunk. + /// \param start_time_us The time when the object manager began sending the + /// chunk. 
+ /// \param end_time_us The time when the object manager finished sending the + /// chunk. + /// \param status The status of the send (e.g., did it succeed or fail). + /// \return Void. + void HandleSendFinished(const ObjectID &object_id, const ClientID &client_id, + uint64_t chunk_index, double start_time_us, double end_time_us, + ray::Status status); + + /// This is used to notify the main thread that the receiving of a chunk has + /// completed. + /// + /// \param object_id The ID of the object that was received. + /// \param client_id The ID of the client that the chunk was received from. + /// \param chunk_index The index of the chunk. + /// \param start_time_us The time when the object manager began receiving the + /// chunk. + /// \param end_time_us The time when the object manager finished receiving the + /// chunk. + /// \param status The status of the receive (e.g., did it succeed or fail). + /// \return Void. + void HandleReceiveFinished(const ObjectID &object_id, const ClientID &client_id, + uint64_t chunk_index, double start_time_us, + double end_time_us, ray::Status status); + /// Begin executing a send. /// Executes on send_service_ thread pool. - void ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &connection_info); + ray::Status ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, + const RemoteConnectionInfo &connection_info); + /// This method synchronously sends the object id and object size /// to the remote object manager. /// Executes on send_service_ thread pool. @@ -280,10 +324,11 @@ class ObjectManager : public ObjectManagerInterface { /// This will invoke the object receive on the receive_service_ thread pool. 
void ReceivePushRequest(std::shared_ptr &conn, const uint8_t *message); + /// Execute a receive on the receive_service_ thread pool. - void ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, TcpClientConnection &conn); + ray::Status ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, TcpClientConnection &conn); /// Handles receiving a pull request message. void ReceivePullRequest(std::shared_ptr &conn, @@ -351,6 +396,10 @@ class ObjectManager : public ObjectManagerInterface { unfulfilled_push_requests_; std::unordered_map pull_requests_; + + /// Profiling events that are to be batched together and added to the profile + /// table in the GCS. + std::vector profile_events_; }; } // namespace ray diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index de44ec855..bc1999307 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -12,7 +12,7 @@ class RayConfig { int64_t ray_protocol_version() const { return ray_protocol_version_; } - uint64_t handler_warning_timeout_ms() const { return handler_warning_timeout_ms_; } + int64_t handler_warning_timeout_ms() const { return handler_warning_timeout_ms_; } int64_t heartbeat_timeout_milliseconds() const { return heartbeat_timeout_milliseconds_; @@ -147,7 +147,7 @@ class RayConfig { /// The duration that a single handler on the event loop can take before a /// warning is logged that the handler is taking too long. - uint64_t handler_warning_timeout_ms_; + int64_t handler_warning_timeout_ms_; /// The duration between heartbeats. These are sent by the plasma manager and /// local scheduler. 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 54145152b..e2d86df75 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -50,6 +50,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, gcs_client_(gcs_client), heartbeat_timer_(io_service), heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), + object_manager_profile_timer_(io_service), local_resources_(config.resource_config), local_available_resources_(config.resource_config), worker_pool_(config.num_initial_workers, config.num_workers_per_process, @@ -174,6 +175,9 @@ ray::Status NodeManager::RegisterGcs() { // Start sending heartbeats to the GCS. last_heartbeat_at_ms_ = current_time_ms(); Heartbeat(); + // Start the timer that gets object manager profiling information and sends it + // to the GCS. + GetObjectManagerProfileInfo(); return ray::Status::OK(); } @@ -280,6 +284,34 @@ void NodeManager::Heartbeat() { }); } +void NodeManager::GetObjectManagerProfileInfo() { + int64_t start_time_ms = current_time_ms(); + + auto profile_info = object_manager_.GetAndResetProfilingInfo(); + + if (profile_info.profile_events.size() > 0) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreateProfileTableData(fbb, &profile_info); + fbb.Finish(message); + auto profile_message = flatbuffers::GetRoot(fbb.GetBufferPointer()); + + RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(*profile_message)); + } + + // Reset the timer. 
+ object_manager_profile_timer_.expires_from_now(heartbeat_period_); + object_manager_profile_timer_.async_wait( + [this](const boost::system::error_code &error) { + RAY_CHECK(!error); + GetObjectManagerProfileInfo(); + }); + + int64_t interval = current_time_ms() - start_time_ms; + if (interval > RayConfig::instance().handler_warning_timeout_ms()) { + RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms."; + } +} + void NodeManager::ClientAdded(const ClientTableDataT &client_data) { const ClientID client_id = ClientID::from_binary(client_data.client_id); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 483f701d3..9e45a8e20 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -104,8 +104,15 @@ class NodeManager { /// \param client_data Data associated with the removed client. /// \return Void. void ClientRemoved(const ClientTableDataT &client_data); + /// Send heartbeats to the GCS. void Heartbeat(); + + /// Get profiling information from the object manager and push it to the GCS. + /// + /// \return Void. + void GetObjectManagerProfileInfo(); + /// Handler for a heartbeat notification from the GCS. /// /// \param client The GCS client. @@ -339,6 +346,9 @@ class NodeManager { boost::asio::steady_timer heartbeat_timer_; /// The period used for the heartbeat timer. std::chrono::milliseconds heartbeat_period_; + /// The timer used to get profiling information from the object manager and + /// push it to the GCS. + boost::asio::steady_timer object_manager_profile_timer_; /// The time that the last heartbeat was sent at. Used to make sure we are /// keeping up with heartbeats. 
uint64_t last_heartbeat_at_ms_; diff --git a/src/ray/util/util.h b/src/ray/util/util.h index f5fc657de..ba34cb733 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -29,6 +29,18 @@ inline int64_t current_sys_time_ms() { return ms_since_epoch.count(); } +inline int64_t current_sys_time_us() { + std::chrono::microseconds mu_since_epoch = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + return mu_since_epoch.count(); +} + +inline double current_sys_time_seconds() { + int64_t microseconds_in_seconds = 1000000; + return static_cast(current_sys_time_us()) / microseconds_in_seconds; +} + inline ray::Status boost_to_ray_status(const boost::system::error_code &error) { switch (error.value()) { case boost::system::errc::success: diff --git a/test/runtest.py b/test/runtest.py index 681575a4b..704d1145a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import os import re import setproctitle @@ -17,6 +18,7 @@ import pytest import ray import ray.ray_constants as ray_constants +import ray.test.cluster_utils import ray.test.test_utils @@ -1037,6 +1039,58 @@ def test_profiling_api(shutdown_only): break +@pytest.fixture() +def ray_start_cluster(): + cluster = ray.test.cluster_utils.Cluster() + yield cluster + + # The code after the yield will run as teardown code. + ray.shutdown() + cluster.shutdown() + + +def test_object_transfer_dump(ray_start_cluster): + cluster = ray_start_cluster + + num_nodes = 3 + for i in range(num_nodes): + cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) + + ray.init(redis_address=cluster.redis_address) + + @ray.remote + def f(x): + return + + # These objects will live on different nodes. + object_ids = [ + f._submit(args=[1], resources={str(i): 1}) for i in range(num_nodes) + ] + + # Broadcast each object from each machine to each other machine. 
+ for object_id in object_ids: + ray.get([ + f._submit(args=[object_id], resources={str(i): 1}) + for i in range(num_nodes) + ]) + + # The profiling information only flushes once every second. + time.sleep(1.1) + + transfer_dump = ray.global_state.chrome_tracing_object_transfer_dump() + # Make sure the transfer dump can be serialized with JSON. + json.loads(json.dumps(transfer_dump)) + assert len(transfer_dump) >= num_nodes**2 + assert len({ + event["pid"] + for event in transfer_dump if event["name"] == "transfer_receive" + }) == num_nodes + assert len({ + event["pid"] + for event in transfer_dump if event["name"] == "transfer_send" + }) == num_nodes + + def test_identical_function_names(shutdown_only): # Define a bunch of remote functions and make sure that we don't # accidentally call an older version.