diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 3452464c1..942f1849c 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -121,6 +121,7 @@ from ray.worker import ( register_custom_serializer, remote, shutdown, + show_in_webui, wait, ) # noqa: E402 import ray.internal # noqa: E402 @@ -169,6 +170,7 @@ __all__ = [ "register_custom_serializer", "remote", "shutdown", + "show_in_webui", "wait", ] diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 12123dc0f..a82a66b51 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -805,6 +805,9 @@ cdef class CoreWorker: def get_actor_id(self): return ActorID(self.core_worker.get().GetActorId().Binary()) + def set_webui_display(self, message): + self.core_worker.get().SetWebuiDisplay(message) + def get_objects(self, object_ids, TaskID current_task_id, int64_t timeout_ms=-1): cdef: diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index a345e7748..b15f2daa6 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -388,7 +388,8 @@ class RayletStats(threading.Thread): for node in self.nodes: node_id = node["NodeID"] stub = self.stubs[node_id] - reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest()) + reply = stub.GetNodeStats( + node_manager_pb2.NodeStatsRequest(), timeout=2) replies[node["NodeManagerAddress"]] = reply with self._raylet_stats_lock: for address, reply in replies.items(): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 1e8abf08b..134755118 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -113,6 +113,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CJobID GetCurrentJobId() CTaskID GetCurrentTaskId() const CActorID &GetActorId() + void SetWebuiDisplay(const c_string &message) CTaskID GetCallerId() const ResourceMappingType &GetResourceIDs() const CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 5977b16f1..7496ed749 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -21,12 +21,64 @@ def test_worker_stats(ray_start_regular): channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) - reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest()) + + def try_get_node_stats(num_retry=5, timeout=2): + reply = None + for _ in range(num_retry): + try: + reply = stub.GetNodeStats( + node_manager_pb2.NodeStatsRequest(), timeout=timeout) + break + except grpc.RpcError: + continue + assert reply is not None + return reply + + reply = try_get_node_stats() # Check that there is one connected driver. drivers = [worker for worker in reply.workers_stats if worker.is_driver] assert len(drivers) == 1 assert os.getpid() == drivers[0].pid + @ray.remote + def f(): + ray.show_in_webui("test") + return os.getpid() + + @ray.remote + class Actor(object): + def __init__(self): + pass + + def f(self): + ray.show_in_webui("test") + return os.getpid() + + # Test show_in_webui for remote functions. + worker_pid = ray.get(f.remote()) + reply = try_get_node_stats() + target_worker_present = False + for worker in reply.workers_stats: + if worker.webui_display == "test": + target_worker_present = True + assert worker.pid == worker_pid + else: + assert worker.webui_display == "" + assert target_worker_present + + # Test show_in_webui for remote actors. + a = Actor.remote() + worker_pid = ray.get(a.f.remote()) + reply = try_get_node_stats() + target_worker_present = False + for worker in reply.workers_stats: + if worker.webui_display == "test": + target_worker_present = True + assert worker.pid == worker_pid + else: + assert worker.webui_display == "" + assert target_worker_present + timeout_seconds = 20 start_time = time.time() while True: @@ -37,7 +89,7 @@ def test_worker_stats(ray_start_regular): # Wait for the workers to start. if len(reply.workers_stats) < num_cpus + 1: time.sleep(1) - reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest()) + reply = try_get_node_stats() continue # Check that the rest of the processes are workers, 1 for each CPU. diff --git a/python/ray/worker.py b/python/ray/worker.py index b0fa9b43c..2fba5496e 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1412,6 +1412,21 @@ def register_custom_serializer(cls, class_id=class_id) +def show_in_webui(message): + """Display message in dashboard. + + Display message for the current task or actor in the dashboard. + For example, this can be used to display the status of a long-running + computation. + + Args: + message (str): Message to be displayed. + """ + worker = global_worker + worker.check_connected() + worker.core_worker.set_webui_display(message.encode()) + + def get(object_ids, timeout=None): """Get a remote object or a list of remote objects from the object store. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e5c31e60d..2ae41f399 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1032,6 +1032,13 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques } } +void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, + rpc::GetCoreWorkerStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + reply->set_webui_display(webui_display_); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void CoreWorker::YieldCurrentFiber(FiberEvent &event) { RAY_CHECK(worker_context_.CurrentActorIsAsync()); boost::this_fiber::yield(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a9a47c534..03d4b9abd 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -32,7 +32,8 @@ RAY_CORE_WORKER_RPC_HANDLER(AssignTask, 5) \ RAY_CORE_WORKER_RPC_HANDLER(PushTask, 9999) \ RAY_CORE_WORKER_RPC_HANDLER(DirectActorCallArgWaitComplete, 100) \ - RAY_CORE_WORKER_RPC_HANDLER(GetObjectStatus, 9999) + RAY_CORE_WORKER_RPC_HANDLER(GetObjectStatus, 9999) \ + RAY_CORE_WORKER_RPC_HANDLER(GetCoreWorkerStats, 100) namespace ray { @@ -101,6 +102,9 @@ class CoreWorker { actor_id_ = actor_id; } + void SetWebuiDisplay(const std::string &message) { webui_display_ = message; } + + /// Increase the reference count for this object ID. /// Increase the local reference count for this object ID. Should be called /// by the language frontend when a new reference is created. /// @@ -402,6 +406,11 @@ class CoreWorker { rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback); + /// Get statistics from core worker. + void HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, + rpc::GetCoreWorkerStatsReply *reply, + rpc::SendReplyCallback send_reply_callback); + /// /// Public methods related to async actor call. This should only be used when /// the actor is (1) direct actor and (2) using asyncio mode. @@ -626,6 +635,9 @@ class CoreWorker { /// Our actor ID. If this is nil, then we execute only stateless tasks. ActorID actor_id_; + /// String to be displayed on Web UI. + std::string webui_display_; + /// Event loop where tasks are processed. boost::asio::io_service task_execution_service_; diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 593f5cdfb..3a9c5dc68 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -117,6 +117,16 @@ message GetObjectStatusReply { ObjectStatus status = 1; } +message GetCoreWorkerStatsRequest { + // The ID of the worker this message is intended for. + bytes intended_worker_id = 1; +} + +message GetCoreWorkerStatsReply { + // String displayed on Web UI. + string webui_display = 1; +} + service CoreWorkerService { // Push a task to a worker from the raylet. rpc AssignTask(AssignTaskRequest) returns (AssignTaskReply); @@ -127,4 +137,6 @@ service CoreWorkerService { returns (DirectActorCallArgWaitCompleteReply); // Ask the object's owner about the object's current status. rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply); + // Get metrics from core workers. + rpc GetCoreWorkerStats(GetCoreWorkerStatsRequest) returns (GetCoreWorkerStatsReply); } diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index e7a5987c9..955c0799d 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -53,6 +53,8 @@ message WorkerStats { uint32 pid = 1; // Whether this is a driver. bool is_driver = 2; + // String displayed on Web UI. + string webui_display = 3; } message ViewData { @@ -84,6 +86,7 @@ message NodeStatsReply { repeated ViewData view_data = 2; map available_resources = 3; map total_resources = 4; + uint32 num_workers = 5; } // Service for inter-node-manager communication. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8d076b49a..0a9d492ff 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2974,11 +2974,6 @@ std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view, void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, rpc::NodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { - for (const auto &worker : worker_pool_.GetAllWorkers()) { - auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->Pid()); - worker_stats->set_is_driver(false); - } for (const auto &driver : worker_pool_.GetAllDrivers()) { auto worker_stats = reply->add_workers_stats(); worker_stats->set_pid(driver->Pid()); @@ -3039,7 +3034,38 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, } } } - send_reply_callback(Status::OK(), nullptr, nullptr); + // As a result of the HandleNodeStatsRequest, we are collecting information from all + // workers on this node. This is done by calling GetCoreWorkerStats on each worker. In + // order to send up-to-date information back, we wait until all workers have replied, + // and return the information from HandleNodesStatsRequest. The caller of + // HandleNodeStatsRequest should set a timeout so that the rpc finishes even if not all + // workers have replied. + auto all_workers = worker_pool_.GetAllWorkers(); + for (const auto &worker : all_workers) { + rpc::GetCoreWorkerStatsRequest request; + request.set_intended_worker_id(worker->WorkerId().Binary()); + auto status = worker->rpc_client()->GetCoreWorkerStats( + request, [reply, worker, all_workers, send_reply_callback]( + const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Failed to send get core worker stats request: " + << status.ToString(); + } else { + auto worker_stats = reply->add_workers_stats(); + worker_stats->set_pid(worker->Pid()); + worker_stats->set_is_driver(false); + reply->set_num_workers(reply->num_workers() + 1); + worker_stats->set_webui_display(r.webui_display()); + if (reply->num_workers() == all_workers.size()) { + send_reply_callback(Status::OK(), nullptr, nullptr); + } + } + }); + if (!status.ok()) { + RAY_LOG(WARNING) << "Failed to send get core worker stats request: " + << status.ToString(); + } + } } void NodeManager::RecordMetrics() { diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index ac186f990..de2caa981 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -68,6 +68,8 @@ class Worker { void DirectActorCallArgWaitComplete(int64_t tag); void WorkerLeaseGranted(const std::string &address, int port); + rpc::CoreWorkerClient *rpc_client() { return rpc_client_.get(); } + private: /// The worker's ID. WorkerID worker_id_; diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index b0a5a2472..61262781d 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -121,6 +121,12 @@ class CoreWorkerClientInterface { return Status::NotImplemented(""); } + virtual ray::Status GetCoreWorkerStats( + const GetCoreWorkerStatsRequest &request, + const ClientCallback &callback) { + return Status::NotImplemented(""); + } + virtual ~CoreWorkerClientInterface(){}; }; @@ -197,6 +203,17 @@ class CoreWorkerClient : public std::enable_shared_from_this, return call->GetStatus(); } + virtual ray::Status GetCoreWorkerStats( + const GetCoreWorkerStatsRequest &request, + const ClientCallback &callback) override { + auto call = + client_call_manager_.CreateCall( + *stub_, &CoreWorkerService::Stub::PrepareAsyncGetCoreWorkerStats, request, + callback); + return call->GetStatus(); + } + /// Send as many pending tasks as possible. This method is thread-safe. /// /// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being