diff --git a/dashboard/actor_utils.py b/dashboard/actor_utils.py new file mode 100644 index 000000000..739dc13a7 --- /dev/null +++ b/dashboard/actor_utils.py @@ -0,0 +1,68 @@ +import time +import re +from collections import defaultdict + +PYCLASSNAME_RE = re.compile(r"(.+?)\(") + + +def construct_actor_groups(actors): + """actors is a dict from actor id to an actor or an + actor creation task The shared fields currently are + "actorClass", "actorId", and "state" """ + actor_groups = _group_actors_by_python_class(actors) + stats_by_group = { + name: _get_actor_group_stats(group) + for name, group in actor_groups.items() + } + + summarized_actor_groups = {} + for name, group in actor_groups.items(): + summarized_actor_groups[name] = { + "entries": group, + "summary": stats_by_group[name] + } + return summarized_actor_groups + + +def actor_classname_from_task_spec(task_spec): + return task_spec.get("functionDescriptor", {})\ + .get("pythonFunctionDescriptor", {})\ + .get("className", "Unknown actor class") + + +def _group_actors_by_python_class(actors): + groups = defaultdict(list) + for actor in actors.values(): + actor_class = actor["actorClass"] + groups[actor_class].append(actor) + return dict(groups) + + +def _get_actor_group_stats(group): + state_to_count = defaultdict(lambda: 0) + executed_tasks = 0 + min_timestamp = None + num_timestamps = 0 + sum_timestamps = 0 + now = time.time() * 1000 # convert S -> MS + for actor in group: + state_to_count[actor["state"]] += 1 + if "timestamp" in actor: + if not min_timestamp or actor["timestamp"] < min_timestamp: + min_timestamp = actor["timestamp"] + num_timestamps += 1 + sum_timestamps += now - actor["timestamp"] + if "numExecutedTasks" in actor: + executed_tasks += actor["numExecutedTasks"] + if num_timestamps > 0: + avg_lifetime = int((sum_timestamps / num_timestamps) / 1000) + max_lifetime = int((now - min_timestamp) / 1000) + else: + avg_lifetime = 0 + max_lifetime = 0 + return { + "stateToCount": state_to_count, + "avgLifetime": avg_lifetime, + "maxLifetime": max_lifetime, + "numExecutedTasks": executed_tasks, + } diff --git a/dashboard/memory_utils.py b/dashboard/memory_utils.py new file mode 100644 index 000000000..544aabce4 --- /dev/null +++ b/dashboard/memory_utils.py @@ -0,0 +1,302 @@ +import base64 + +from collections import defaultdict +from enum import Enum +from typing import List + +import ray + +from ray._raylet import (TaskID, ActorID, JobID) +import logging + +logger = logging.getLogger(__name__) + +# These values are used to calculate if objectRefs are actor handles. +TASKID_BYTES_SIZE = TaskID.size() +ACTORID_BYTES_SIZE = ActorID.size() +JOBID_BYTES_SIZE = JobID.size() +# We need to multiply 2 because we need bits size instead of bytes size. +TASKID_RANDOM_BITS_SIZE = (TASKID_BYTES_SIZE - ACTORID_BYTES_SIZE) * 2 +ACTORID_RANDOM_BITS_SIZE = (ACTORID_BYTES_SIZE - JOBID_BYTES_SIZE) * 2 + + +def decode_object_ref_if_needed(object_ref: str) -> bytes: + """Decode objectRef bytes string. + + gRPC reply contains an objectRef that is encodded by Base64. + This function is used to decode the objectRef. + Note that there are times that objectRef is already decoded as + a hex string. In this case, just convert it to a binary number. + """ + if object_ref.endswith("="): + # If the object ref ends with =, that means it is base64 encoded. + # Object refs will always have = as a padding + # when it is base64 encoded because objectRef is always 20B. + return base64.standard_b64decode(object_ref) + else: + return ray.utils.hex_to_binary(object_ref) + + +class SortingType(Enum): + PID = 1 + OBJECT_SIZE = 3 + REFERENCE_TYPE = 4 + + +class GroupByType(Enum): + NODE_ADDRESS = "node" + STACK_TRACE = "stack_trace" + + +class ReferenceType: + # We don't use enum because enum is not json serializable. + ACTOR_HANDLE = "ACTOR_HANDLE" + PINNED_IN_MEMORY = "PINNED_IN_MEMORY" + LOCAL_REFERENCE = "LOCAL_REFERENCE" + USED_BY_PENDING_TASK = "USED_BY_PENDING_TASK" + CAPTURED_IN_OBJECT = "CAPTURED_IN_OBJECT" + UNKNOWN_STATUS = "UNKNOWN_STATUS" + + +class MemoryTableEntry: + def __init__(self, *, object_ref: dict, node_address: str, is_driver: bool, + pid: int): + # worker info + self.is_driver = is_driver + self.pid = pid + self.node_address = node_address + + # object info + self.object_size = int(object_ref.get("objectSize", -1)) + self.call_site = object_ref.get("callSite", "") + self.object_ref = ray.ObjectRef( + decode_object_ref_if_needed(object_ref["objectId"])) + + # reference info + self.local_ref_count = int(object_ref.get("localRefCount", 0)) + self.pinned_in_memory = bool(object_ref.get("pinnedInMemory", False)) + self.submitted_task_ref_count = int( + object_ref.get("submittedTaskRefCount", 0)) + self.contained_in_owned = [ + ray.ObjectRef(decode_object_ref_if_needed(object_ref)) + for object_ref in object_ref.get("containedInOwned", []) + ] + self.reference_type = self._get_reference_type() + + def is_valid(self) -> bool: + # If the entry doesn't have a reference type or some invalid state, + # (e.g., no object ref presented), it is considered invalid. + if (not self.pinned_in_memory and self.local_ref_count == 0 + and self.submitted_task_ref_count == 0 + and len(self.contained_in_owned) == 0): + return False + elif self.object_ref.is_nil(): + return False + else: + return True + + def group_key(self, group_by_type: GroupByType) -> str: + if group_by_type == GroupByType.NODE_ADDRESS: + return self.node_address + elif group_by_type == GroupByType.STACK_TRACE: + return self.call_site + else: + raise ValueError(f"group by type {group_by_type} is invalid.") + + def _get_reference_type(self) -> str: + if self._is_object_ref_actor_handle(): + return ReferenceType.ACTOR_HANDLE + if self.pinned_in_memory: + return ReferenceType.PINNED_IN_MEMORY + elif self.submitted_task_ref_count > 0: + return ReferenceType.USED_BY_PENDING_TASK + elif self.local_ref_count > 0: + return ReferenceType.LOCAL_REFERENCE + elif len(self.contained_in_owned) > 0: + return ReferenceType.CAPTURED_IN_OBJECT + else: + return ReferenceType.UNKNOWN_STATUS + + def _is_object_ref_actor_handle(self) -> bool: + object_ref_hex = self.object_ref.hex() + + # random (8B) | ActorID(6B) | flag (2B) | index (6B) + # ActorID(6B) == ActorRandomByte(4B) + JobID(2B) + # If random bytes are all 'f', but ActorRandomBytes + # are not all 'f', that means it is an actor creation + # task, which is an actor handle. + random_bits = object_ref_hex[:TASKID_RANDOM_BITS_SIZE] + actor_random_bits = object_ref_hex[TASKID_RANDOM_BITS_SIZE: + TASKID_RANDOM_BITS_SIZE + + ACTORID_RANDOM_BITS_SIZE] + if (random_bits == "f" * 16 and not actor_random_bits == "f" * 8): + return True + else: + return False + + def as_dict(self): + return { + "object_ref": self.object_ref.hex(), + "pid": self.pid, + "node_ip_address": self.node_address, + "object_size": self.object_size, + "reference_type": self.reference_type, + "call_site": self.call_site, + "local_ref_count": self.local_ref_count, + "pinned_in_memory": self.pinned_in_memory, + "submitted_task_ref_count": self.submitted_task_ref_count, + "contained_in_owned": [ + object_ref.hex() for object_ref in self.contained_in_owned + ], + "type": "Driver" if self.is_driver else "Worker" + } + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return str(self.as_dict()) + + +class MemoryTable: + def __init__(self, + entries: List[MemoryTableEntry], + group_by_type: GroupByType = GroupByType.NODE_ADDRESS, + sort_by_type: SortingType = SortingType.PID): + self.table = entries + # Group is a list of memory tables grouped by a group key. + self.group = {} + self.summary = defaultdict(int) + # NOTE YOU MUST SORT TABLE BEFORE GROUPING. + # self._group_by(..)._sort_by(..) != self._sort_by(..)._group_by(..) + if group_by_type and sort_by_type: + self.setup(group_by_type, sort_by_type) + elif group_by_type: + self._group_by(group_by_type) + elif sort_by_type: + self._sort_by(sort_by_type) + + def setup(self, group_by_type: GroupByType, sort_by_type: SortingType): + """Setup memory table. + + This will sort entries first and group them after. + Sort order will be still kept. + """ + self._sort_by(sort_by_type)._group_by(group_by_type) + for group_memory_table in self.group.values(): + group_memory_table.summarize() + self.summarize() + return self + + def insert_entry(self, entry: MemoryTableEntry): + self.table.append(entry) + + def summarize(self): + # Reset summary. + total_object_size = 0 + total_local_ref_count = 0 + total_pinned_in_memory = 0 + total_used_by_pending_task = 0 + total_captured_in_objects = 0 + total_actor_handles = 0 + + for entry in self.table: + if entry.object_size > 0: + total_object_size += entry.object_size + if entry.reference_type == ReferenceType.LOCAL_REFERENCE: + total_local_ref_count += 1 + elif entry.reference_type == ReferenceType.PINNED_IN_MEMORY: + total_pinned_in_memory += 1 + elif entry.reference_type == ReferenceType.USED_BY_PENDING_TASK: + total_used_by_pending_task += 1 + elif entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT: + total_captured_in_objects += 1 + elif entry.reference_type == ReferenceType.ACTOR_HANDLE: + total_actor_handles += 1 + + self.summary = { + "total_object_size": total_object_size, + "total_local_ref_count": total_local_ref_count, + "total_pinned_in_memory": total_pinned_in_memory, + "total_used_by_pending_task": total_used_by_pending_task, + "total_captured_in_objects": total_captured_in_objects, + "total_actor_handles": total_actor_handles + } + return self + + def _sort_by(self, sorting_type: SortingType): + if sorting_type == SortingType.PID: + self.table.sort(key=lambda entry: entry.pid) + elif sorting_type == SortingType.OBJECT_SIZE: + self.table.sort(key=lambda entry: entry.object_size) + elif sorting_type == SortingType.REFERENCE_TYPE: + self.table.sort(key=lambda entry: entry.reference_type) + else: + raise ValueError(f"Give sorting type: {sorting_type} is invalid.") + return self + + def _group_by(self, group_by_type: GroupByType): + """Group entries and summarize the result. + + NOTE: Each group is another MemoryTable. + """ + # Reset group + self.group = {} + + # Build entries per group. + group = defaultdict(list) + for entry in self.table: + group[entry.group_key(group_by_type)].append(entry) + + # Build a group table. + for group_key, entries in group.items(): + self.group[group_key] = MemoryTable( + entries, group_by_type=None, sort_by_type=None) + for group_key, group_memory_table in self.group.items(): + group_memory_table.summarize() + return self + + def as_dict(self): + return { + "summary": self.summary, + "group": { + group_key: { + "entries": group_memory_table.get_entries(), + "summary": group_memory_table.summary + } + for group_key, group_memory_table in self.group.items() + } + } + + def get_entries(self) -> List[dict]: + return [entry.__dict__() for entry in self.table] + + def __repr__(self): + return str(self.__dict__()) + + def __str__(self): + return self.__repr__() + + +def construct_memory_table(workers_info: List, + group_by: GroupByType = GroupByType.NODE_ADDRESS, + sort_by=SortingType.OBJECT_SIZE) -> MemoryTable: + memory_table_entries = [] + for worker_info in workers_info: + pid = worker_info["pid"] + is_driver = worker_info.get("isDriver", False) + core_worker_stats = worker_info["coreWorkerStats"] + node_address = core_worker_stats["ipAddress"] + object_refs = core_worker_stats.get("objectRefs", []) + + for object_ref in object_refs: + memory_table_entry = MemoryTableEntry( + object_ref=object_ref, + node_address=node_address, + is_driver=is_driver, + pid=pid) + if memory_table_entry.is_valid(): + memory_table_entries.append(memory_table_entry) + memory_table = MemoryTable( + memory_table_entries, group_by_type=group_by, sort_by_type=sort_by) + return memory_table diff --git a/dashboard/tests/test_memory_utils.py b/dashboard/tests/test_memory_utils.py new file mode 100644 index 000000000..f58ecd8ae --- /dev/null +++ b/dashboard/tests/test_memory_utils.py @@ -0,0 +1,252 @@ +import ray +from ray.new_dashboard.memory_utils import ( + ReferenceType, decode_object_ref_if_needed, MemoryTableEntry, MemoryTable, + SortingType) +"""Memory Table Unit Test""" + +NODE_ADDRESS = "127.0.0.1" +IS_DRIVER = True +PID = 1 +OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA=" +ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000" +DECODED_ID = decode_object_ref_if_needed(OBJECT_ID) +OBJECT_SIZE = 100 + + +def build_memory_entry(*, + local_ref_count, + pinned_in_memory, + submitted_task_reference_count, + contained_in_owned, + object_size, + pid, + object_id=OBJECT_ID, + node_address=NODE_ADDRESS): + object_ref = { + "objectId": object_id, + "callSite": "(task call) /Users:458", + "objectSize": object_size, + "localRefCount": local_ref_count, + "pinnedInMemory": pinned_in_memory, + "submittedTaskRefCount": submitted_task_reference_count, + "containedInOwned": contained_in_owned + } + return MemoryTableEntry( + object_ref=object_ref, + node_address=node_address, + is_driver=IS_DRIVER, + pid=pid) + + +def build_local_reference_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS): + return build_memory_entry( + local_ref_count=1, + pinned_in_memory=False, + submitted_task_reference_count=0, + contained_in_owned=[], + object_size=object_size, + pid=pid, + node_address=node_address) + + +def build_used_by_pending_task_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS): + return build_memory_entry( + local_ref_count=0, + pinned_in_memory=False, + submitted_task_reference_count=2, + contained_in_owned=[], + object_size=object_size, + pid=pid, + node_address=node_address) + + +def build_captured_in_object_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS): + return build_memory_entry( + local_ref_count=0, + pinned_in_memory=False, + submitted_task_reference_count=0, + contained_in_owned=[OBJECT_ID], + object_size=object_size, + pid=pid, + node_address=node_address) + + +def build_actor_handle_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS): + return build_memory_entry( + local_ref_count=1, + pinned_in_memory=False, + submitted_task_reference_count=0, + contained_in_owned=[], + object_size=object_size, + pid=pid, + node_address=node_address, + object_id=ACTOR_ID) + + +def build_pinned_in_memory_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS): + return build_memory_entry( + local_ref_count=0, + pinned_in_memory=True, + submitted_task_reference_count=0, + contained_in_owned=[], + object_size=object_size, + pid=pid, + node_address=node_address) + + +def build_entry(object_size=OBJECT_SIZE, + pid=PID, + node_address=NODE_ADDRESS, + reference_type=ReferenceType.PINNED_IN_MEMORY): + if reference_type == ReferenceType.USED_BY_PENDING_TASK: + return build_used_by_pending_task_entry( + pid=pid, object_size=object_size, node_address=node_address) + elif reference_type == ReferenceType.LOCAL_REFERENCE: + return build_local_reference_entry( + pid=pid, object_size=object_size, node_address=node_address) + elif reference_type == ReferenceType.PINNED_IN_MEMORY: + return build_pinned_in_memory_entry( + pid=pid, object_size=object_size, node_address=node_address) + elif reference_type == ReferenceType.ACTOR_HANDLE: + return build_actor_handle_entry( + pid=pid, object_size=object_size, node_address=node_address) + elif reference_type == ReferenceType.CAPTURED_IN_OBJECT: + return build_captured_in_object_entry( + pid=pid, object_size=object_size, node_address=node_address) + + +def test_invalid_memory_entry(): + memory_entry = build_memory_entry( + local_ref_count=0, + pinned_in_memory=False, + submitted_task_reference_count=0, + contained_in_owned=[], + object_size=OBJECT_SIZE, + pid=PID) + assert memory_entry.is_valid() is False + memory_entry = build_memory_entry( + local_ref_count=0, + pinned_in_memory=False, + submitted_task_reference_count=0, + contained_in_owned=[], + object_size=-1, + pid=PID) + assert memory_entry.is_valid() is False + + +def test_valid_reference_memory_entry(): + memory_entry = build_local_reference_entry() + assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE + assert memory_entry.object_ref == ray.ObjectRef( + decode_object_ref_if_needed(OBJECT_ID)) + assert memory_entry.is_valid() is True + + +def test_reference_type(): + # pinned in memory + memory_entry = build_pinned_in_memory_entry() + assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY + + # used by pending task + memory_entry = build_used_by_pending_task_entry() + assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK + + # captued in object + memory_entry = build_captured_in_object_entry() + assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT + + # actor handle + memory_entry = build_actor_handle_entry() + assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE + + +def test_memory_table_summary(): + entries = [ + build_pinned_in_memory_entry(), + build_used_by_pending_task_entry(), + build_captured_in_object_entry(), + build_actor_handle_entry(), + build_local_reference_entry(), + build_local_reference_entry() + ] + memory_table = MemoryTable(entries) + assert len(memory_table.group) == 1 + assert memory_table.summary["total_actor_handles"] == 1 + assert memory_table.summary["total_captured_in_objects"] == 1 + assert memory_table.summary["total_local_ref_count"] == 2 + assert memory_table.summary[ + "total_object_size"] == len(entries) * OBJECT_SIZE + assert memory_table.summary["total_pinned_in_memory"] == 1 + assert memory_table.summary["total_used_by_pending_task"] == 1 + + +def test_memory_table_sort_by_pid(): + unsort = [1, 3, 2] + entries = [build_entry(pid=pid) for pid in unsort] + memory_table = MemoryTable(entries, sort_by_type=SortingType.PID) + sort = sorted(unsort) + for pid, entry in zip(sort, memory_table.table): + assert pid == entry.pid + + +def test_memory_table_sort_by_reference_type(): + unsort = [ + ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE, + ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY + ] + entries = [ + build_entry(reference_type=reference_type) for reference_type in unsort + ] + memory_table = MemoryTable( + entries, sort_by_type=SortingType.REFERENCE_TYPE) + sort = sorted(unsort) + for reference_type, entry in zip(sort, memory_table.table): + assert reference_type == entry.reference_type + + +def test_memory_table_sort_by_object_size(): + unsort = [312, 214, -1, 1244, 642] + entries = [build_entry(object_size=object_size) for object_size in unsort] + memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE) + sort = sorted(unsort) + for object_size, entry in zip(sort, memory_table.table): + assert object_size == entry.object_size + + +def test_group_by(): + node_second = "127.0.0.2" + node_first = "127.0.0.1" + entries = [ + build_entry(node_address=node_second, pid=2), + build_entry(node_address=node_second, pid=1), + build_entry(node_address=node_first, pid=2), + build_entry(node_address=node_first, pid=1) + ] + memory_table = MemoryTable(entries) + + # Make sure it is correctly grouped + assert node_first in memory_table.group + assert node_second in memory_table.group + + # make sure pid is sorted in the right order. + for group_key, group_memory_table in memory_table.group.items(): + pid = 1 + for entry in group_memory_table.table: + assert pid == entry.pid + pid += 1 + + +if __name__ == "__main__": + import sys + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/utils.py b/dashboard/utils.py index 9fd170890..c4f8010dc 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -15,7 +15,7 @@ from base64 import b64decode from collections.abc import MutableMapping, Mapping from collections import namedtuple from typing import Any - +import os import aioredis import aiohttp.web from aiohttp import hdrs @@ -218,13 +218,21 @@ class CustomEncoder(json.JSONEncoder): async def rest_response(success, message, **kwargs) -> aiohttp.web.Response: + # In the dev context we allow a dev server running on a + # different port to consume the API, meaning we need to allow + # cross-origin access + if os.environ.get("RAY_DASHBOARD_DEV") == "1": + headers = {"Access-Control-Allow-Origin": "*"} + else: + headers = {} return aiohttp.web.json_response( { "result": success, "msg": message, "data": to_google_style(kwargs) }, - dumps=functools.partial(json.dumps, cls=CustomEncoder)) + dumps=functools.partial(json.dumps, cls=CustomEncoder), + headers=headers) def to_camel_case(snake_str): @@ -238,6 +246,7 @@ def to_camel_case(snake_str): def to_google_style(d): """Recursive convert all keys in dict to google style.""" new_dict = {} + for k, v in d.items(): if isinstance(v, dict): new_dict[to_camel_case(k)] = to_google_style(v) @@ -387,6 +396,9 @@ class Dict(MutableMapping): def __iter__(self): return iter(copy.deepcopy(self._data)) + def __str__(self): + return str(self._data) + def reset(self, d): assert isinstance(d, Mapping) for key in self._data.keys() - d.keys():