diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 3eb5362e6..63b8ac5b4 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -416,7 +416,6 @@ class GlobalState(object): list of profiling information for tasks where the events have no task ID. """ - task_info = dict() event_log_sets = self.redis_client.keys("event_log*") @@ -550,7 +549,6 @@ class GlobalState(object): obj_dep: Boolean indicating whether or not object dependency edges should be included in the trace. """ - workers = self.workers() start_time = None for info in task_info.values(): @@ -570,10 +568,20 @@ class GlobalState(object): full_trace = [] for task_id, info in task_info.items(): - # total_info is what is displayed when selecting a task in the - # timeline. - total_info = dict() - total_info["task_id"] = task_id + worker = workers[info["worker_id"]] + task_t_info = task_table[task_id] + + # The total_info dictionary is what is displayed when selecting a + # task in the timeline. We copy the task spec so that we don't + # modify it in place since we will use the original values later. + total_info = copy.copy(task_table[task_id]["TaskSpec"]) + total_info["Args"] = [ + oid.hex() if isinstance(oid, ray.local_scheduler.ObjectID) + else oid for oid in task_t_info["TaskSpec"]["Args"]] + total_info["ReturnObjectIDs"] = [ + oid.hex() for oid + in task_t_info["TaskSpec"]["ReturnObjectIDs"]] + total_info["LocalSchedulerID"] = task_t_info["LocalSchedulerID"] total_info["get_arguments"] = (info["get_arguments_end"] - info["get_arguments_start"]) total_info["execute"] = (info["execute_end"] - @@ -582,23 +590,12 @@ class GlobalState(object): info["store_outputs_start"]) total_info["function_name"] = info["function_name"] total_info["worker_id"] = info["worker_id"] - worker = workers[info["worker_id"]] - task_t_info = task_table[task_id] - task_spec = task_table[task_id]["TaskSpec"] - task_spec["Args"] = [oid.hex() if isinstance(oid, - ray.local_scheduler.ObjectID) else oid - for oid in task_t_info["TaskSpec"]["Args"]] - task_spec["ReturnObjectIDs"] = [oid.hex() for oid in - (task_t_info["TaskSpec"] - ["ReturnObjectIDs"])] - task_spec["LocalSchedulerID"] = task_t_info["LocalSchedulerID"] - total_info = copy.copy(task_spec) parent_info = task_info.get( task_table[task_id]["TaskSpec"]["ParentTaskID"]) worker = workers[info["worker_id"]] # The catapult trace format documentation can be found here: - # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # NOQA + # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # noqa: E501 if breakdowns: if "get_arguments_end" in info: get_args_trace = { @@ -735,55 +732,60 @@ class GlobalState(object): if obj_dep: args = task_table[task_id]["TaskSpec"]["Args"] for arg in args: + # Don't visualize arguments that are not object IDs. if isinstance(arg, ray.local_scheduler.ObjectID): - continue - object_info = self._object_table(arg) - if object_info["IsPut"]: - continue - if arg not in seen_obj: - seen_obj[arg] = 0 - seen_obj[arg] += 1 - owner_task = self._object_table(arg)["TaskID"] - owner_worker = (workers[task_profiles - [owner_task]["worker_id"]]) - # Adding/subtracting 2 to the time associated with the - # beginning/ending of the flow event is necessary to - # make the flow events show up reliably. When these times - # are exact, this is presumably an edge case, and catapult - # doesn't recognize that there is a duration event at that - # exact point in time that the flow event should be bound - # to. This issue is solved by adding the 2 ms to the - # start/end time of the flow event, which guarantees - # overlap with the duration event that it's associated - # with, and the flow event therefore always gets drawn. - owner = { - "cat": "obj_dependency", - "pid": "Node " + owner_worker["node_ip_address"], - "tid": task_profiles[owner_task]["worker_id"], - "ts": micros_rel(task_profiles[owner_task] - ["store_outputs_end"]) - 2, - "ph": "s", - "name": "ObjectDependency", - "args": {}, - "bp": "e", - "cname": "cq_build_attempt_failed", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(owner) + object_info = self._object_table(arg) + # Don't visualize objects that were created by calls to + # put. + if not object_info["IsPut"]: + if arg not in seen_obj: + seen_obj[arg] = 0 + seen_obj[arg] += 1 + owner_task = self._object_table(arg)["TaskID"] + owner_worker = (workers[ + task_profiles[owner_task]["worker_id"]]) + # Adding/subtracting 2 to the time associated with + # the beginning/ending of the flow event is + # necessary to make the flow events show up + # reliably. When these times are exact, this is + # presumably an edge case, and catapult doesn't + # recognize that there is a duration event at that + # exact point in time that the flow event should be + # bound to. This issue is solved by adding the 2 ms + # to the start/end time of the flow event, which + # guarantees overlap with the duration event that + # it's associated with, and the flow event + # therefore always gets drawn. + owner = { + "cat": "obj_dependency", + "pid": ("Node " + + owner_worker["node_ip_address"]), + "tid": task_profiles[owner_task]["worker_id"], + "ts": micros_rel(task_profiles[ + owner_task]["store_outputs_end"]) - 2, + "ph": "s", + "name": "ObjectDependency", + "args": {}, + "bp": "e", + "cname": "cq_build_attempt_failed", + "id": "obj" + str(arg) + str(seen_obj[arg]) + } + full_trace.append(owner) - dependent = { - "cat": "obj_dependency", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]) + 2, - "ph": "f", - "name": "ObjectDependency", - "args": {}, - "cname": "cq_build_attempt_failed", - "bp": "e", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(dependent) + dependent = { + "cat": "obj_dependency", + "pid": "Node " + worker["node_ip_address"], + "tid": info["worker_id"], + "ts": micros_rel( + info["get_arguments_start"]) + 2, + "ph": "f", + "name": "ObjectDependency", + "args": {}, + "cname": "cq_build_attempt_failed", + "bp": "e", + "id": "obj" + str(arg) + str(seen_obj[arg]) + } + full_trace.append(dependent) print("Creating JSON {}/{}".format(len(full_trace), len(task_info))) with open(path, "w") as outfile: