mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:38:18 +08:00
Fix bugs in task timeline visualization. (#836)
* Fix bugs in task timeline visualization. * Some cleanups. * Remove print statements.
This commit is contained in:
committed by
Philipp Moritz
parent
a75ccc8032
commit
ca53e9ae7b
@@ -416,7 +416,6 @@ class GlobalState(object):
|
||||
list of profiling information for tasks where the events have
|
||||
no task ID.
|
||||
"""
|
||||
|
||||
task_info = dict()
|
||||
event_log_sets = self.redis_client.keys("event_log*")
|
||||
|
||||
@@ -550,7 +549,6 @@ class GlobalState(object):
|
||||
obj_dep: Boolean indicating whether or not object dependency edges
|
||||
should be included in the trace.
|
||||
"""
|
||||
|
||||
workers = self.workers()
|
||||
start_time = None
|
||||
for info in task_info.values():
|
||||
@@ -570,10 +568,20 @@ class GlobalState(object):
|
||||
|
||||
full_trace = []
|
||||
for task_id, info in task_info.items():
|
||||
# total_info is what is displayed when selecting a task in the
|
||||
# timeline.
|
||||
total_info = dict()
|
||||
total_info["task_id"] = task_id
|
||||
worker = workers[info["worker_id"]]
|
||||
task_t_info = task_table[task_id]
|
||||
|
||||
# The total_info dictionary is what is displayed when selecting a
|
||||
# task in the timeline. We copy the task spec so that we don't
|
||||
# modify it in place since we will use the original values later.
|
||||
total_info = copy.copy(task_table[task_id]["TaskSpec"])
|
||||
total_info["Args"] = [
|
||||
oid.hex() if isinstance(oid, ray.local_scheduler.ObjectID)
|
||||
else oid for oid in task_t_info["TaskSpec"]["Args"]]
|
||||
total_info["ReturnObjectIDs"] = [
|
||||
oid.hex() for oid
|
||||
in task_t_info["TaskSpec"]["ReturnObjectIDs"]]
|
||||
total_info["LocalSchedulerID"] = task_t_info["LocalSchedulerID"]
|
||||
total_info["get_arguments"] = (info["get_arguments_end"] -
|
||||
info["get_arguments_start"])
|
||||
total_info["execute"] = (info["execute_end"] -
|
||||
@@ -582,23 +590,12 @@ class GlobalState(object):
|
||||
info["store_outputs_start"])
|
||||
total_info["function_name"] = info["function_name"]
|
||||
total_info["worker_id"] = info["worker_id"]
|
||||
worker = workers[info["worker_id"]]
|
||||
task_t_info = task_table[task_id]
|
||||
task_spec = task_table[task_id]["TaskSpec"]
|
||||
task_spec["Args"] = [oid.hex() if isinstance(oid,
|
||||
ray.local_scheduler.ObjectID) else oid
|
||||
for oid in task_t_info["TaskSpec"]["Args"]]
|
||||
task_spec["ReturnObjectIDs"] = [oid.hex() for oid in
|
||||
(task_t_info["TaskSpec"]
|
||||
["ReturnObjectIDs"])]
|
||||
task_spec["LocalSchedulerID"] = task_t_info["LocalSchedulerID"]
|
||||
total_info = copy.copy(task_spec)
|
||||
|
||||
parent_info = task_info.get(
|
||||
task_table[task_id]["TaskSpec"]["ParentTaskID"])
|
||||
worker = workers[info["worker_id"]]
|
||||
# The catapult trace format documentation can be found here:
|
||||
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # NOQA
|
||||
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # noqa: E501
|
||||
if breakdowns:
|
||||
if "get_arguments_end" in info:
|
||||
get_args_trace = {
|
||||
@@ -735,55 +732,60 @@ class GlobalState(object):
|
||||
if obj_dep:
|
||||
args = task_table[task_id]["TaskSpec"]["Args"]
|
||||
for arg in args:
|
||||
# Don't visualize arguments that are not object IDs.
|
||||
if isinstance(arg, ray.local_scheduler.ObjectID):
|
||||
continue
|
||||
object_info = self._object_table(arg)
|
||||
if object_info["IsPut"]:
|
||||
continue
|
||||
if arg not in seen_obj:
|
||||
seen_obj[arg] = 0
|
||||
seen_obj[arg] += 1
|
||||
owner_task = self._object_table(arg)["TaskID"]
|
||||
owner_worker = (workers[task_profiles
|
||||
[owner_task]["worker_id"]])
|
||||
# Adding/subtracting 2 to the time associated with the
|
||||
# beginning/ending of the flow event is necessary to
|
||||
# make the flow events show up reliably. When these times
|
||||
# are exact, this is presumably an edge case, and catapult
|
||||
# doesn't recognize that there is a duration event at that
|
||||
# exact point in time that the flow event should be bound
|
||||
# to. This issue is solved by adding the 2 ms to the
|
||||
# start/end time of the flow event, which guarantees
|
||||
# overlap with the duration event that it's associated
|
||||
# with, and the flow event therefore always gets drawn.
|
||||
owner = {
|
||||
"cat": "obj_dependency",
|
||||
"pid": "Node " + owner_worker["node_ip_address"],
|
||||
"tid": task_profiles[owner_task]["worker_id"],
|
||||
"ts": micros_rel(task_profiles[owner_task]
|
||||
["store_outputs_end"]) - 2,
|
||||
"ph": "s",
|
||||
"name": "ObjectDependency",
|
||||
"args": {},
|
||||
"bp": "e",
|
||||
"cname": "cq_build_attempt_failed",
|
||||
"id": "obj" + str(arg) + str(seen_obj[arg])
|
||||
}
|
||||
full_trace.append(owner)
|
||||
object_info = self._object_table(arg)
|
||||
# Don't visualize objects that were created by calls to
|
||||
# put.
|
||||
if not object_info["IsPut"]:
|
||||
if arg not in seen_obj:
|
||||
seen_obj[arg] = 0
|
||||
seen_obj[arg] += 1
|
||||
owner_task = self._object_table(arg)["TaskID"]
|
||||
owner_worker = (workers[
|
||||
task_profiles[owner_task]["worker_id"]])
|
||||
# Adding/subtracting 2 to the time associated with
|
||||
# the beginning/ending of the flow event is
|
||||
# necessary to make the flow events show up
|
||||
# reliably. When these times are exact, this is
|
||||
# presumably an edge case, and catapult doesn't
|
||||
# recognize that there is a duration event at that
|
||||
# exact point in time that the flow event should be
|
||||
# bound to. This issue is solved by adding the 2 ms
|
||||
# to the start/end time of the flow event, which
|
||||
# guarantees overlap with the duration event that
|
||||
# it's associated with, and the flow event
|
||||
# therefore always gets drawn.
|
||||
owner = {
|
||||
"cat": "obj_dependency",
|
||||
"pid": ("Node " +
|
||||
owner_worker["node_ip_address"]),
|
||||
"tid": task_profiles[owner_task]["worker_id"],
|
||||
"ts": micros_rel(task_profiles[
|
||||
owner_task]["store_outputs_end"]) - 2,
|
||||
"ph": "s",
|
||||
"name": "ObjectDependency",
|
||||
"args": {},
|
||||
"bp": "e",
|
||||
"cname": "cq_build_attempt_failed",
|
||||
"id": "obj" + str(arg) + str(seen_obj[arg])
|
||||
}
|
||||
full_trace.append(owner)
|
||||
|
||||
dependent = {
|
||||
"cat": "obj_dependency",
|
||||
"pid": "Node " + worker["node_ip_address"],
|
||||
"tid": info["worker_id"],
|
||||
"ts": micros_rel(info["get_arguments_start"]) + 2,
|
||||
"ph": "f",
|
||||
"name": "ObjectDependency",
|
||||
"args": {},
|
||||
"cname": "cq_build_attempt_failed",
|
||||
"bp": "e",
|
||||
"id": "obj" + str(arg) + str(seen_obj[arg])
|
||||
}
|
||||
full_trace.append(dependent)
|
||||
dependent = {
|
||||
"cat": "obj_dependency",
|
||||
"pid": "Node " + worker["node_ip_address"],
|
||||
"tid": info["worker_id"],
|
||||
"ts": micros_rel(
|
||||
info["get_arguments_start"]) + 2,
|
||||
"ph": "f",
|
||||
"name": "ObjectDependency",
|
||||
"args": {},
|
||||
"cname": "cq_build_attempt_failed",
|
||||
"bp": "e",
|
||||
"id": "obj" + str(arg) + str(seen_obj[arg])
|
||||
}
|
||||
full_trace.append(dependent)
|
||||
|
||||
print("Creating JSON {}/{}".format(len(full_trace), len(task_info)))
|
||||
with open(path, "w") as outfile:
|
||||
|
||||
Reference in New Issue
Block a user