diff --git a/python/ray/WebUI.ipynb b/python/ray/WebUI.ipynb index 011d3ce66..3f89fbab7 100644 --- a/python/ray/WebUI.ipynb +++ b/python/ray/WebUI.ipynb @@ -1,45 +1,45 @@ { "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Evaluate the box below to initialize the web UI." - ] - }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ - "import os\n", - "import pprint\n", "import ipywidgets as widgets\n", + "import os\n", "import ray\n", + "import subprocess\n", + "import tempfile\n", "\n", "from IPython.display import display\n", "\n", - "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" + "ray.init(redis_address=os.environ['REDIS_ADDRESS'])" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "collapsed": true + }, "source": [ - "**Evaluate the box below to search for objects.**" + "#### Evaluate the box below to search for objects." ] }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "object_search = widgets.Text(\n", - " value=\"\",\n", - " placeholder=\"Object ID\",\n", - " description=\"Search for an object:\",\n", + " value='',\n", + " placeholder='Object ID',\n", + " description='Search for an object:',\n", " disabled=False\n", ")\n", "display(object_search)\n", @@ -55,7 +55,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "**Evaluate the box below to search for tasks.**" + "#### Evaluate the box below to search for tasks." ] }, { @@ -67,9 +67,9 @@ "outputs": [], "source": [ "task_search = widgets.Text(\n", - " value=\"\",\n", - " placeholder=\"Task ID\",\n", - " description=\"Search for a task:\",\n", + " value='',\n", + " placeholder='Task ID',\n", + " description='Search for a task:',\n", " disabled=False\n", ")\n", "display(task_search)\n", @@ -80,6 +80,298 @@ "\n", "task_search.on_submit(handle_submit)" ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Instances of this class maintains keep track of whether or not a\n", + "# callback is currently executing. Since the execution of the callback\n", + "# may trigger more calls to the callback, this is used to prevent infinite\n", + "# recursions.\n", + "class _EventRecursionContextManager(object):\n", + " def __init__(self):\n", + " self.should_recurse = True\n", + " \n", + " def __enter__(self):\n", + " self.should_recurse = False\n", + " \n", + " def __exit__(self, *args):\n", + " self.should_recurse = True\n", + "\n", + "total_time_value = '% total time'\n", + "total_tasks_value = '% total tasks'\n", + "\n", + "# Function that returns instances of sliders and handles associated events.\n", + "def get_sliders(update):\n", + " # Start_box value indicates the desired start point of queried window.\n", + " start_box = widgets.FloatText(\n", + " description='Start Time:',\n", + " disabled=True,\n", + " )\n", + " \n", + " # End_box value indicates the desired end point of queried window.\n", + " end_box = widgets.FloatText(\n", + " description='End Time:',\n", + " disabled=True,\n", + " )\n", + " \n", + " # Percentage slider. Indicates either % of total time or total tasks\n", + " # depending on what breakdown_opt is set to.\n", + " range_slider = widgets.IntRangeSlider(\n", + " value=[70, 100],\n", + " min=0,\n", + " max=100,\n", + " step=1,\n", + " description='%:',\n", + " continuous_update=False,\n", + " orientation='horizontal',\n", + " readout=True,\n", + " readout_format='.0i%',\n", + " )\n", + " \n", + " # Indicates the number of tasks that the user wants to be returned. Is\n", + " # disabled when the breakdown_opt value is set to total_time_value. \n", + " num_tasks_box = widgets.IntText(\n", + " description='Num Tasks:',\n", + " disabled=False\n", + " )\n", + " \n", + " # Dropdown bar that lets the user choose between modifying % of total\n", + " # time or total number of tasks.\n", + " breakdown_opt = widgets.Dropdown(\n", + " options=[total_time_value, total_tasks_value],\n", + " value=total_tasks_value,\n", + " description='Selection Options:'\n", + " )\n", + " \n", + " # Initially passed in to the update_wrapper function.\n", + " INIT_EVENT = 'INIT'\n", + " \n", + " # Create instance of context manager to determine whether callback is\n", + " # currently executing \n", + " out_recursion = _EventRecursionContextManager()\n", + " \n", + " def update_wrapper(event):\n", + " # Feature received a callback, but it shouldn't be executed\n", + " # because the callback was the result of a different feature\n", + " # executing its callback based on user input. \n", + " if not out_recursion.should_recurse:\n", + " return\n", + " \n", + " # Feature received a callback and it should be executed because\n", + " # the callback was the result of user input. \n", + " with out_recursion:\n", + " smallest, largest, num_tasks = ray.global_state._job_length()\n", + " diff = largest - smallest\n", + " if num_tasks is not 0: \n", + " \n", + " # Describes the initial values that the slider/text box\n", + " # values should be set to.\n", + " if event == INIT_EVENT:\n", + " if breakdown_opt.value == total_tasks_value:\n", + " num_tasks_box.value = -min(10000, num_tasks)\n", + " range_slider.value = (int(100 - (100. * -num_tasks_box.value) / num_tasks), 100)\n", + " else:\n", + " low, high = map(lambda x: x / 100., range_slider.value)\n", + " start_box.value = round(diff * low, 2)\n", + " end_box.value = round(diff * high, 2)\n", + " \n", + " # Event was triggered by a change in the start_box value. \n", + " elif event['owner'] == start_box:\n", + " if start_box.value > end_box.value:\n", + " start_box.value = end_box.value\n", + " elif start_box.value < 0:\n", + " start_box.value = 0\n", + " low, high = range_slider.value\n", + " range_slider.value = (int((start_box.value * 100.) / diff), high)\n", + " \n", + " # Event was triggered by a change in the end_box value. \n", + " elif event['owner'] == end_box:\n", + " if start_box.value > end_box.value:\n", + " end_box.value = start_box.value\n", + " elif end_box.value > diff:\n", + " end_box.value = diff\n", + " low, high = range_slider.value\n", + " range_slider.value = (low, int((end_box.value * 100.) / diff))\n", + " \n", + " # Event was triggered by a change in the breakdown options\n", + " # toggle.\n", + " elif event['owner'] == breakdown_opt:\n", + " if breakdown_opt.value == total_tasks_value:\n", + " start_box.disabled = True\n", + " end_box.disabled = True\n", + " num_tasks_box.disabled = False\n", + " num_tasks_box.value = min(10000, num_tasks)\n", + " range_slider.value = (int(100 - (100. * num_tasks_box.value) / num_tasks), 100)\n", + " else:\n", + " start_box.disabled = False\n", + " end_box.disabled = False\n", + " num_tasks_box.disabled = True\n", + " range_slider.value = (int((start_box.value * 100.) / diff),\n", + " int((end_box.value * 100.) / diff))\n", + " \n", + " # Event was triggered by a change in the range_slider\n", + " # value.\n", + " elif event['owner'] == range_slider:\n", + " low, high = map(lambda x: x / 100., range_slider.value)\n", + " if breakdown_opt.value == total_tasks_value:\n", + " old_low, old_high = event['old']\n", + " new_low, new_high = event['new']\n", + " if old_low != new_low:\n", + " range_slider.value = (new_low, 100)\n", + " num_tasks_box.value = -(100. - new_low) / 100. * num_tasks\n", + " else:\n", + " range_slider.value = (0, new_high)\n", + " num_tasks_box.value = new_high / 100. * num_tasks\n", + " else:\n", + " start_box.value = round(diff * low, 2)\n", + " end_box.value = round(diff * high, 2)\n", + " \n", + " # Event was triggered by a change in the num_tasks_box\n", + " # value.\n", + " elif event['owner'] == num_tasks_box:\n", + " if num_tasks_box.value > 0:\n", + " range_slider.value = (0, int(100 * float(num_tasks_box.value) / num_tasks))\n", + " elif num_tasks_box.value < 0:\n", + " range_slider.value = (100 + int(100 * float(num_tasks_box.value) / num_tasks), 100)\n", + " \n", + " # This should never happen. \n", + " else:\n", + " raise ValueError('Unknown event owner!')\n", + "\n", + " if not update:\n", + " return\n", + "\n", + " diff = largest - smallest\n", + " \n", + " # Low and high are used to scale the times that are\n", + " # queried to be relative to the absolute time. \n", + " low, high = map(lambda x: x / 100., range_slider.value)\n", + " \n", + " # Queries to task_profiles based on the slider and text\n", + " # box values.\n", + " # (Querying based on the % total amount of time.) \n", + " if breakdown_opt.value == total_time_value:\n", + " tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n", + " \n", + " # (Querying based on % of total number of tasks that were\n", + " # run.)\n", + " elif breakdown_opt.value == total_tasks_value:\n", + " if range_slider.value[0] == 0:\n", + " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n", + " else:\n", + " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n", + " \n", + " # This should never happen.\n", + " else:\n", + " raise ValueError('Value \"{}\" is not a legal breakdown value'.format(breakdown_opt.value))\n", + "\n", + " update(smallest, largest, num_tasks, tasks)\n", + " \n", + " # Get updated values from a slider or text box, and update the rest of\n", + " # them accordingly.\n", + " range_slider.observe(update_wrapper, names='value')\n", + " breakdown_opt.observe(update_wrapper, names='value')\n", + " start_box.observe(update_wrapper, names='value')\n", + " end_box.observe(update_wrapper, names='value')\n", + " num_tasks_box.observe(update_wrapper, names='value')\n", + " \n", + " # Initializes the sliders\n", + " update_wrapper(INIT_EVENT)\n", + " \n", + " # Display sliders and search boxes \n", + " display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)\n", + " \n", + " # Return the sliders and text boxes \n", + " return start_box, end_box, range_slider, breakdown_opt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "def task_timeline():\n", + " path_input = widgets.Button(description='View task timeline')\n", + "\n", + " breakdown_basic = 'Basic'\n", + " breakdown_task = 'Task Breakdowns'\n", + " \n", + " breakdown_opt = widgets.Dropdown(\n", + " options=['Basic', 'Task Breakdowns'],\n", + " value='Basic',\n", + " description='View options:',\n", + " disabled=False,\n", + " )\n", + " \n", + " start_box, end_box, range_slider, time_opt = get_sliders(False)\n", + " display(breakdown_opt)\n", + " display(path_input)\n", + "\n", + " def find_trace2html():\n", + " trace2html = '/tmp/ray/catapult/tracing/bin/trace2html'\n", + " # Clone the catapult repository if it doesn't exist. TODO(rkn): We\n", + " # could do this in the build.sh script later on.\n", + " if not os.path.exists(trace2html): \n", + " cmd = ['git', 'clone', 'https://github.com/catapult-project/catapult.git', '/tmp/ray/catapult']\n", + " subprocess.check_output(cmd) \n", + " print('Cloning catapult to /tmp/ray/catapult.')\n", + " assert os.path.exists(trace2html)\n", + " return trace2html\n", + "\n", + " def handle_submit(sender):\n", + " tmp = tempfile.mktemp() + '.json'\n", + " tmp2 = tempfile.mktemp() + '.html'\n", + " \n", + " if breakdown_opt.value == breakdown_basic:\n", + " breakdown = False\n", + " elif breakdown_opt.value == breakdown_task:\n", + " breakdown = True\n", + " else:\n", + " raise ValueError('Unexpected breakdown value \"{}\"'.format(breakdown_opt.value))\n", + " \n", + " low, high = map(lambda x: x / 100., range_slider.value)\n", + " \n", + " smallest, largest, num_tasks = ray.global_state._job_length()\n", + " diff = largest - smallest\n", + "\n", + " if time_opt.value == total_time_value:\n", + " tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n", + " elif time_opt.value == total_tasks_value:\n", + " if range_slider.value[0] == 0:\n", + " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n", + " else:\n", + " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n", + " else:\n", + " raise ValueError('Unexpected time value \"{}\"'.format(time_opt.value))\n", + " \n", + " print('{} tasks to trace'.format(len(tasks)))\n", + " print('Dumping task profiling data to ' + tmp)\n", + " ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)\n", + " print('Converting chrome trace to ' + tmp2)\n", + " trace2html = find_trace2html()\n", + " # TODO(rkn): The trace2html script currently requires Python 2.\n", + " # Remove this dependency.\n", + " subprocess.check_output(['python2', trace2html, tmp, '--output', tmp2])\n", + " # Open the timeline in Chrome. TODO(rkn): We should remove the\n", + " # dependency on Chrome and use whatever browser is currently being\n", + " # used. Note that this currently does not work when Ray is being\n", + " # used on a cluster and the browser is running locally.\n", + " print('Opening html file in browser...')\n", + " subprocess.Popen(['open', '-a', 'Google Chrome', tmp2])\n", + "\n", + " path_input.on_click(handle_submit)\n", + "\n", + "task_timeline()" + ] } ], "metadata": { diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index e9fd9a804..522226951 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -352,7 +352,7 @@ class GlobalState(object): return ip_filename_file - def task_profiles(self, start=None, end=None, num=None): + def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True): """Fetch and return a list of task profiles. Args: @@ -360,7 +360,12 @@ class GlobalState(object): tasks. end: The end point in time of the time window that is queried for tasks. - num: A limit on the number of tasks that task_profiles will return. + num_tasks: A limit on the number of tasks that task_profiles will + return. + fwd: If True, means that zrange will be used. If False, zrevrange. + This argument is only meaningful in conjunction with the + num_tasks argument. This controls whether the tasks returned + are the most recent or the least recent. Returns: A tuple of two elements. The first element is a dictionary mapping @@ -369,10 +374,6 @@ class GlobalState(object): list of profiling information for tasks where the events have no task ID. """ - if start is None: - start = 0 - if num is None: - num = sys.maxsize task_info = dict() event_log_sets = self.redis_client.keys("event_log*") @@ -383,31 +384,69 @@ class GlobalState(object): # component of each task. Calling heappop will result in the taks with # the earliest "get_task_start" to be removed from the heap. - heap = [] - heapq.heapify(heap) - heap_size = 0 + # Don't maintain the heap if we're not slicing some number + if num_tasks is not None: + heap = [] + heapq.heapify(heap) + heap_size = 0 + + # Set up a param dict to pass the redis command + params = {"withscores": True} + if start is not None: + params["min"] = start + elif end is not None: + params["min"] = 0 + + if end is not None: + params["max"] = end + elif start is not None: + params["max"] = time.time() + + if num_tasks is not None: + if start is None and end is None: + params["end"] = num_tasks - 1 + else: + params["num"] = num_tasks + params["start"] = 0 + # Parse through event logs to determine task start and end points. - for i in range(len(event_log_sets)): - event_list = self.redis_client.zrangebyscore(event_log_sets[i], - min=start, - max=end, - start=start, - num=num) - for event in event_list: + for event_log_set in event_log_sets: + if start is None and end is None: + if fwd: + event_list = self.redis_client.zrange( + event_log_set, + **params) + else: + event_list = self.redis_client.zrevrange( + event_log_set, + **params) + else: + if fwd: + event_list = self.redis_client.zrangebyscore( + event_log_set, + **params) + else: + event_list = self.redis_client.zrevrangebyscore( + event_log_set, + **params) + + for (event, score) in event_list: event_dict = json.loads(event) task_id = "" for event in event_dict: if "task_id" in event[3]: task_id = event[3]["task_id"] task_info[task_id] = dict() + task_info[task_id]["score"] = score + # Add task to (min/max) heap by its start point. + # if fwd, we want to delete the largest elements, so -score + if num_tasks is not None: + heapq.heappush(heap, (-score if fwd else score, task_id)) + heap_size += 1 + for event in event_dict: if event[1] == "ray:get_task" and event[2] == 1: task_info[task_id]["get_task_start"] = event[0] - # Add task to min heap by its start point. - heapq.heappush(heap, - (task_info[task_id]["get_task_start"], - task_id)) - heap_size += 1 if event[1] == "ray:get_task" and event[2] == 2: task_info[task_id]["get_task_end"] = event[0] if (event[1] == "ray:import_remote_function" and @@ -437,13 +476,15 @@ class GlobalState(object): if "function_name" in event[3]: task_info[task_id]["function_name"] = ( event[3]["function_name"]) - if heap_size > num: + + if num_tasks is not None and heap_size > num_tasks: min_task, task_id_hex = heapq.heappop(heap) del task_info[task_id_hex] heap_size -= 1 + return task_info - def dump_catapult_trace(self, path, start=None, end=None, num=None): + def dump_catapult_trace(self, path, task_info, breakdowns=False): """Dump task profiling information to a file. This information can be viewed as a timeline of profiling information @@ -452,10 +493,11 @@ class GlobalState(object): Args: path: The filepath to dump the profiling information to. + task_info: The task info to use to generate the trace. + breakdowns: Boolean indicating whether to break down the tasks into + more fine-grained segments. """ - if end is None: - end = time.time() - task_info = self.task_profiles(start=start, end=end, num=num) + workers = self.workers() start_time = None for info in task_info.values(): @@ -464,66 +506,85 @@ class GlobalState(object): start_time = task_start def micros(ts): - return int(1e6 * (ts - start_time)) + return int(1e6 * ts) + + def micros_rel(ts): + return micros(ts - start_time) full_trace = [] for task_id, info in task_info.items(): - task_id_hex = ray.local_scheduler.ObjectID(hex_to_binary(task_id)) - task_data = self._task_table(task_id_hex) - parent_info = task_info.get(task_data["TaskSpec"]["ParentTaskID"]) - times = self._get_times(info) + delta_info = dict() + delta_info["task_id"] = task_id + delta_info["get_arguments"] = (info["get_arguments_end"] - + info["get_arguments_start"]) + delta_info["execute"] = (info["execute_end"] - + info["execute_start"]) + delta_info["store_outputs"] = (info["store_outputs_end"] - + info["store_outputs_start"]) + delta_info["function_name"] = info["function_name"] + delta_info["worker_id"] = info["worker_id"] worker = workers[info["worker_id"]] - if parent_info: - parent_worker = workers[parent_info["worker_id"]] - parent_times = self._get_times(parent_info) - parent_trace = { - "cat": "submit_task", - "pid": "Node " + str(parent_worker["node_ip_address"]), - "tid": parent_info["worker_id"], - "ts": micros(min(parent_times)), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": str(worker) + if breakdowns: + if "get_arguments_end" in info: + get_args_trace = { + "cat": "get_arguments", + "pid": "Node " + str(worker["node_ip_address"]), + "tid": info["worker_id"], + "id": str(worker), + "ts": micros_rel(info["get_arguments_start"]), + "ph": "X", + "name": info["function_name"] + ":get_arguments", + "args": delta_info, + "dur": micros(info["get_arguments_end"] - + info["get_arguments_start"]) + } + full_trace.append(get_args_trace) + + if "store_outputs_end" in info: + outputs_trace = { + "cat": "store_outputs", + "pid": "Node " + str(worker["node_ip_address"]), + "tid": info["worker_id"], + "id": str(worker), + "ts": micros_rel(info["store_outputs_start"]), + "ph": "X", + "name": info["function_name"] + ":store_outputs", + "args": delta_info, + "dur": micros(info["store_outputs_end"] - + info["store_outputs_start"]) + } + full_trace.append(outputs_trace) + + if "execute_end" in info: + execute_trace = { + "cat": "execute", + "pid": "Node " + str(worker["node_ip_address"]), + "tid": info["worker_id"], + "id": str(worker), + "ts": micros_rel(info["execute_start"]), + "ph": "X", + "name": info["function_name"] + ":execute", + "args": delta_info, + "dur": micros(info["execute_end"] - + info["execute_start"]) + } + full_trace.append(execute_trace) + else: + task = { + "cat": "task", + "pid": "Node " + str(worker["node_ip_address"]), + "tid": info["worker_id"], + "id": str(worker), + "ts": micros_rel(info["get_arguments_start"]), + "ph": "X", + "name": info["function_name"], + "args": delta_info, + "dur": micros(info["store_outputs_end"] - + info["get_arguments_start"]) } - full_trace.append(parent_trace) - - parent = { - "cat": "submit_task", - "pid": "Node " + str(parent_worker["node_ip_address"]), - "tid": parent_info["worker_id"], - "ts": micros(min(parent_times)), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": str(worker) - } - full_trace.append(parent) - - task_trace = { - "cat": "submit_task", - "pid": "Node " + str(worker["node_ip_address"]), - "tid": info["worker_id"], - "ts": micros(min(times)), - "ph": "f", - "name": "SubmitTask", - "args": {}, - "id": str(worker) - } - full_trace.append(task_trace) - - task = { - "name": info["function_name"], - "cat": "ray_task", - "ph": "X", - "ts": micros(min(times)), - "dur": micros(max(times)) - micros(min(times)), - "pid": "Node " + str(worker["node_ip_address"]), - "tid": info["worker_id"], - "args": info - } - full_trace.append(task) + full_trace.append(task) + print("dumping {}/{}".format(len(full_trace), len(task_info))) with open(path, "w") as outfile: json.dump(full_trace, outfile) @@ -557,10 +618,11 @@ class GlobalState(object): worker_id = binary_to_hex(worker_key[len("Workers:"):]) workers_data[worker_id] = { - "local_scheduler_socket": ( - worker_info[b"local_scheduler_socket"].decode("ascii")), - "node_ip_address": ( - worker_info[b"node_ip_address"].decode("ascii")), + "local_scheduler_socket": + (worker_info[b"local_scheduler_socket"] + .decode("ascii")), + "node_ip_address": (worker_info[b"node_ip_address"] + .decode("ascii")), "plasma_manager_socket": (worker_info[b"plasma_manager_socket"] .decode("ascii")), "plasma_store_socket": (worker_info[b"plasma_store_socket"] @@ -569,3 +631,28 @@ class GlobalState(object): "stdout_file": worker_info[b"stdout_file"].decode("ascii") } return workers_data + + def _job_length(self): + event_log_sets = self.redis_client.keys("event_log*") + overall_smallest = sys.maxsize + overall_largest = 0 + num_tasks = 0 + for event_log_set in event_log_sets: + fwd_range = self.redis_client.zrange(event_log_set, + start=0, + end=0, + withscores=True) + overall_smallest = min(overall_smallest, fwd_range[0][1]) + + rev_range = self.redis_client.zrevrange(event_log_set, + start=0, + end=0, + withscores=True) + overall_largest = max(overall_largest, rev_range[0][1]) + + num_tasks += self.redis_client.zcount(event_log_set, + min=0, + max=time.time()) + if num_tasks is 0: + return 0, 0, 0 + return overall_smallest, overall_largest, num_tasks diff --git a/test/runtest.py b/test/runtest.py index 4da81d773..f36a27cc8 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1611,7 +1611,7 @@ class GlobalStateAPI(unittest.TestCase): profiles = ray.global_state.task_profiles(start=0, end=time.time()) limited_profiles = ray.global_state.task_profiles(start=0, end=time.time(), - num=1) + num_tasks=1) if len(profiles) == num_calls and len(limited_profiles) == 1: break time.sleep(0.1) @@ -1676,7 +1676,8 @@ class GlobalStateAPI(unittest.TestCase): ray.get([actor.method.remote() for actor in actors]) path = os.path.join("/tmp/ray_test_trace") - ray.global_state.dump_catapult_trace(path) + task_info = ray.global_state.task_profiles(start=0, end=time.time()) + ray.global_state.dump_catapult_trace(path, task_info) # TODO(rkn): This test is not perfect because it does not verify that # the visualization actually renders (e.g., the context of the dumped