[Dashboard] Add utility functions for actor and memory APIs (#11011)

* Add actor and memory utility functions needed by upcoming logical view and memory view APIs

* Add a method to allow printing Dict custom class and add support for hot-reloading local dev environment.

* Address PR comments

* Add unit tests from test metrics to branch for new memory_utils module

* Add note about sorting / grouping

* lint

Co-authored-by: Max Fitton <max@semprehealth.com>
This commit is contained in:
Max Fitton
2020-10-01 23:48:03 -07:00
committed by GitHub
parent 874da9a25f
commit 9a6d01ebf9
4 changed files with 636 additions and 2 deletions
+68
View File
@@ -0,0 +1,68 @@
import time
import re
from collections import defaultdict
PYCLASSNAME_RE = re.compile(r"(.+?)\(")
def construct_actor_groups(actors):
"""actors is a dict from actor id to an actor or an
actor creation task The shared fields currently are
"actorClass", "actorId", and "state" """
actor_groups = _group_actors_by_python_class(actors)
stats_by_group = {
name: _get_actor_group_stats(group)
for name, group in actor_groups.items()
}
summarized_actor_groups = {}
for name, group in actor_groups.items():
summarized_actor_groups[name] = {
"entries": group,
"summary": stats_by_group[name]
}
return summarized_actor_groups
def actor_classname_from_task_spec(task_spec):
return task_spec.get("functionDescriptor", {})\
.get("pythonFunctionDescriptor", {})\
.get("className", "Unknown actor class")
def _group_actors_by_python_class(actors):
groups = defaultdict(list)
for actor in actors.values():
actor_class = actor["actorClass"]
groups[actor_class].append(actor)
return dict(groups)
def _get_actor_group_stats(group):
state_to_count = defaultdict(lambda: 0)
executed_tasks = 0
min_timestamp = None
num_timestamps = 0
sum_timestamps = 0
now = time.time() * 1000 # convert S -> MS
for actor in group:
state_to_count[actor["state"]] += 1
if "timestamp" in actor:
if not min_timestamp or actor["timestamp"] < min_timestamp:
min_timestamp = actor["timestamp"]
num_timestamps += 1
sum_timestamps += now - actor["timestamp"]
if "numExecutedTasks" in actor:
executed_tasks += actor["numExecutedTasks"]
if num_timestamps > 0:
avg_lifetime = int((sum_timestamps / num_timestamps) / 1000)
max_lifetime = int((now - min_timestamp) / 1000)
else:
avg_lifetime = 0
max_lifetime = 0
return {
"stateToCount": state_to_count,
"avgLifetime": avg_lifetime,
"maxLifetime": max_lifetime,
"numExecutedTasks": executed_tasks,
}
+302
View File
@@ -0,0 +1,302 @@
import base64
from collections import defaultdict
from enum import Enum
from typing import List
import ray
from ray._raylet import (TaskID, ActorID, JobID)
import logging
logger = logging.getLogger(__name__)
# These values are used to calculate if objectRefs are actor handles.
TASKID_BYTES_SIZE = TaskID.size()
ACTORID_BYTES_SIZE = ActorID.size()
JOBID_BYTES_SIZE = JobID.size()
# We need to multiply 2 because we need bits size instead of bytes size.
TASKID_RANDOM_BITS_SIZE = (TASKID_BYTES_SIZE - ACTORID_BYTES_SIZE) * 2
ACTORID_RANDOM_BITS_SIZE = (ACTORID_BYTES_SIZE - JOBID_BYTES_SIZE) * 2
def decode_object_ref_if_needed(object_ref: str) -> bytes:
"""Decode objectRef bytes string.
gRPC reply contains an objectRef that is encodded by Base64.
This function is used to decode the objectRef.
Note that there are times that objectRef is already decoded as
a hex string. In this case, just convert it to a binary number.
"""
if object_ref.endswith("="):
# If the object ref ends with =, that means it is base64 encoded.
# Object refs will always have = as a padding
# when it is base64 encoded because objectRef is always 20B.
return base64.standard_b64decode(object_ref)
else:
return ray.utils.hex_to_binary(object_ref)
class SortingType(Enum):
PID = 1
OBJECT_SIZE = 3
REFERENCE_TYPE = 4
class GroupByType(Enum):
NODE_ADDRESS = "node"
STACK_TRACE = "stack_trace"
class ReferenceType:
# We don't use enum because enum is not json serializable.
ACTOR_HANDLE = "ACTOR_HANDLE"
PINNED_IN_MEMORY = "PINNED_IN_MEMORY"
LOCAL_REFERENCE = "LOCAL_REFERENCE"
USED_BY_PENDING_TASK = "USED_BY_PENDING_TASK"
CAPTURED_IN_OBJECT = "CAPTURED_IN_OBJECT"
UNKNOWN_STATUS = "UNKNOWN_STATUS"
class MemoryTableEntry:
def __init__(self, *, object_ref: dict, node_address: str, is_driver: bool,
pid: int):
# worker info
self.is_driver = is_driver
self.pid = pid
self.node_address = node_address
# object info
self.object_size = int(object_ref.get("objectSize", -1))
self.call_site = object_ref.get("callSite", "<Unknown>")
self.object_ref = ray.ObjectRef(
decode_object_ref_if_needed(object_ref["objectId"]))
# reference info
self.local_ref_count = int(object_ref.get("localRefCount", 0))
self.pinned_in_memory = bool(object_ref.get("pinnedInMemory", False))
self.submitted_task_ref_count = int(
object_ref.get("submittedTaskRefCount", 0))
self.contained_in_owned = [
ray.ObjectRef(decode_object_ref_if_needed(object_ref))
for object_ref in object_ref.get("containedInOwned", [])
]
self.reference_type = self._get_reference_type()
def is_valid(self) -> bool:
# If the entry doesn't have a reference type or some invalid state,
# (e.g., no object ref presented), it is considered invalid.
if (not self.pinned_in_memory and self.local_ref_count == 0
and self.submitted_task_ref_count == 0
and len(self.contained_in_owned) == 0):
return False
elif self.object_ref.is_nil():
return False
else:
return True
def group_key(self, group_by_type: GroupByType) -> str:
if group_by_type == GroupByType.NODE_ADDRESS:
return self.node_address
elif group_by_type == GroupByType.STACK_TRACE:
return self.call_site
else:
raise ValueError(f"group by type {group_by_type} is invalid.")
def _get_reference_type(self) -> str:
if self._is_object_ref_actor_handle():
return ReferenceType.ACTOR_HANDLE
if self.pinned_in_memory:
return ReferenceType.PINNED_IN_MEMORY
elif self.submitted_task_ref_count > 0:
return ReferenceType.USED_BY_PENDING_TASK
elif self.local_ref_count > 0:
return ReferenceType.LOCAL_REFERENCE
elif len(self.contained_in_owned) > 0:
return ReferenceType.CAPTURED_IN_OBJECT
else:
return ReferenceType.UNKNOWN_STATUS
def _is_object_ref_actor_handle(self) -> bool:
object_ref_hex = self.object_ref.hex()
# random (8B) | ActorID(6B) | flag (2B) | index (6B)
# ActorID(6B) == ActorRandomByte(4B) + JobID(2B)
# If random bytes are all 'f', but ActorRandomBytes
# are not all 'f', that means it is an actor creation
# task, which is an actor handle.
random_bits = object_ref_hex[:TASKID_RANDOM_BITS_SIZE]
actor_random_bits = object_ref_hex[TASKID_RANDOM_BITS_SIZE:
TASKID_RANDOM_BITS_SIZE +
ACTORID_RANDOM_BITS_SIZE]
if (random_bits == "f" * 16 and not actor_random_bits == "f" * 8):
return True
else:
return False
def as_dict(self):
return {
"object_ref": self.object_ref.hex(),
"pid": self.pid,
"node_ip_address": self.node_address,
"object_size": self.object_size,
"reference_type": self.reference_type,
"call_site": self.call_site,
"local_ref_count": self.local_ref_count,
"pinned_in_memory": self.pinned_in_memory,
"submitted_task_ref_count": self.submitted_task_ref_count,
"contained_in_owned": [
object_ref.hex() for object_ref in self.contained_in_owned
],
"type": "Driver" if self.is_driver else "Worker"
}
def __str__(self):
return self.__repr__()
def __repr__(self):
return str(self.as_dict())
class MemoryTable:
def __init__(self,
entries: List[MemoryTableEntry],
group_by_type: GroupByType = GroupByType.NODE_ADDRESS,
sort_by_type: SortingType = SortingType.PID):
self.table = entries
# Group is a list of memory tables grouped by a group key.
self.group = {}
self.summary = defaultdict(int)
# NOTE YOU MUST SORT TABLE BEFORE GROUPING.
# self._group_by(..)._sort_by(..) != self._sort_by(..)._group_by(..)
if group_by_type and sort_by_type:
self.setup(group_by_type, sort_by_type)
elif group_by_type:
self._group_by(group_by_type)
elif sort_by_type:
self._sort_by(sort_by_type)
def setup(self, group_by_type: GroupByType, sort_by_type: SortingType):
"""Setup memory table.
This will sort entries first and group them after.
Sort order will be still kept.
"""
self._sort_by(sort_by_type)._group_by(group_by_type)
for group_memory_table in self.group.values():
group_memory_table.summarize()
self.summarize()
return self
def insert_entry(self, entry: MemoryTableEntry):
self.table.append(entry)
def summarize(self):
# Reset summary.
total_object_size = 0
total_local_ref_count = 0
total_pinned_in_memory = 0
total_used_by_pending_task = 0
total_captured_in_objects = 0
total_actor_handles = 0
for entry in self.table:
if entry.object_size > 0:
total_object_size += entry.object_size
if entry.reference_type == ReferenceType.LOCAL_REFERENCE:
total_local_ref_count += 1
elif entry.reference_type == ReferenceType.PINNED_IN_MEMORY:
total_pinned_in_memory += 1
elif entry.reference_type == ReferenceType.USED_BY_PENDING_TASK:
total_used_by_pending_task += 1
elif entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT:
total_captured_in_objects += 1
elif entry.reference_type == ReferenceType.ACTOR_HANDLE:
total_actor_handles += 1
self.summary = {
"total_object_size": total_object_size,
"total_local_ref_count": total_local_ref_count,
"total_pinned_in_memory": total_pinned_in_memory,
"total_used_by_pending_task": total_used_by_pending_task,
"total_captured_in_objects": total_captured_in_objects,
"total_actor_handles": total_actor_handles
}
return self
def _sort_by(self, sorting_type: SortingType):
if sorting_type == SortingType.PID:
self.table.sort(key=lambda entry: entry.pid)
elif sorting_type == SortingType.OBJECT_SIZE:
self.table.sort(key=lambda entry: entry.object_size)
elif sorting_type == SortingType.REFERENCE_TYPE:
self.table.sort(key=lambda entry: entry.reference_type)
else:
raise ValueError(f"Give sorting type: {sorting_type} is invalid.")
return self
def _group_by(self, group_by_type: GroupByType):
"""Group entries and summarize the result.
NOTE: Each group is another MemoryTable.
"""
# Reset group
self.group = {}
# Build entries per group.
group = defaultdict(list)
for entry in self.table:
group[entry.group_key(group_by_type)].append(entry)
# Build a group table.
for group_key, entries in group.items():
self.group[group_key] = MemoryTable(
entries, group_by_type=None, sort_by_type=None)
for group_key, group_memory_table in self.group.items():
group_memory_table.summarize()
return self
def as_dict(self):
return {
"summary": self.summary,
"group": {
group_key: {
"entries": group_memory_table.get_entries(),
"summary": group_memory_table.summary
}
for group_key, group_memory_table in self.group.items()
}
}
def get_entries(self) -> List[dict]:
return [entry.__dict__() for entry in self.table]
def __repr__(self):
return str(self.__dict__())
def __str__(self):
return self.__repr__()
def construct_memory_table(workers_info: List,
group_by: GroupByType = GroupByType.NODE_ADDRESS,
sort_by=SortingType.OBJECT_SIZE) -> MemoryTable:
memory_table_entries = []
for worker_info in workers_info:
pid = worker_info["pid"]
is_driver = worker_info.get("isDriver", False)
core_worker_stats = worker_info["coreWorkerStats"]
node_address = core_worker_stats["ipAddress"]
object_refs = core_worker_stats.get("objectRefs", [])
for object_ref in object_refs:
memory_table_entry = MemoryTableEntry(
object_ref=object_ref,
node_address=node_address,
is_driver=is_driver,
pid=pid)
if memory_table_entry.is_valid():
memory_table_entries.append(memory_table_entry)
memory_table = MemoryTable(
memory_table_entries, group_by_type=group_by, sort_by_type=sort_by)
return memory_table
+252
View File
@@ -0,0 +1,252 @@
import ray
from ray.new_dashboard.memory_utils import (
ReferenceType, decode_object_ref_if_needed, MemoryTableEntry, MemoryTable,
SortingType)
"""Memory Table Unit Test"""
NODE_ADDRESS = "127.0.0.1"
IS_DRIVER = True
PID = 1
OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA="
ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000"
DECODED_ID = decode_object_ref_if_needed(OBJECT_ID)
OBJECT_SIZE = 100
def build_memory_entry(*,
local_ref_count,
pinned_in_memory,
submitted_task_reference_count,
contained_in_owned,
object_size,
pid,
object_id=OBJECT_ID,
node_address=NODE_ADDRESS):
object_ref = {
"objectId": object_id,
"callSite": "(task call) /Users:458",
"objectSize": object_size,
"localRefCount": local_ref_count,
"pinnedInMemory": pinned_in_memory,
"submittedTaskRefCount": submitted_task_reference_count,
"containedInOwned": contained_in_owned
}
return MemoryTableEntry(
object_ref=object_ref,
node_address=node_address,
is_driver=IS_DRIVER,
pid=pid)
def build_local_reference_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=1,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_used_by_pending_task_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=2,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_captured_in_object_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[OBJECT_ID],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_actor_handle_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=1,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address,
object_id=ACTOR_ID)
def build_pinned_in_memory_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=True,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS,
reference_type=ReferenceType.PINNED_IN_MEMORY):
if reference_type == ReferenceType.USED_BY_PENDING_TASK:
return build_used_by_pending_task_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.LOCAL_REFERENCE:
return build_local_reference_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.PINNED_IN_MEMORY:
return build_pinned_in_memory_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.ACTOR_HANDLE:
return build_actor_handle_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.CAPTURED_IN_OBJECT:
return build_captured_in_object_entry(
pid=pid, object_size=object_size, node_address=node_address)
def test_invalid_memory_entry():
memory_entry = build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=OBJECT_SIZE,
pid=PID)
assert memory_entry.is_valid() is False
memory_entry = build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=-1,
pid=PID)
assert memory_entry.is_valid() is False
def test_valid_reference_memory_entry():
memory_entry = build_local_reference_entry()
assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE
assert memory_entry.object_ref == ray.ObjectRef(
decode_object_ref_if_needed(OBJECT_ID))
assert memory_entry.is_valid() is True
def test_reference_type():
# pinned in memory
memory_entry = build_pinned_in_memory_entry()
assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY
# used by pending task
memory_entry = build_used_by_pending_task_entry()
assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK
# captued in object
memory_entry = build_captured_in_object_entry()
assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT
# actor handle
memory_entry = build_actor_handle_entry()
assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE
def test_memory_table_summary():
entries = [
build_pinned_in_memory_entry(),
build_used_by_pending_task_entry(),
build_captured_in_object_entry(),
build_actor_handle_entry(),
build_local_reference_entry(),
build_local_reference_entry()
]
memory_table = MemoryTable(entries)
assert len(memory_table.group) == 1
assert memory_table.summary["total_actor_handles"] == 1
assert memory_table.summary["total_captured_in_objects"] == 1
assert memory_table.summary["total_local_ref_count"] == 2
assert memory_table.summary[
"total_object_size"] == len(entries) * OBJECT_SIZE
assert memory_table.summary["total_pinned_in_memory"] == 1
assert memory_table.summary["total_used_by_pending_task"] == 1
def test_memory_table_sort_by_pid():
unsort = [1, 3, 2]
entries = [build_entry(pid=pid) for pid in unsort]
memory_table = MemoryTable(entries, sort_by_type=SortingType.PID)
sort = sorted(unsort)
for pid, entry in zip(sort, memory_table.table):
assert pid == entry.pid
def test_memory_table_sort_by_reference_type():
unsort = [
ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE,
ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY
]
entries = [
build_entry(reference_type=reference_type) for reference_type in unsort
]
memory_table = MemoryTable(
entries, sort_by_type=SortingType.REFERENCE_TYPE)
sort = sorted(unsort)
for reference_type, entry in zip(sort, memory_table.table):
assert reference_type == entry.reference_type
def test_memory_table_sort_by_object_size():
unsort = [312, 214, -1, 1244, 642]
entries = [build_entry(object_size=object_size) for object_size in unsort]
memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE)
sort = sorted(unsort)
for object_size, entry in zip(sort, memory_table.table):
assert object_size == entry.object_size
def test_group_by():
node_second = "127.0.0.2"
node_first = "127.0.0.1"
entries = [
build_entry(node_address=node_second, pid=2),
build_entry(node_address=node_second, pid=1),
build_entry(node_address=node_first, pid=2),
build_entry(node_address=node_first, pid=1)
]
memory_table = MemoryTable(entries)
# Make sure it is correctly grouped
assert node_first in memory_table.group
assert node_second in memory_table.group
# make sure pid is sorted in the right order.
for group_key, group_memory_table in memory_table.group.items():
pid = 1
for entry in group_memory_table.table:
assert pid == entry.pid
pid += 1
if __name__ == "__main__":
import sys
import pytest
sys.exit(pytest.main(["-v", __file__]))
+14 -2
View File
@@ -15,7 +15,7 @@ from base64 import b64decode
from collections.abc import MutableMapping, Mapping
from collections import namedtuple
from typing import Any
import os
import aioredis
import aiohttp.web
from aiohttp import hdrs
@@ -218,13 +218,21 @@ class CustomEncoder(json.JSONEncoder):
async def rest_response(success, message, **kwargs) -> aiohttp.web.Response:
# In the dev context we allow a dev server running on a
# different port to consume the API, meaning we need to allow
# cross-origin access
if os.environ.get("RAY_DASHBOARD_DEV") == "1":
headers = {"Access-Control-Allow-Origin": "*"}
else:
headers = {}
return aiohttp.web.json_response(
{
"result": success,
"msg": message,
"data": to_google_style(kwargs)
},
dumps=functools.partial(json.dumps, cls=CustomEncoder))
dumps=functools.partial(json.dumps, cls=CustomEncoder),
headers=headers)
def to_camel_case(snake_str):
@@ -238,6 +246,7 @@ def to_camel_case(snake_str):
def to_google_style(d):
"""Recursive convert all keys in dict to google style."""
new_dict = {}
for k, v in d.items():
if isinstance(v, dict):
new_dict[to_camel_case(k)] = to_google_style(v)
@@ -387,6 +396,9 @@ class Dict(MutableMapping):
def __iter__(self):
return iter(copy.deepcopy(self._data))
def __str__(self):
return str(self._data)
def reset(self, d):
assert isinstance(d, Mapping)
for key in self._data.keys() - d.keys():