diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py
index c43ac3400..29991a211 100644
--- a/python/ray/dashboard/dashboard.py
+++ b/python/ray/dashboard/dashboard.py
@@ -72,7 +72,23 @@ def format_reply(reply):
             format_reply(item)
 
 
-class Dashboard:
+def measures_to_dict(measures):
+    """Flatten a list of measure dicts into {last_tag: value}.
+
+    The key is the last comma-separated entry of "tags"; the value is
+    "intValue" if present, else "doubleValue". Other measures are skipped.
+    """
+    measures_dict = {}
+    for measure in measures:
+        tags = measure["tags"].split(",")[-1]
+        if "intValue" in measure:
+            measures_dict[tags] = measure["intValue"]
+        elif "doubleValue" in measure:
+            measures_dict[tags] = measure["doubleValue"]
+    return measures_dict
+
+
+class Dashboard(object):
     """A dashboard process for monitoring Ray nodes.
 
     This dashboard is made up of a REST API which collates data published by
@@ -180,18 +196,43 @@ class Dashboard:
             actor_tree = self.node_stats.get_actor_tree(
                 workers_info, infeasible_tasks)
             for address, data in D.items():
-                available_resources = data["availableResources"]
-                total_resources = data["totalResources"]
-                extra_info = []
-                for resource_name in sorted(available_resources.keys()):
-                    total = total_resources[resource_name]
-                    occupied = total - available_resources[resource_name]
-                    total = format_resource(resource_name, total)
-                    occupied = format_resource(resource_name, occupied)
-                    extra_info.append("{}: {} / {}".format(
-                        resource_name, occupied, total))
-                data["extraInfo"] = ", ".join(extra_info)
+                # process view data
+                measures_dicts = defaultdict(dict)
+                for view_data in data["viewData"]:
+                    view_name = view_data["viewName"]
+                    if view_name in ("local_available_resource",
+                                     "local_total_resource",
+                                     "object_manager_stats"):
+                        measures_dicts[view_name] = measures_to_dict(
+                            view_data["measures"])
+                # process resources info
+                extra_info_strings = []
+                prefix = "ResourceName:"
+                for resource_name, total_resource in sorted(
+                        measures_dicts["local_total_resource"].items()):
+                    available_resource = measures_dicts[
+                        "local_available_resource"].get(resource_name, .0)
+                    resource_name = resource_name[len(prefix):]
+                    extra_info_strings.append("{}: {} / {}".format(
+                        resource_name,
+                        format_resource(resource_name,
+                                        total_resource - available_resource),
+                        format_resource(resource_name, total_resource)))
+                data["extraInfo"] = ", ".join(extra_info_strings) + "\n"
                 if os.environ.get("RAY_DASHBOARD_DEBUG"):
+                    # process object store info
+                    extra_info_strings = []
+                    prefix = "ValueType:"
+                    for stats_name in [
+                            "used_object_store_memory", "num_local_objects"
+                    ]:
+                        stats_value = measures_dicts[
+                            "object_manager_stats"].get(
+                                prefix + stats_name, .0)
+                        extra_info_strings.append("{}: {}".format(
+                            stats_name, stats_value))
+                    data["extraInfo"] += ", ".join(extra_info_strings)
+                    # process actor info
                     actor_tree_str = json.dumps(
                         actor_tree, indent=2, sort_keys=True)
                     lines = actor_tree_str.split("\n")
@@ -202,7 +243,6 @@ class Dashboard:
                                         (max_line_length - len(line)) * " ")
                     data["extraInfo"] += "\n" + "\n".join(to_print)
             D["actorInfo"] = actor_tree
-            D["infeasibleTasks"] = infeasible_tasks
             return await json_response(result=D)
 
         async def logs(req) -> aiohttp.web.Response:
@@ -263,6 +303,21 @@ class NodeStats(threading.Thread):
         self._addr_to_extra_info_dict = {}
         self._node_stats_lock = threading.Lock()
 
+        self._default_info = {
+            "actorId": "",
+            "children": {},
+            "ipAddress": "",
+            "isDirectCall": False,
+            "jobId": "",
+            "numLocalObjects": 0,
+            "numObjectIdsInScope": 0,
+            "port": 0,
+            "state": 0,
+            "taskQueueLength": 0,
+            "usedObjectStoreMemory": 0,
+            "usedResources": {},
+        }
+
         # Mapping from IP address to PID to list of log lines
         self._logs = defaultdict(lambda: defaultdict(list))
 
@@ -323,8 +378,9 @@ class NodeStats(threading.Thread):
         child_to_parent = {}
         with self._node_stats_lock:
             for addr, actor_id in self._addr_to_actor_id.items():
-                flattened_tree[actor_id] = self._addr_to_extra_info_dict[addr]
-                flattened_tree[actor_id]["children"] = {}
+                flattened_tree[actor_id] = copy.deepcopy(self._default_info)
+                flattened_tree[actor_id].update(
+                    self._addr_to_extra_info_dict[addr])
                 parent_id = self._addr_to_actor_id.get(
                     self._addr_to_owner_addr[addr], "root")
                 child_to_parent[actor_id] = parent_id
diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py
index 46c33917d..f3ed3f01a 100644
--- a/python/ray/tests/test_metrics.py
+++ b/python/ray/tests/test_metrics.py
@@ -117,14 +117,15 @@ def test_worker_stats(shutdown_only):
 def test_raylet_info_endpoint(shutdown_only):
     addresses = ray.init(include_webui=True, num_cpus=6)
 
+    @ray.remote
+    def f():
+        return "test"
+
     @ray.remote(num_cpus=1)
     class ActorA:
         def __init__(self):
             pass
 
-        def f(self):
-            return os.getpid()
-
     @ray.remote(resources={"CustomResource": 1})
     class ActorB:
         def __init__(self):
@@ -135,7 +136,15 @@ def test_raylet_info_endpoint(shutdown_only):
         def __init__(self):
             self.children = [ActorA.remote(), ActorB.remote()]
 
-    _ = ActorC.remote()
+        def local_store(self):
+            self.local_storage = [f.remote() for _ in range(10)]
+
+        def remote_store(self):
+            self.remote_storage = ray.put("test")
+
+    c = ActorC.remote()
+    c.local_store.remote()
+    c.remote_store.remote()
 
     start_time = time.time()
     while True:
@@ -148,13 +157,15 @@ def test_raylet_info_endpoint(shutdown_only):
             try:
                 assert len(actor_info) == 1
                 _, parent_actor_info = actor_info.popitem()
+                assert parent_actor_info["numObjectIdsInScope"] == 11
+                assert parent_actor_info["numLocalObjects"] == 10
                 children = parent_actor_info["children"]
                 assert len(children) == 2
                 break
             except AssertionError:
                 if time.time() > start_time + 30:
-                    raise Exception(
-                        "Timed out while waiting for actorInfo to show up.")
+                    raise Exception("Timed out while waiting for actor info "
+                                    "or object store info update.")
         except requests.exceptions.ConnectionError:
             if time.time() > start_time + 30:
                 raise Exception(
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 2375d3ab8..ef72538c3 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -83,6 +83,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
       internal_timer_(io_service_),
       core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */),
       reference_counter_(std::make_shared()),
+      task_queue_length_(0),
       task_execution_service_work_(task_execution_service_),
       task_execution_callback_(task_execution_callback),
       resource_ids_(new ResourceMappingType()),
@@ -880,6 +881,8 @@ Status CoreWorker::AllocateReturnObjects(
 Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
                                const std::shared_ptr &resource_ids,
                                std::vector> *return_objects) {
+  task_queue_length_ -= 1;
+
   if (resource_ids != nullptr) {
     resource_ids_ = resource_ids;
   }
@@ -1026,6 +1029,7 @@ void CoreWorker::HandleAssignTask(const rpc::AssignTaskRequest &request,
                         nullptr);
     return;
   } else {
+    task_queue_length_ += 1;
     task_execution_service_.post([=] {
       raylet_task_receiver_->HandleAssignTask(request, reply, send_reply_callback);
     });
@@ -1040,6 +1044,7 @@ void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request,
     return;
   }
 
+  task_queue_length_ += 1;
   task_execution_service_.post([=] {
     direct_task_receiver_->HandlePushTask(request, reply, send_reply_callback);
   });
@@ -1120,6 +1125,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
   absl::MutexLock lock(&mutex_);
   auto stats = reply->mutable_core_worker_stats();
   stats->set_num_pending_tasks(task_manager_->NumPendingTasks());
+  stats->set_task_queue_length(task_queue_length_);
   stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope());
   if (!current_task_.TaskId().IsNil()) {
     stats->set_current_task_desc(current_task_.DebugString());
@@ -1139,6 +1145,9 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
     (*used_resources_map)[it.first] = quantity;
   }
   stats->set_webui_display(webui_display_);
+  MemoryStoreStats memory_store_stats = memory_store_->GetMemoryStoreStatisticalData();
+  stats->set_num_local_objects(memory_store_stats.num_local_objects);
+  stats->set_used_object_store_memory(memory_store_stats.used_object_store_memory);
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
 
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 34024b73b..115aff752 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -649,6 +649,9 @@ class CoreWorker {
   /// String to be displayed on Web UI.
   std::string webui_display_ GUARDED_BY(mutex_);
 
+  /// Number of tasks that have been pushed to the actor but not executed.
+  std::atomic task_queue_length_;
+
   /// Event loop where tasks are processed.
   boost::asio::io_service task_execution_service_;
 
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
index 78903f7b6..f66da3da9 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
@@ -396,4 +396,16 @@ bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma)
   return false;
 }
 
+MemoryStoreStats CoreWorkerMemoryStore::GetMemoryStoreStatisticalData() {
+  absl::MutexLock lock(&mu_);
+  MemoryStoreStats item;
+  for (const auto &it : objects_) {
+    if (!it.second->IsInPlasmaError()) {
+      item.num_local_objects += 1;
+      item.used_object_store_memory += it.second->GetSize();
+    }
+  }
+  return item;
+}
+
 }  // namespace ray
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h
index 239e1e7ce..fe0bb7bd1 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.h
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h
@@ -12,6 +12,11 @@
 
 namespace ray {
 
+struct MemoryStoreStats {
+  int32_t num_local_objects = 0;
+  int64_t used_object_store_memory = 0;
+};
+
 class GetRequest;
 class CoreWorkerMemoryStore;
 
@@ -117,6 +122,16 @@ class CoreWorkerMemoryStore {
     return objects_.size();
   }
 
+  /// Returns stats data of memory usage.
+  ///
+  /// \return number of local objects and used memory size.
+  MemoryStoreStats GetMemoryStoreStatisticalData();
+
+  /// Returns the memory usage of this store.
+  ///
+  /// \return Total size of objects in the store.
+  uint64_t UsedMemory();
+
  private:
   /// Optional callback for putting objects into the plasma store.
   std::function store_in_plasma_;
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc
index 31e9ce69d..1bfedaa7f 100644
--- a/src/ray/object_manager/object_manager.cc
+++ b/src/ray/object_manager/object_manager.cc
@@ -837,6 +837,13 @@ std::string ObjectManager::DebugString() const {
 }
 
 void ObjectManager::RecordMetrics() const {
+  int64_t used_memory = 0;
+  for (const auto &it : local_objects_) {
+    const auto &object_info = it.second.object_info;
+    used_memory += object_info.data_size + object_info.metadata_size;
+  }
+  stats::ObjectManagerStats().Record(used_memory,
+                                     {{stats::ValueTypeKey, "used_object_store_memory"}});
   stats::ObjectManagerStats().Record(local_objects_.size(),
                                      {{stats::ValueTypeKey, "num_local_objects"}});
   stats::ObjectManagerStats().Record(active_wait_requests_.size(),
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index a5cefa6f8..837ae448b 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -162,6 +162,30 @@ message ResourceMapEntry {
   repeated ResourceId resource_ids = 2;
 }
 
+message ViewData {
+  message Measure {
+    // A short string that describes the tags for this measure, e.g.,
+    // "Tag1:Value1,Tag2:Value2,Tag3:Value3"
+    string tags = 1;
+    // Int64 type value (if present).
+    int64 int_value = 2;
+    // Double type value (if present).
+    double double_value = 3;
+    // Distribution type value (if present).
+    double distribution_min = 4;
+    double distribution_mean = 5;
+    double distribution_max = 6;
+    double distribution_count = 7;
+    repeated double distribution_bucket_boundaries = 8;
+    repeated double distribution_bucket_counts = 9;
+  }
+
+  // The name of this Census view.
+  string view_name = 1;
+  // The list of measures recorded under this view.
+  repeated Measure measures = 2;
+}
+
 // Debug info returned from the core worker.
 message CoreWorkerStats {
   // Debug string of the currently executing task.
@@ -182,4 +206,10 @@ message CoreWorkerStats {
   map used_resources = 9;
   // A string displayed on Dashboard.
   string webui_display = 10;
+  // Number of objects stored in local memory.
+  int32 num_local_objects = 11;
+  // Used local object store memory.
+  int64 used_object_store_memory = 12;
+  // Length of the task queue.
+  int32 task_queue_length = 13;
 }
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index 6ef1b8d7b..e98cfd8a7 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -57,37 +57,13 @@ message WorkerStats {
   CoreWorkerStats core_worker_stats = 3;
 }
 
-message ViewData {
-  message Measure {
-    // A short string that describes the tags for this mesaure, e.g.,
-    // "Tag1:Value1,Tag2:Value2,Tag3:Value3"
-    string tags = 1;
-    // Int64 type value (if present).
-    int64 int_value = 2;
-    // Double type value (if present).
-    double double_value = 3;
-    // Distribution type value (if present).
-    double distribution_min = 4;
-    double distribution_mean = 5;
-    double distribution_max = 6;
-    double distribution_count = 7;
-    repeated double distribution_bucket_boundaries = 8;
-    repeated double distribution_bucket_counts = 9;
-  }
-
-  // The name of this Census view.
-  string view_name = 1;
-  // The list of measures recorded under this view.
-  repeated Measure measures = 2;
-}
-
 message GetNodeStatsReply {
   repeated WorkerStats workers_stats = 1;
   repeated ViewData view_data = 2;
-  map available_resources = 3;
-  map total_resources = 4;
+  // Tags 3 and 4 belonged to the removed resource maps; do not reuse them.
+  reserved 3, 4;
   uint32 num_workers = 5;
   repeated TaskSpec infeasible_tasks = 6;
 }
 
 // Service for inter-node-manager communication.
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 9ddf9c4c0..c9adfbdba 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2997,23 +2997,6 @@ void NodeManager::HandleNodeStatsRequest(const rpc::GetNodeStatsRequest &request
     worker_stats->set_pid(driver->Pid());
     worker_stats->set_is_driver(true);
   }
-  // Record available resources of this node.
-  const auto &available_resources =
-      cluster_resource_map_.at(self_node_id_).GetAvailableResources().GetResourceMap();
-  // Record total resources of this node.
-  const auto &total_resources =
-      cluster_resource_map_.at(self_node_id_).GetTotalResources().GetResourceMap();
-  auto available_resources_map = reply->mutable_available_resources();
-  auto total_resources_map = reply->mutable_total_resources();
-  for (const auto &pair : total_resources) {
-    (*total_resources_map)[pair.first] = pair.second;
-    auto it = available_resources.find(pair.first);
-    if (it != available_resources.end()) {
-      (*available_resources_map)[pair.first] = it->second;
-    } else {
-      (*available_resources_map)[pair.first] = 0.0;
-    }
-  }
   for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
     auto infeasible_task = reply->add_infeasible_tasks();
     infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize());