mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 16:49:48 +08:00
Chrome trace timeline with sliders. (#731)
* Trace timeline with sliders. * Trace. * Switched ujson to json. * Fixed tests. * linting fixes * Fixed bug. * Cleaned up code. * Fixes according to comments. * removed checkpoints. * Undid accidental delete. * Fixed linting error. * Added documentation to notebook. * Undid accidental deletes. * Add comments and small formatting fixes. * Small fix.
This commit is contained in:
committed by
Robert Nishihara
parent
420013774c
commit
2b3190ad13
+313
-21
@@ -1,45 +1,45 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Evaluate the box below to initialize the web UI."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import os\n",
|
||||
"import pprint\n",
|
||||
"import ipywidgets as widgets\n",
|
||||
"import os\n",
|
||||
"import ray\n",
|
||||
"import subprocess\n",
|
||||
"import tempfile\n",
|
||||
"\n",
|
||||
"from IPython.display import display\n",
|
||||
"\n",
|
||||
"ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])"
|
||||
"ray.init(redis_address=os.environ['REDIS_ADDRESS'])"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"source": [
|
||||
"**Evaluate the box below to search for objects.**"
|
||||
"#### Evaluate the box below to search for objects."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"object_search = widgets.Text(\n",
|
||||
" value=\"\",\n",
|
||||
" placeholder=\"Object ID\",\n",
|
||||
" description=\"Search for an object:\",\n",
|
||||
" value='',\n",
|
||||
" placeholder='Object ID',\n",
|
||||
" description='Search for an object:',\n",
|
||||
" disabled=False\n",
|
||||
")\n",
|
||||
"display(object_search)\n",
|
||||
@@ -55,7 +55,7 @@
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Evaluate the box below to search for tasks.**"
|
||||
"#### Evaluate the box below to search for tasks."
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -67,9 +67,9 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task_search = widgets.Text(\n",
|
||||
" value=\"\",\n",
|
||||
" placeholder=\"Task ID\",\n",
|
||||
" description=\"Search for a task:\",\n",
|
||||
" value='',\n",
|
||||
" placeholder='Task ID',\n",
|
||||
" description='Search for a task:',\n",
|
||||
" disabled=False\n",
|
||||
")\n",
|
||||
"display(task_search)\n",
|
||||
@@ -80,6 +80,298 @@
|
||||
"\n",
|
||||
"task_search.on_submit(handle_submit)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Instances of this class keep track of whether or not a\n",
|
||||
"# callback is currently executing. Since the execution of the callback\n",
|
||||
"# may trigger more calls to the callback, this is used to prevent infinite\n",
|
||||
"# recursions.\n",
|
||||
"class _EventRecursionContextManager(object):\n",
|
||||
" def __init__(self):\n",
|
||||
" self.should_recurse = True\n",
|
||||
" \n",
|
||||
" def __enter__(self):\n",
|
||||
" self.should_recurse = False\n",
|
||||
" \n",
|
||||
" def __exit__(self, *args):\n",
|
||||
" self.should_recurse = True\n",
|
||||
"\n",
|
||||
"total_time_value = '% total time'\n",
|
||||
"total_tasks_value = '% total tasks'\n",
|
||||
"\n",
|
||||
"# Function that returns instances of sliders and handles associated events.\n",
|
||||
"def get_sliders(update):\n",
|
||||
" # Start_box value indicates the desired start point of queried window.\n",
|
||||
" start_box = widgets.FloatText(\n",
|
||||
" description='Start Time:',\n",
|
||||
" disabled=True,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # End_box value indicates the desired end point of queried window.\n",
|
||||
" end_box = widgets.FloatText(\n",
|
||||
" description='End Time:',\n",
|
||||
" disabled=True,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Percentage slider. Indicates either % of total time or total tasks\n",
|
||||
" # depending on what breakdown_opt is set to.\n",
|
||||
" range_slider = widgets.IntRangeSlider(\n",
|
||||
" value=[70, 100],\n",
|
||||
" min=0,\n",
|
||||
" max=100,\n",
|
||||
" step=1,\n",
|
||||
" description='%:',\n",
|
||||
" continuous_update=False,\n",
|
||||
" orientation='horizontal',\n",
|
||||
" readout=True,\n",
|
||||
" readout_format='.0i%',\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Indicates the number of tasks that the user wants to be returned. Is\n",
|
||||
" # disabled when the breakdown_opt value is set to total_time_value. \n",
|
||||
" num_tasks_box = widgets.IntText(\n",
|
||||
" description='Num Tasks:',\n",
|
||||
" disabled=False\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Dropdown bar that lets the user choose between modifying % of total\n",
|
||||
" # time or total number of tasks.\n",
|
||||
" breakdown_opt = widgets.Dropdown(\n",
|
||||
" options=[total_time_value, total_tasks_value],\n",
|
||||
" value=total_tasks_value,\n",
|
||||
" description='Selection Options:'\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Initially passed in to the update_wrapper function.\n",
|
||||
" INIT_EVENT = 'INIT'\n",
|
||||
" \n",
|
||||
" # Create instance of context manager to determine whether callback is\n",
|
||||
" # currently executing \n",
|
||||
" out_recursion = _EventRecursionContextManager()\n",
|
||||
" \n",
|
||||
" def update_wrapper(event):\n",
|
||||
" # Feature received a callback, but it shouldn't be executed\n",
|
||||
" # because the callback was the result of a different feature\n",
|
||||
" # executing its callback based on user input. \n",
|
||||
" if not out_recursion.should_recurse:\n",
|
||||
" return\n",
|
||||
" \n",
|
||||
" # Feature received a callback and it should be executed because\n",
|
||||
" # the callback was the result of user input. \n",
|
||||
" with out_recursion:\n",
|
||||
" smallest, largest, num_tasks = ray.global_state._job_length()\n",
|
||||
" diff = largest - smallest\n",
|
||||
" if num_tasks is not 0: \n",
|
||||
" \n",
|
||||
" # Describes the initial values that the slider/text box\n",
|
||||
" # values should be set to.\n",
|
||||
" if event == INIT_EVENT:\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" num_tasks_box.value = -min(10000, num_tasks)\n",
|
||||
" range_slider.value = (int(100 - (100. * -num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" else:\n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" start_box.value = round(diff * low, 2)\n",
|
||||
" end_box.value = round(diff * high, 2)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the start_box value. \n",
|
||||
" elif event['owner'] == start_box:\n",
|
||||
" if start_box.value > end_box.value:\n",
|
||||
" start_box.value = end_box.value\n",
|
||||
" elif start_box.value < 0:\n",
|
||||
" start_box.value = 0\n",
|
||||
" low, high = range_slider.value\n",
|
||||
" range_slider.value = (int((start_box.value * 100.) / diff), high)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the end_box value. \n",
|
||||
" elif event['owner'] == end_box:\n",
|
||||
" if start_box.value > end_box.value:\n",
|
||||
" end_box.value = start_box.value\n",
|
||||
" elif end_box.value > diff:\n",
|
||||
" end_box.value = diff\n",
|
||||
" low, high = range_slider.value\n",
|
||||
" range_slider.value = (low, int((end_box.value * 100.) / diff))\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the breakdown options\n",
|
||||
" # toggle.\n",
|
||||
" elif event['owner'] == breakdown_opt:\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" start_box.disabled = True\n",
|
||||
" end_box.disabled = True\n",
|
||||
" num_tasks_box.disabled = False\n",
|
||||
" num_tasks_box.value = min(10000, num_tasks)\n",
|
||||
" range_slider.value = (int(100 - (100. * num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" else:\n",
|
||||
" start_box.disabled = False\n",
|
||||
" end_box.disabled = False\n",
|
||||
" num_tasks_box.disabled = True\n",
|
||||
" range_slider.value = (int((start_box.value * 100.) / diff),\n",
|
||||
" int((end_box.value * 100.) / diff))\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the range_slider\n",
|
||||
" # value.\n",
|
||||
" elif event['owner'] == range_slider:\n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" old_low, old_high = event['old']\n",
|
||||
" new_low, new_high = event['new']\n",
|
||||
" if old_low != new_low:\n",
|
||||
" range_slider.value = (new_low, 100)\n",
|
||||
" num_tasks_box.value = -(100. - new_low) / 100. * num_tasks\n",
|
||||
" else:\n",
|
||||
" range_slider.value = (0, new_high)\n",
|
||||
" num_tasks_box.value = new_high / 100. * num_tasks\n",
|
||||
" else:\n",
|
||||
" start_box.value = round(diff * low, 2)\n",
|
||||
" end_box.value = round(diff * high, 2)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the num_tasks_box\n",
|
||||
" # value.\n",
|
||||
" elif event['owner'] == num_tasks_box:\n",
|
||||
" if num_tasks_box.value > 0:\n",
|
||||
" range_slider.value = (0, int(100 * float(num_tasks_box.value) / num_tasks))\n",
|
||||
" elif num_tasks_box.value < 0:\n",
|
||||
" range_slider.value = (100 + int(100 * float(num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" \n",
|
||||
" # This should never happen. \n",
|
||||
" else:\n",
|
||||
" raise ValueError('Unknown event owner!')\n",
|
||||
"\n",
|
||||
" if not update:\n",
|
||||
" return\n",
|
||||
"\n",
|
||||
" diff = largest - smallest\n",
|
||||
" \n",
|
||||
" # Low and high are used to scale the times that are\n",
|
||||
" # queried to be relative to the absolute time. \n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" \n",
|
||||
" # Queries to task_profiles based on the slider and text\n",
|
||||
" # box values.\n",
|
||||
" # (Querying based on the % total amount of time.) \n",
|
||||
" if breakdown_opt.value == total_time_value:\n",
|
||||
" tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n",
|
||||
" \n",
|
||||
" # (Querying based on % of total number of tasks that were\n",
|
||||
" # run.)\n",
|
||||
" elif breakdown_opt.value == total_tasks_value:\n",
|
||||
" if range_slider.value[0] == 0:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n",
|
||||
" else:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
|
||||
" \n",
|
||||
" # This should never happen.\n",
|
||||
" else:\n",
|
||||
" raise ValueError('Value \"{}\" is not a legal breakdown value'.format(breakdown_opt.value))\n",
|
||||
"\n",
|
||||
" update(smallest, largest, num_tasks, tasks)\n",
|
||||
" \n",
|
||||
" # Get updated values from a slider or text box, and update the rest of\n",
|
||||
" # them accordingly.\n",
|
||||
" range_slider.observe(update_wrapper, names='value')\n",
|
||||
" breakdown_opt.observe(update_wrapper, names='value')\n",
|
||||
" start_box.observe(update_wrapper, names='value')\n",
|
||||
" end_box.observe(update_wrapper, names='value')\n",
|
||||
" num_tasks_box.observe(update_wrapper, names='value')\n",
|
||||
" \n",
|
||||
" # Initializes the sliders\n",
|
||||
" update_wrapper(INIT_EVENT)\n",
|
||||
" \n",
|
||||
" # Display sliders and search boxes \n",
|
||||
" display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)\n",
|
||||
" \n",
|
||||
" # Return the sliders and text boxes \n",
|
||||
" return start_box, end_box, range_slider, breakdown_opt"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def task_timeline():\n",
|
||||
" path_input = widgets.Button(description='View task timeline')\n",
|
||||
"\n",
|
||||
" breakdown_basic = 'Basic'\n",
|
||||
" breakdown_task = 'Task Breakdowns'\n",
|
||||
" \n",
|
||||
" breakdown_opt = widgets.Dropdown(\n",
|
||||
" options=['Basic', 'Task Breakdowns'],\n",
|
||||
" value='Basic',\n",
|
||||
" description='View options:',\n",
|
||||
" disabled=False,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" start_box, end_box, range_slider, time_opt = get_sliders(False)\n",
|
||||
" display(breakdown_opt)\n",
|
||||
" display(path_input)\n",
|
||||
"\n",
|
||||
" def find_trace2html():\n",
|
||||
" trace2html = '/tmp/ray/catapult/tracing/bin/trace2html'\n",
|
||||
" # Clone the catapult repository if it doesn't exist. TODO(rkn): We\n",
|
||||
" # could do this in the build.sh script later on.\n",
|
||||
" if not os.path.exists(trace2html): \n",
|
||||
" cmd = ['git', 'clone', 'https://github.com/catapult-project/catapult.git', '/tmp/ray/catapult']\n",
|
||||
" subprocess.check_output(cmd) \n",
|
||||
" print('Cloning catapult to /tmp/ray/catapult.')\n",
|
||||
" assert os.path.exists(trace2html)\n",
|
||||
" return trace2html\n",
|
||||
"\n",
|
||||
" def handle_submit(sender):\n",
|
||||
" tmp = tempfile.mktemp() + '.json'\n",
|
||||
" tmp2 = tempfile.mktemp() + '.html'\n",
|
||||
" \n",
|
||||
" if breakdown_opt.value == breakdown_basic:\n",
|
||||
" breakdown = False\n",
|
||||
" elif breakdown_opt.value == breakdown_task:\n",
|
||||
" breakdown = True\n",
|
||||
" else:\n",
|
||||
" raise ValueError('Unexpected breakdown value \"{}\"'.format(breakdown_opt.value))\n",
|
||||
" \n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" \n",
|
||||
" smallest, largest, num_tasks = ray.global_state._job_length()\n",
|
||||
" diff = largest - smallest\n",
|
||||
"\n",
|
||||
" if time_opt.value == total_time_value:\n",
|
||||
" tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n",
|
||||
" elif time_opt.value == total_tasks_value:\n",
|
||||
" if range_slider.value[0] == 0:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n",
|
||||
" else:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
|
||||
" else:\n",
|
||||
" raise ValueError('Unexpected time value \"{}\"'.format(time_opt.value))\n",
|
||||
" \n",
|
||||
" print('{} tasks to trace'.format(len(tasks)))\n",
|
||||
" print('Dumping task profiling data to ' + tmp)\n",
|
||||
" ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)\n",
|
||||
" print('Converting chrome trace to ' + tmp2)\n",
|
||||
" trace2html = find_trace2html()\n",
|
||||
" # TODO(rkn): The trace2html script currently requires Python 2.\n",
|
||||
" # Remove this dependency.\n",
|
||||
" subprocess.check_output(['python2', trace2html, tmp, '--output', tmp2])\n",
|
||||
" # Open the timeline in Chrome. TODO(rkn): We should remove the\n",
|
||||
" # dependency on Chrome and use whatever browser is currently being\n",
|
||||
" # used. Note that this currently does not work when Ray is being\n",
|
||||
" # used on a cluster and the browser is running locally.\n",
|
||||
" print('Opening html file in browser...')\n",
|
||||
" subprocess.Popen(['open', '-a', 'Google Chrome', tmp2])\n",
|
||||
"\n",
|
||||
" path_input.on_click(handle_submit)\n",
|
||||
"\n",
|
||||
"task_timeline()"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
|
||||
@@ -352,7 +352,7 @@ class GlobalState(object):
|
||||
|
||||
return ip_filename_file
|
||||
|
||||
def task_profiles(self, start=None, end=None, num=None):
|
||||
def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True):
|
||||
"""Fetch and return a list of task profiles.
|
||||
|
||||
Args:
|
||||
@@ -360,7 +360,12 @@ class GlobalState(object):
|
||||
tasks.
|
||||
end: The end point in time of the time window that is queried for
|
||||
tasks.
|
||||
num: A limit on the number of tasks that task_profiles will return.
|
||||
num_tasks: A limit on the number of tasks that task_profiles will
|
||||
return.
|
||||
fwd: If True, means that zrange will be used. If False, zrevrange.
|
||||
This argument is only meaningful in conjunction with the
|
||||
num_tasks argument. This controls whether the tasks returned
|
||||
are the most recent or the least recent.
|
||||
|
||||
Returns:
|
||||
A tuple of two elements. The first element is a dictionary mapping
|
||||
@@ -369,10 +374,6 @@ class GlobalState(object):
|
||||
list of profiling information for tasks where the events have
|
||||
no task ID.
|
||||
"""
|
||||
if start is None:
|
||||
start = 0
|
||||
if num is None:
|
||||
num = sys.maxsize
|
||||
|
||||
task_info = dict()
|
||||
event_log_sets = self.redis_client.keys("event_log*")
|
||||
@@ -383,31 +384,69 @@ class GlobalState(object):
|
||||
# component of each task. Calling heappop will result in the task with
|
||||
# the earliest "get_task_start" to be removed from the heap.
|
||||
|
||||
heap = []
|
||||
heapq.heapify(heap)
|
||||
heap_size = 0
|
||||
# Don't maintain the heap if we're not slicing some number
|
||||
if num_tasks is not None:
|
||||
heap = []
|
||||
heapq.heapify(heap)
|
||||
heap_size = 0
|
||||
|
||||
# Set up a param dict to pass the redis command
|
||||
params = {"withscores": True}
|
||||
if start is not None:
|
||||
params["min"] = start
|
||||
elif end is not None:
|
||||
params["min"] = 0
|
||||
|
||||
if end is not None:
|
||||
params["max"] = end
|
||||
elif start is not None:
|
||||
params["max"] = time.time()
|
||||
|
||||
if num_tasks is not None:
|
||||
if start is None and end is None:
|
||||
params["end"] = num_tasks - 1
|
||||
else:
|
||||
params["num"] = num_tasks
|
||||
params["start"] = 0
|
||||
|
||||
# Parse through event logs to determine task start and end points.
|
||||
for i in range(len(event_log_sets)):
|
||||
event_list = self.redis_client.zrangebyscore(event_log_sets[i],
|
||||
min=start,
|
||||
max=end,
|
||||
start=start,
|
||||
num=num)
|
||||
for event in event_list:
|
||||
for event_log_set in event_log_sets:
|
||||
if start is None and end is None:
|
||||
if fwd:
|
||||
event_list = self.redis_client.zrange(
|
||||
event_log_set,
|
||||
**params)
|
||||
else:
|
||||
event_list = self.redis_client.zrevrange(
|
||||
event_log_set,
|
||||
**params)
|
||||
else:
|
||||
if fwd:
|
||||
event_list = self.redis_client.zrangebyscore(
|
||||
event_log_set,
|
||||
**params)
|
||||
else:
|
||||
event_list = self.redis_client.zrevrangebyscore(
|
||||
event_log_set,
|
||||
**params)
|
||||
|
||||
for (event, score) in event_list:
|
||||
event_dict = json.loads(event)
|
||||
task_id = ""
|
||||
for event in event_dict:
|
||||
if "task_id" in event[3]:
|
||||
task_id = event[3]["task_id"]
|
||||
task_info[task_id] = dict()
|
||||
task_info[task_id]["score"] = score
|
||||
# Add task to (min/max) heap by its start point.
|
||||
# if fwd, we want to delete the largest elements, so -score
|
||||
if num_tasks is not None:
|
||||
heapq.heappush(heap, (-score if fwd else score, task_id))
|
||||
heap_size += 1
|
||||
|
||||
for event in event_dict:
|
||||
if event[1] == "ray:get_task" and event[2] == 1:
|
||||
task_info[task_id]["get_task_start"] = event[0]
|
||||
# Add task to min heap by its start point.
|
||||
heapq.heappush(heap,
|
||||
(task_info[task_id]["get_task_start"],
|
||||
task_id))
|
||||
heap_size += 1
|
||||
if event[1] == "ray:get_task" and event[2] == 2:
|
||||
task_info[task_id]["get_task_end"] = event[0]
|
||||
if (event[1] == "ray:import_remote_function" and
|
||||
@@ -437,13 +476,15 @@ class GlobalState(object):
|
||||
if "function_name" in event[3]:
|
||||
task_info[task_id]["function_name"] = (
|
||||
event[3]["function_name"])
|
||||
if heap_size > num:
|
||||
|
||||
if num_tasks is not None and heap_size > num_tasks:
|
||||
min_task, task_id_hex = heapq.heappop(heap)
|
||||
del task_info[task_id_hex]
|
||||
heap_size -= 1
|
||||
|
||||
return task_info
|
||||
|
||||
def dump_catapult_trace(self, path, start=None, end=None, num=None):
|
||||
def dump_catapult_trace(self, path, task_info, breakdowns=False):
|
||||
"""Dump task profiling information to a file.
|
||||
|
||||
This information can be viewed as a timeline of profiling information
|
||||
@@ -452,10 +493,11 @@ class GlobalState(object):
|
||||
|
||||
Args:
|
||||
path: The filepath to dump the profiling information to.
|
||||
task_info: The task info to use to generate the trace.
|
||||
breakdowns: Boolean indicating whether to break down the tasks into
|
||||
more fine-grained segments.
|
||||
"""
|
||||
if end is None:
|
||||
end = time.time()
|
||||
task_info = self.task_profiles(start=start, end=end, num=num)
|
||||
|
||||
workers = self.workers()
|
||||
start_time = None
|
||||
for info in task_info.values():
|
||||
@@ -464,66 +506,85 @@ class GlobalState(object):
|
||||
start_time = task_start
|
||||
|
||||
def micros(ts):
|
||||
return int(1e6 * (ts - start_time))
|
||||
return int(1e6 * ts)
|
||||
|
||||
def micros_rel(ts):
|
||||
return micros(ts - start_time)
|
||||
|
||||
full_trace = []
|
||||
for task_id, info in task_info.items():
|
||||
task_id_hex = ray.local_scheduler.ObjectID(hex_to_binary(task_id))
|
||||
task_data = self._task_table(task_id_hex)
|
||||
parent_info = task_info.get(task_data["TaskSpec"]["ParentTaskID"])
|
||||
times = self._get_times(info)
|
||||
delta_info = dict()
|
||||
delta_info["task_id"] = task_id
|
||||
delta_info["get_arguments"] = (info["get_arguments_end"] -
|
||||
info["get_arguments_start"])
|
||||
delta_info["execute"] = (info["execute_end"] -
|
||||
info["execute_start"])
|
||||
delta_info["store_outputs"] = (info["store_outputs_end"] -
|
||||
info["store_outputs_start"])
|
||||
delta_info["function_name"] = info["function_name"]
|
||||
delta_info["worker_id"] = info["worker_id"]
|
||||
worker = workers[info["worker_id"]]
|
||||
if parent_info:
|
||||
parent_worker = workers[parent_info["worker_id"]]
|
||||
parent_times = self._get_times(parent_info)
|
||||
parent_trace = {
|
||||
"cat": "submit_task",
|
||||
"pid": "Node " + str(parent_worker["node_ip_address"]),
|
||||
"tid": parent_info["worker_id"],
|
||||
"ts": micros(min(parent_times)),
|
||||
"ph": "s",
|
||||
"name": "SubmitTask",
|
||||
"args": {},
|
||||
"id": str(worker)
|
||||
if breakdowns:
|
||||
if "get_arguments_end" in info:
|
||||
get_args_trace = {
|
||||
"cat": "get_arguments",
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"id": str(worker),
|
||||
"ts": micros_rel(info["get_arguments_start"]),
|
||||
"ph": "X",
|
||||
"name": info["function_name"] + ":get_arguments",
|
||||
"args": delta_info,
|
||||
"dur": micros(info["get_arguments_end"] -
|
||||
info["get_arguments_start"])
|
||||
}
|
||||
full_trace.append(get_args_trace)
|
||||
|
||||
if "store_outputs_end" in info:
|
||||
outputs_trace = {
|
||||
"cat": "store_outputs",
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"id": str(worker),
|
||||
"ts": micros_rel(info["store_outputs_start"]),
|
||||
"ph": "X",
|
||||
"name": info["function_name"] + ":store_outputs",
|
||||
"args": delta_info,
|
||||
"dur": micros(info["store_outputs_end"] -
|
||||
info["store_outputs_start"])
|
||||
}
|
||||
full_trace.append(outputs_trace)
|
||||
|
||||
if "execute_end" in info:
|
||||
execute_trace = {
|
||||
"cat": "execute",
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"id": str(worker),
|
||||
"ts": micros_rel(info["execute_start"]),
|
||||
"ph": "X",
|
||||
"name": info["function_name"] + ":execute",
|
||||
"args": delta_info,
|
||||
"dur": micros(info["execute_end"] -
|
||||
info["execute_start"])
|
||||
}
|
||||
full_trace.append(execute_trace)
|
||||
else:
|
||||
task = {
|
||||
"cat": "task",
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"id": str(worker),
|
||||
"ts": micros_rel(info["get_arguments_start"]),
|
||||
"ph": "X",
|
||||
"name": info["function_name"],
|
||||
"args": delta_info,
|
||||
"dur": micros(info["store_outputs_end"] -
|
||||
info["get_arguments_start"])
|
||||
}
|
||||
full_trace.append(parent_trace)
|
||||
|
||||
parent = {
|
||||
"cat": "submit_task",
|
||||
"pid": "Node " + str(parent_worker["node_ip_address"]),
|
||||
"tid": parent_info["worker_id"],
|
||||
"ts": micros(min(parent_times)),
|
||||
"ph": "s",
|
||||
"name": "SubmitTask",
|
||||
"args": {},
|
||||
"id": str(worker)
|
||||
}
|
||||
full_trace.append(parent)
|
||||
|
||||
task_trace = {
|
||||
"cat": "submit_task",
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"ts": micros(min(times)),
|
||||
"ph": "f",
|
||||
"name": "SubmitTask",
|
||||
"args": {},
|
||||
"id": str(worker)
|
||||
}
|
||||
full_trace.append(task_trace)
|
||||
|
||||
task = {
|
||||
"name": info["function_name"],
|
||||
"cat": "ray_task",
|
||||
"ph": "X",
|
||||
"ts": micros(min(times)),
|
||||
"dur": micros(max(times)) - micros(min(times)),
|
||||
"pid": "Node " + str(worker["node_ip_address"]),
|
||||
"tid": info["worker_id"],
|
||||
"args": info
|
||||
}
|
||||
full_trace.append(task)
|
||||
full_trace.append(task)
|
||||
|
||||
print("dumping {}/{}".format(len(full_trace), len(task_info)))
|
||||
with open(path, "w") as outfile:
|
||||
json.dump(full_trace, outfile)
|
||||
|
||||
@@ -557,10 +618,11 @@ class GlobalState(object):
|
||||
worker_id = binary_to_hex(worker_key[len("Workers:"):])
|
||||
|
||||
workers_data[worker_id] = {
|
||||
"local_scheduler_socket": (
|
||||
worker_info[b"local_scheduler_socket"].decode("ascii")),
|
||||
"node_ip_address": (
|
||||
worker_info[b"node_ip_address"].decode("ascii")),
|
||||
"local_scheduler_socket":
|
||||
(worker_info[b"local_scheduler_socket"]
|
||||
.decode("ascii")),
|
||||
"node_ip_address": (worker_info[b"node_ip_address"]
|
||||
.decode("ascii")),
|
||||
"plasma_manager_socket": (worker_info[b"plasma_manager_socket"]
|
||||
.decode("ascii")),
|
||||
"plasma_store_socket": (worker_info[b"plasma_store_socket"]
|
||||
@@ -569,3 +631,28 @@ class GlobalState(object):
|
||||
"stdout_file": worker_info[b"stdout_file"].decode("ascii")
|
||||
}
|
||||
return workers_data
|
||||
|
||||
def _job_length(self):
|
||||
event_log_sets = self.redis_client.keys("event_log*")
|
||||
overall_smallest = sys.maxsize
|
||||
overall_largest = 0
|
||||
num_tasks = 0
|
||||
for event_log_set in event_log_sets:
|
||||
fwd_range = self.redis_client.zrange(event_log_set,
|
||||
start=0,
|
||||
end=0,
|
||||
withscores=True)
|
||||
overall_smallest = min(overall_smallest, fwd_range[0][1])
|
||||
|
||||
rev_range = self.redis_client.zrevrange(event_log_set,
|
||||
start=0,
|
||||
end=0,
|
||||
withscores=True)
|
||||
overall_largest = max(overall_largest, rev_range[0][1])
|
||||
|
||||
num_tasks += self.redis_client.zcount(event_log_set,
|
||||
min=0,
|
||||
max=time.time())
|
||||
if num_tasks is 0:
|
||||
return 0, 0, 0
|
||||
return overall_smallest, overall_largest, num_tasks
|
||||
|
||||
Reference in New Issue
Block a user