[Dashboard] Display actor task execution info (#6705)

Co-authored-by: Philipp Moritz <pcmoritz@gmail.com>
This commit is contained in:
Yunzhi Zhang
2020-01-22 22:33:55 -08:00
committed by Philipp Moritz
parent ae9a3a2237
commit 0834bda8c1
12 changed files with 320 additions and 36 deletions
+10
View File
@@ -633,6 +633,13 @@ cdef execute_task(
with core_worker.profile_event(b"task:deserialize_arguments"):
args, kwargs = deserialize_args(c_args, c_arg_reference_ids)
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
actor = worker.actors[core_worker.get_actor_id()]
class_name = actor.__class__.__name__
actor_title = "{}({}, {})".format(
class_name, repr(args), repr(kwargs))
core_worker.set_actor_title(actor_title.encode("utf-8"))
# Execute the task.
with ray.worker._changeproctitle(title, next_title):
with core_worker.profile_event(b"task:execute"):
@@ -810,6 +817,9 @@ cdef class CoreWorker:
def set_webui_display(self, message):
# Hand `message` to the wrapped C++ core worker through SetWebuiDisplay.
self.core_worker.get().SetWebuiDisplay(message)
def set_actor_title(self, title):
# Hand `title` to the wrapped C++ core worker through SetActorTitle.
# The caller visible in execute_task passes UTF-8 encoded bytes.
self.core_worker.get().SetActorTitle(title)
def get_objects(self, object_ids, TaskID current_task_id,
int64_t timeout_ms=-1):
cdef:
+7 -1
View File
@@ -100,19 +100,25 @@ export interface RayletInfoResponse {
[actorId: string]:
| {
actorId: string;
actorTitle: string;
averageTaskExecutionSpeed: number;
children: RayletInfoResponse["actors"];
currentTaskFuncDesc: string[];
ipAddress: string;
isDirectCall: boolean;
jobId: string;
nodeId: string;
numExecutedTasks: number;
numLocalObjects: number;
numObjectIdsInScope: number;
pid: number;
port: number;
state: 0 | 1 | 2;
taskQueueLength: number;
timestamp: number;
usedObjectStoreMemory: number;
usedResources: { [key: string]: number };
currentTaskDesc?: string;
currentTaskFuncDesc?: string[];
numPendingTasks?: number;
webuiDisplay?: string;
}
@@ -67,8 +67,14 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
actor.state !== -1
? [
{
label: "ID",
value: actor.actorId
label: "ActorTitle",
value:
actor.actorTitle
},
{
label: "State",
value:
actor.state.toLocaleString()
},
{
label: "Resources",
@@ -81,14 +87,32 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
{
label: "Pending",
value:
actor.taskQueueLength !== undefined &&
actor.taskQueueLength > 0 &&
actor.taskQueueLength.toLocaleString()
},
{
label: "Executed",
value:
actor.numExecutedTasks.toLocaleString()
},
{
label: "NumObjectIdsInScope",
value:
actor.numObjectIdsInScope.toLocaleString()
},
{
label: "NumLocalObjects",
value:
actor.numLocalObjects.toLocaleString()
},
{
label: "UsedLocalObjectMemory",
value:
actor.usedObjectStoreMemory.toLocaleString()
},
{
label: "Task",
value:
actor.currentTaskFuncDesc && actor.currentTaskFuncDesc.join(".")
actor.currentTaskFuncDesc.join(".")
}
]
: [
+118 -29
View File
@@ -16,6 +16,7 @@ import threading
import time
import traceback
import yaml
import uuid
from base64 import b64decode
from collections import defaultdict
@@ -27,6 +28,8 @@ from google.protobuf.json_format import MessageToDict
import ray
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
import ray.ray_constants as ray_constants
# Logger for this module. It should be configured at the entry point
@@ -54,18 +57,18 @@ def format_resource(resource_name, quantity):
return "{}".format(round_resource_value(quantity))
def format_reply(reply):
def format_reply_id(reply):
if isinstance(reply, dict):
for k, v in reply.items():
if isinstance(v, dict) or isinstance(v, list):
format_reply(v)
format_reply_id(v)
else:
if k.endswith("Id"):
v = b64decode(v)
reply[k] = ray.utils.binary_to_hex(v)
elif isinstance(reply, list):
for item in reply:
format_reply(item)
format_reply_id(item)
def measures_to_dict(measures):
@@ -190,12 +193,14 @@ class Dashboard(object):
async def raylet_info(req) -> aiohttp.web.Response:
D = self.raylet_stats.get_raylet_stats()
workers_info = sum(
(data.get("workersStats", []) for data in D.values()), [])
workers_info_by_node = {
data["nodeId"]: data.get("workersStats")
for data in D.values()
}
infeasible_tasks = sum(
(data.get("infeasibleTasks", []) for data in D.values()), [])
actor_tree = self.node_stats.get_actor_tree(
workers_info, infeasible_tasks)
workers_info_by_node, infeasible_tasks)
for address, data in D.items():
# process view data
measures_dicts = {}
@@ -246,6 +251,24 @@ class Dashboard(object):
result = {"nodes": D, "actors": actor_tree}
return await json_response(result=result)
async def launch_profiling(req) -> aiohttp.web.Response:
    """Start profiling a process and reply with the new profiling id.

    Reads ``node_id``, ``pid`` and ``duration`` from the query string;
    ``pid`` and ``duration`` are converted with ``int``. The id returned
    by ``self.raylet_stats.launch_profiling`` is sent back as a JSON
    string.
    """
    query = req.query
    target_node = query.get("node_id")
    target_pid = int(query.get("pid"))
    profile_duration = int(query.get("duration"))
    new_id = self.raylet_stats.launch_profiling(
        node_id=target_node, pid=target_pid, duration=profile_duration)
    return aiohttp.web.json_response(str(new_id))
async def check_profiling_status(req) -> aiohttp.web.Response:
    """Reply with the status dict for the ``profiling_id`` query parameter."""
    requested_id = req.query.get("profiling_id")
    status = self.raylet_stats.check_profiling_status(requested_id)
    return aiohttp.web.json_response(status)
async def get_profiling_info(req) -> aiohttp.web.Response:
    """Reply with the profiling data for the ``profiling_id`` query parameter."""
    requested_id = req.query.get("profiling_id")
    info = self.raylet_stats.get_profiling_info(requested_id)
    return aiohttp.web.json_response(info)
async def logs(req) -> aiohttp.web.Response:
hostname = req.query.get("hostname")
pid = req.query.get("pid")
@@ -280,6 +303,10 @@ class Dashboard(object):
self.app.router.add_get("/api/ray_config", ray_config)
self.app.router.add_get("/api/node_info", node_info)
self.app.router.add_get("/api/raylet_info", raylet_info)
self.app.router.add_get("/api/launch_profiling", launch_profiling)
self.app.router.add_get("/api/check_profiling_status",
check_profiling_status)
self.app.router.add_get("/api/get_profiling_info", get_profiling_info)
self.app.router.add_get("/api/logs", logs)
self.app.router.add_get("/api/errors", errors)
@@ -313,6 +340,7 @@ class NodeStats(threading.Thread):
self._default_info = {
"actorId": "",
"children": {},
"currentTaskFuncDesc": [],
"ipAddress": "",
"isDirectCall": False,
"jobId": "",
@@ -380,7 +408,7 @@ class NodeStats(threading.Thread):
"error_counts": self.calculate_error_counts(),
}
def get_actor_tree(self, workers_info, infeasible_tasks) -> Dict:
def get_actor_tree(self, workers_info_by_node, infeasible_tasks) -> Dict:
now = time.time()
# construct flattened actor tree
flattened_tree = {"root": {"children": {}}}
@@ -394,23 +422,28 @@ class NodeStats(threading.Thread):
self._addr_to_owner_addr[addr], "root")
child_to_parent[actor_id] = parent_id
for worker_info in workers_info:
if "coreWorkerStats" in worker_info:
core_worker_stats = worker_info["coreWorkerStats"]
addr = (core_worker_stats["ipAddress"],
str(core_worker_stats["port"]))
if addr in self._addr_to_actor_id:
actor_info = flattened_tree[self._addr_to_actor_id[
addr]]
if "currentTaskFuncDesc" in core_worker_stats:
core_worker_stats["currentTaskFuncDesc"] = list(
map(b64_decode,
core_worker_stats["currentTaskFuncDesc"]))
format_reply(core_worker_stats)
actor_info.update(core_worker_stats)
actor_info["averageTaskExecutionSpeed"] = round(
actor_info["numExecutedTasks"] /
(now - actor_info["timestamp"] / 1000), 2)
for node_id, workers_info in workers_info_by_node.items():
for worker_info in workers_info:
if "coreWorkerStats" in worker_info:
core_worker_stats = worker_info["coreWorkerStats"]
addr = (core_worker_stats["ipAddress"],
str(core_worker_stats["port"]))
if addr in self._addr_to_actor_id:
actor_info = flattened_tree[self._addr_to_actor_id[
addr]]
if "currentTaskFuncDesc" in core_worker_stats:
core_worker_stats[
"currentTaskFuncDesc"] = list(
map(
b64_decode, core_worker_stats[
"currentTaskFuncDesc"]))
format_reply_id(core_worker_stats)
actor_info.update(core_worker_stats)
actor_info["averageTaskExecutionSpeed"] = round(
actor_info["numExecutedTasks"] /
(now - actor_info["timestamp"] / 1000), 2)
actor_info["nodeId"] = node_id
actor_info["pid"] = worker_info["pid"]
for infeasible_task in infeasible_tasks:
actor_id = ray.utils.binary_to_hex(
@@ -423,7 +456,7 @@ class NodeStats(threading.Thread):
infeasible_task["state"] = -1
infeasible_task["functionDescriptor"] = list(
map(b64_decode, infeasible_task["functionDescriptor"]))
format_reply(infeasible_tasks)
format_reply_id(infeasible_tasks)
flattened_tree[actor_id] = infeasible_task
# construct actor tree
@@ -536,9 +569,13 @@ class RayletStats(threading.Thread):
self.nodes_lock = threading.Lock()
self.nodes = []
self.stubs = {}
self.reporter_stubs = {}
self.redis_client = ray.services.create_redis_client(
redis_address, password=redis_password)
self._raylet_stats_lock = threading.Lock()
self._raylet_stats = {}
self._profiling_stats = {}
self.update_nodes()
@@ -554,21 +591,71 @@ class RayletStats(threading.Thread):
if node_id not in node_ids:
stub = self.stubs.pop(node_id)
stub.close()
reporter_stub = self.reporter_stubs.pop(node_id)
reporter_stub.close()
# Now add node connections of new nodes.
for node in self.nodes:
node_id = node["NodeID"]
if node_id not in self.stubs:
node_ip = node["NodeManagerAddress"]
channel = grpc.insecure_channel("{}:{}".format(
node["NodeManagerAddress"], node["NodeManagerPort"]))
node_ip, node["NodeManagerPort"]))
stub = node_manager_pb2_grpc.NodeManagerServiceStub(
channel)
self.stubs[node_id] = stub
# Block wait until the reporter for the node starts.
while True:
reporter_port = self.redis_client.get(
"REPORTER_PORT:{}".format(node_ip))
if reporter_port:
break
reporter_channel = grpc.insecure_channel("{}:{}".format(
node_ip, int(reporter_port)))
reporter_stub = reporter_pb2_grpc.ReporterServiceStub(
reporter_channel)
self.reporter_stubs[node_id] = reporter_stub
assert len(self.stubs) == len(
self.reporter_stubs), (self.stubs.keys(),
self.reporter_stubs.keys())
def get_raylet_stats(self) -> Dict:
    """Return a deep copy of the cached raylet stats.

    The copy is taken while holding ``_raylet_stats_lock`` so callers can
    mutate the result without affecting the cached dictionary.
    """
    with self._raylet_stats_lock:
        snapshot = copy.deepcopy(self._raylet_stats)
    return snapshot
def launch_profiling(self, node_id, pid, duration):
    """Issue an asynchronous GetProfilingStats request to a node's reporter.

    Args:
        node_id: Key into ``self.reporter_stubs`` selecting the reporter.
        pid: Value for the request's ``pid`` field.
        duration: Value for the request's ``duration`` field.

    Returns:
        A newly generated UUID string. Once the RPC future completes, its
        result is stored in ``self._profiling_stats`` under this id.
    """
    profiling_id = str(uuid.uuid4())

    def _store_reply(finished_future):
        # Invoked when the RPC future is done; publish the reply under
        # the same lock the readers use.
        result = finished_future.result()
        with self._raylet_stats_lock:
            self._profiling_stats[profiling_id] = result

    stub = self.reporter_stubs[node_id]
    request = reporter_pb2.GetProfilingStatsRequest(
        pid=pid, duration=duration)
    pending = stub.GetProfilingStats.future(request)
    pending.add_done_callback(_store_reply)
    return profiling_id
def check_profiling_status(self, profiling_id):
    """Report the state of a profiling request.

    Args:
        profiling_id: Id previously returned by ``launch_profiling``.

    Returns:
        ``{"status": "pending"}`` when no reply is stored for the id,
        ``{"status": "error", "error": <stderr>}`` when the stored reply
        has a non-empty ``stderr``, and ``{"status": "finished"}``
        otherwise.
    """
    # Fetch the reply in a single lookup while the lock is held, instead
    # of testing membership under the lock and indexing after releasing it.
    with self._raylet_stats_lock:
        reply = self._profiling_stats.get(profiling_id)
    if reply is None:
        return {"status": "pending"}
    if reply.stderr:
        return {"status": "error", "error": reply.stderr}
    return {"status": "finished"}
def get_profiling_info(self, profiling_id):
    """Return the decoded profiling output for a finished request.

    Looks up the stored reply for ``profiling_id`` under the stats lock,
    asserts that one exists, and parses its ``profiling_stats`` attribute
    as JSON.
    """
    with self._raylet_stats_lock:
        reply = self._profiling_stats.get(profiling_id)
    assert reply, "profiling not finished"
    raw_json = reply.profiling_stats
    return json.loads(raw_json)
def run(self):
counter = 0
while True:
@@ -579,10 +666,12 @@ class RayletStats(threading.Thread):
stub = self.stubs[node_id]
reply = stub.GetNodeStats(
node_manager_pb2.GetNodeStatsRequest(), timeout=2)
replies[node["NodeManagerAddress"]] = reply
reply_dict = MessageToDict(reply)
reply_dict["nodeId"] = node_id
replies[node["NodeManagerAddress"]] = reply_dict
with self._raylet_stats_lock:
for address, reply in replies.items():
self._raylet_stats[address] = MessageToDict(reply)
for address, reply_dict in replies.items():
self._raylet_stats[address] = reply_dict
counter += 1
# From time to time, check if new nodes have joined the cluster
# and update self.nodes
+1
View File
@@ -114,6 +114,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CTaskID GetCurrentTaskId()
const CActorID &GetActorId()
void SetWebuiDisplay(const c_string &message)
void SetActorTitle(const c_string &title)
CTaskID GetCallerId()
const ResourceMappingType &GetResourceIDs() const
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes)
+38
View File
@@ -5,6 +5,9 @@ import os
import traceback
import time
import datetime
import grpc
import subprocess
from concurrent import futures
try:
import psutil
@@ -16,6 +19,8 @@ except ImportError:
import ray.ray_constants as ray_constants
import ray.services
import ray.utils
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
@@ -23,6 +28,31 @@ import ray.utils
logger = logging.getLogger(__name__)
class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
    """gRPC servicer that profiles a process on this node with py-spy."""

    def __init__(self):
        pass

    def GetProfilingStats(self, request, context):
        """Record a py-spy profile of ``request.pid`` and return the result.

        Args:
            request: Message carrying ``pid`` (the process to profile) and
                ``duration`` (passed to py-spy's ``-d`` option).
            context: gRPC call context (unused).

        Returns:
            A ``GetProfilingStatsReply``. ``profiling_stats`` holds the
            contents of the speedscope output file, or an empty string
            when py-spy exits with a non-zero status. ``stdout`` and
            ``stderr`` carry the captured output of the py-spy process.
        """
        # Imported locally so this block does not rely on a new
        # file-level import.
        import shutil

        pid = request.pid
        duration = request.duration
        profiling_file_path = os.path.join("/tmp/ray/",
                                           "{}_profiling.txt".format(pid))
        # Resolve py-spy on the current user's PATH before handing it to
        # sudo (the same effect as the former `$(which py-spy)`). When it
        # is not found, fall back to the bare name so that sudo reports
        # the failure through stderr and a non-zero exit status.
        py_spy_path = shutil.which("py-spy") or "py-spy"
        # Pass an argument list and avoid the shell entirely: request
        # values are never interpreted as shell syntax.
        command = [
            "sudo", py_spy_path, "record",
            "-o", profiling_file_path,
            "-p", str(pid),
            "-d", str(duration),
            "-f", "speedscope",
        ]
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            profiling_stats = ""
        else:
            with open(profiling_file_path, "r") as f:
                profiling_stats = f.read()
        return reporter_pb2.GetProfilingStatsReply(
            profiling_stats=profiling_stats, stdout=stdout, stderr=stderr)
def recursive_asdict(o):
if isinstance(o, tuple) and hasattr(o, "_asdict"):
return recursive_asdict(o._asdict())
@@ -157,6 +187,14 @@ class Reporter:
)
def run(self):
"""Publish the port."""
thread_pool = futures.ThreadPoolExecutor(max_workers=10)
server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), ))
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(
ReporterServer(), server)
port = server.add_insecure_port("[::]:0")
server.start()
self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port)
"""Run the reporter."""
while True:
try:
+63 -1
View File
@@ -1,4 +1,5 @@
import os
import json
import grpc
import psutil
import requests
@@ -7,6 +8,8 @@ import time
import ray
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.test_utils import RayTestTimeoutException
@@ -138,7 +141,11 @@ def test_raylet_info_endpoint(shutdown_only):
def remote_store(self):
self.remote_storage = ray.put("test")
def getpid(self):
return os.getpid()
c = ActorC.remote()
actor_pid = ray.get(c.getpid.remote())
c.local_store.remote()
c.remote_store.remote()
@@ -168,7 +175,7 @@ def test_raylet_info_endpoint(shutdown_only):
"Timed out while waiting for dashboard to start.")
assert parent_actor_info["usedResources"]["CPU"] == 2
assert parent_actor_info["numExecutedTasks"] == 3
assert parent_actor_info["numExecutedTasks"] == 4
for _, child_actor_info in children.items():
if child_actor_info["state"] == -1:
assert child_actor_info["requiredResources"]["CustomResource"] == 1
@@ -177,6 +184,61 @@ def test_raylet_info_endpoint(shutdown_only):
assert len(child_actor_info["children"]) == 0
assert child_actor_info["usedResources"]["CPU"] == 1
profiling_id = requests.get(
webui_url + "/api/launch_profiling",
params={
"node_id": ray.nodes()[0]["NodeID"],
"pid": actor_pid,
"duration": 5
}).json()
# Poll the status endpoint until it returns a recognised status, giving
# up once the deadline has passed.
start_time = time.time()
while True:
    time.sleep(1)
    try:
        profiling_info = requests.get(
            webui_url + "/api/check_profiling_status",
            params={
                "profiling_id": profiling_id,
            }).json()
        assert profiling_info["status"] in ("finished", "pending", "error")
        break
    except AssertionError:
        # Compare the elapsed time against the limit. The former
        # `elapsed + 10` expression was always truthy, so the first
        # failed check raised immediately instead of retrying.
        if time.time() - start_time > 10:
            raise Exception("Timed out while collecting profiling stats.")
def test_profiling_info_endpoint(shutdown_only):
    """Call the reporter's GetProfilingStats RPC directly for a live actor.

    Looks up the reporter port that is stored in Redis under
    ``REPORTER_PORT:<node ip>``, profiles a freshly created actor process
    and checks that the returned profiling data parses as JSON.
    """
    ray.init(num_cpus=1)

    redis_client = ray.worker.global_worker.redis_client

    node_ip = ray.nodes()[0]["NodeManagerAddress"]

    # Wait for the reporter port to appear. Sleep between polls and bound
    # the wait so a reporter that never starts fails the test instead of
    # spinning on Redis forever.
    timeout_seconds = 30
    start_time = time.time()
    while True:
        reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip))
        if reporter_port:
            break
        if time.time() - start_time > timeout_seconds:
            raise RayTestTimeoutException(
                "Timed out while waiting for the reporter port.")
        time.sleep(0.1)

    reporter_channel = grpc.insecure_channel("{}:{}".format(
        node_ip, int(reporter_port)))
    reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel)

    @ray.remote(num_cpus=1)
    class ActorA:
        def __init__(self):
            pass

        def getpid(self):
            return os.getpid()

    a = ActorA.remote()
    actor_pid = ray.get(a.getpid.remote())

    reply = reporter_stub.GetProfilingStats(
        reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10))
    profiling_stats = json.loads(reply.profiling_stats)
    assert profiling_stats is not None
if __name__ == "__main__":
import pytest