diff --git a/CMakeLists.txt b/CMakeLists.txt index 3980de004..a6734e62c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,7 +111,10 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") list(APPEND ray_file_list "src/credis/redis/src/redis-server") endif() - if (DEFINED ENV{INCLUDE_UI} AND "$ENV{INCLUDE_UI}" STREQUAL "1") + # The goal of the if statement below is to require the catapult files to be + # present INCLUDE_UI=1 is set and to include the UI files if they are present. + # This should match the logic in build_ui.sh. + if (EXISTS "${CMAKE_BINARY_DIR}/src/catapult_files/index.html" OR "$ENV{INCLUDE_UI}" STREQUAL "1") list(APPEND ray_file_list "src/catapult_files/index.html") list(APPEND ray_file_list "src/catapult_files/trace_viewer_full.html") endif() diff --git a/python/ray/WebUI.ipynb b/python/ray/WebUI.ipynb index 390263827..229366eba 100644 --- a/python/ray/WebUI.ipynb +++ b/python/ray/WebUI.ipynb @@ -1,150 +1,97 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Ray UI\n", - "\n", - "Start the UI with **Kernel -> Restart and Run All**." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import ray\n", - "import ray.experimental.ui as ui\n", - "\n", - "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Object search." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.object_search_bar()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task search." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_search_bar()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task trace timeline." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To view arrows, go to View Options and select Flow Events." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_timeline()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task durations." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_completion_time_distribution()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### CPU usage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.cpu_usage()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Cluster usage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.cluster_usage()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.1" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "cells": [{ + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Ray UI\n", "\n", + "Start the UI with **Kernel -> Restart and Run All**." + ] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", "import ray\n", + "import ray.experimental.ui as ui\n", "\n", + "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" + ] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Task trace timeline."] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To view arrows, go to View Options and select Flow Events." + ] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.task_timeline()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Object transfer timeline."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.object_transfer_timeline()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Task durations."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.task_completion_time_distribution()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### CPU usage."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.cpu_usage()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Cluster usage."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.cluster_usage()"] + }], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.1" + } + }, + "nbformat": 4, + "nbformat_minor": 2 } diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 538e52652..d97cc274f 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -2,9 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import copy from collections import defaultdict -import heapq import json import redis import sys @@ -398,129 +396,6 @@ class GlobalState(object): return ip_filename_file - def task_profiles(self, num_tasks, start=None, end=None, fwd=True): - """Fetch and return a list of task profiles. - - Args: - num_tasks: A limit on the number of tasks that task_profiles will - return. - start: The start point of the time window that is queried for - tasks. - end: The end point in time of the time window that is queried for - tasks. - fwd: If True, means that zrange will be used. If False, zrevrange. - This argument is only meaningful in conjunction with the - num_tasks argument. This controls whether the tasks returned - are the most recent or the least recent. - - Returns: - A tuple of two elements. The first element is a dictionary mapping - the task ID of a task to a list of the profiling information - for all of the executions of that task. The second element is a - list of profiling information for tasks where the events have - no task ID. - """ - task_info = {} - event_log_sets = self.redis_client.keys("event_log*") - - # The heap is used to maintain the set of x tasks that occurred the - # most recently across all of the workers, where x is defined as the - # function parameter num. The key is the start time of the "get_task" - # component of each task. Calling heappop will result in the task with - # the earliest "get_task_start" to be removed from the heap. - heap = [] - heapq.heapify(heap) - heap_size = 0 - - # Set up a param dict to pass the redis command - params = {"withscores": True} - if start is not None: - params["min"] = start - elif end is not None: - params["min"] = 0 - - if end is not None: - params["max"] = end - elif start is not None: - params["max"] = time.time() - - if start is None and end is None: - params["end"] = num_tasks - 1 - else: - params["num"] = num_tasks - params["start"] = 0 - - # Parse through event logs to determine task start and end points. - for event_log_set in event_log_sets: - if start is None and end is None: - if fwd: - event_list = self.redis_client.zrange( - event_log_set, **params) - else: - event_list = self.redis_client.zrevrange( - event_log_set, **params) - else: - if fwd: - event_list = self.redis_client.zrangebyscore( - event_log_set, **params) - else: - event_list = self.redis_client.zrevrangebyscore( - event_log_set, **params) - - for (event, score) in event_list: - event_dict = json.loads(decode(event)) - task_id = "" - for event in event_dict: - if "task_id" in event[3]: - task_id = event[3]["task_id"] - task_info[task_id] = {} - task_info[task_id]["score"] = score - # Add task to (min/max) heap by its start point. - # if fwd, we want to delete the largest elements, so -score - heapq.heappush(heap, (-score if fwd else score, task_id)) - heap_size += 1 - - for event in event_dict: - if event[1] == "get_task" and event[2] == 1: - task_info[task_id]["get_task_start"] = event[0] - if event[1] == "get_task" and event[2] == 2: - task_info[task_id]["get_task_end"] = event[0] - if (event[1] == "register_remote_function" - and event[2] == 1): - task_info[task_id]["import_remote_start"] = event[0] - if (event[1] == "register_remote_function" - and event[2] == 2): - task_info[task_id]["import_remote_end"] = event[0] - if (event[1] == "task:deserialize_arguments" - and event[2] == 1): - task_info[task_id]["get_arguments_start"] = event[0] - if (event[1] == "task:deserialize_arguments" - and event[2] == 2): - task_info[task_id]["get_arguments_end"] = event[0] - if event[1] == "task:execute" and event[2] == 1: - task_info[task_id]["execute_start"] = event[0] - if event[1] == "task:execute" and event[2] == 2: - task_info[task_id]["execute_end"] = event[0] - if event[1] == "task:store_outputs" and event[2] == 1: - task_info[task_id]["store_outputs_start"] = event[0] - if event[1] == "task:store_outputs" and event[2] == 2: - task_info[task_id]["store_outputs_end"] = event[0] - if "worker_id" in event[3]: - task_info[task_id]["worker_id"] = event[3]["worker_id"] - if "function_name" in event[3]: - task_info[task_id]["function_name"] = ( - event[3]["function_name"]) - - if heap_size > num_tasks: - min_task, task_id_hex = heapq.heappop(heap) - del task_info[task_id_hex] - heap_size -= 1 - - for key, info in task_info.items(): - self._add_missing_timestamps(info) - - return task_info - def _profile_table(self, component_id): """Get the profile events for a given component. @@ -806,341 +681,6 @@ class GlobalState(object): else: return all_events - def dump_catapult_trace(self, - path, - task_info, - breakdowns=True, - task_dep=True, - obj_dep=True): - """Dump task profiling information to a file. - - This information can be viewed as a timeline of profiling information - by going to chrome://tracing in the chrome web browser and loading the - appropriate file. - - Args: - path: The filepath to dump the profiling information to. - task_info: The task info to use to generate the trace. Should be - the output of ray.global_state.task_profiles(). - breakdowns: Boolean indicating whether to break down the tasks into - more fine-grained segments. - task_dep: Boolean indicating whether or not task submission edges - should be included in the trace. - obj_dep: Boolean indicating whether or not object dependency edges - should be included in the trace. - """ - workers = self.workers() - - task_table = {} - # TODO(ekl) reduce the number of RPCs here with MGET - for task_id, _ in task_info.items(): - try: - # TODO (hme): do something to correct slider here, - # slider should be correct to begin with, though. - task_table[task_id] = self.task_table(task_id) - task_table[task_id]["TaskSpec"]["Args"] = [ - repr(arg) - for arg in task_table[task_id]["TaskSpec"]["Args"] - ] - except Exception: - print("Could not find task {}".format(task_id)) - - # filter out tasks not in task_table - task_info = {k: v for k, v in task_info.items() if k in task_table} - - start_time = None - for info in task_info.values(): - task_start = min(self._get_times(info)) - if not start_time or task_start < start_time: - start_time = task_start - - def micros(ts): - return int(1e6 * ts) - - def micros_rel(ts): - return micros(ts - start_time) - - seen_obj = {} - - full_trace = [] - for task_id, info in task_info.items(): - worker = workers[info["worker_id"]] - task_t_info = task_table[task_id] - - # The total_info dictionary is what is displayed when selecting a - # task in the timeline. We copy the task spec so that we don't - # modify it in place since we will use the original values later. - total_info = copy.copy(task_table[task_id]["TaskSpec"]) - total_info["Args"] = [ - oid.hex() if isinstance(oid, ray.ObjectID) else oid - for oid in task_t_info["TaskSpec"]["Args"] - ] - total_info["ReturnObjectIDs"] = [ - oid.hex() for oid in task_t_info["TaskSpec"]["ReturnObjectIDs"] - ] - total_info["LocalSchedulerID"] = task_t_info["LocalSchedulerID"] - total_info["get_arguments"] = ( - info["get_arguments_end"] - info["get_arguments_start"]) - total_info["execute"] = ( - info["execute_end"] - info["execute_start"]) - total_info["store_outputs"] = ( - info["store_outputs_end"] - info["store_outputs_start"]) - total_info["function_name"] = info["function_name"] - total_info["worker_id"] = info["worker_id"] - - parent_info = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - worker = workers[info["worker_id"]] - # The catapult trace format documentation can be found here: - # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # noqa: E501 - if breakdowns: - if "get_arguments_end" in info: - get_args_trace = { - "cat": "get_arguments", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["get_arguments_start"]), - "ph": "X", - "name": info["function_name"] + ":get_arguments", - "args": total_info, - "dur": micros(info["get_arguments_end"] - - info["get_arguments_start"]), - "cname": "rail_idle" - } - full_trace.append(get_args_trace) - - if "store_outputs_end" in info: - outputs_trace = { - "cat": "store_outputs", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["store_outputs_start"]), - "ph": "X", - "name": info["function_name"] + ":store_outputs", - "args": total_info, - "dur": micros(info["store_outputs_end"] - - info["store_outputs_start"]), - "cname": "thread_state_runnable" - } - full_trace.append(outputs_trace) - - if "execute_end" in info: - execute_trace = { - "cat": "execute", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["execute_start"]), - "ph": "X", - "name": info["function_name"] + ":execute", - "args": total_info, - "dur": micros(info["execute_end"] - - info["execute_start"]), - "cname": "rail_animation" - } - full_trace.append(execute_trace) - - else: - if parent_info: - parent_worker = workers[parent_info["worker_id"]] - parent_times = self._get_times(parent_info) - parent_profile = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - - _parent_id = parent_info["worker_id"] + str( - micros(min(parent_times))) - - parent = { - "cat": "submit_task", - "pid": "Node " + parent_worker["node_ip_address"], - "tid": parent_info["worker_id"], - "ts": micros_rel( - parent_profile - and parent_profile["get_arguments_start"] - or start_time), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": _parent_id, - } - full_trace.append(parent) - - _id = info["worker_id"] + str(micros(min(parent_times))) - - task_trace = { - "cat": "submit_task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]), - "ph": "f", - "name": "SubmitTask", - "args": {}, - "id": _id, - "bp": "e", - "cname": "olive" - } - full_trace.append(task_trace) - - task = { - "cat": "task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["get_arguments_start"]), - "ph": "X", - "name": info["function_name"], - "args": total_info, - "dur": micros(info["store_outputs_end"] - - info["get_arguments_start"]), - "cname": "thread_state_runnable" - } - full_trace.append(task) - - if task_dep: - if parent_info: - parent_worker = workers[parent_info["worker_id"]] - parent_times = self._get_times(parent_info) - parent_profile = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - - _parent_id = parent_info["worker_id"] + str( - micros(min(parent_times))) - - parent = { - "cat": "submit_task", - "pid": "Node " + parent_worker["node_ip_address"], - "tid": parent_info["worker_id"], - "ts": micros_rel( - parent_profile - and parent_profile["get_arguments_start"] - or start_time), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": _parent_id, - } - full_trace.append(parent) - - _id = info["worker_id"] + str(micros(min(parent_times))) - - task_trace = { - "cat": "submit_task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]), - "ph": "f", - "name": "SubmitTask", - "args": {}, - "id": _id, - "bp": "e" - } - full_trace.append(task_trace) - - if obj_dep: - args = task_table[task_id]["TaskSpec"]["Args"] - for arg in args: - # Don't visualize arguments that are not object IDs. - if isinstance(arg, ray.ObjectID): - object_info = self._object_table(arg) - # Don't visualize objects that were created by calls to - # put. - if not object_info["IsPut"]: - if arg not in seen_obj: - seen_obj[arg] = 0 - seen_obj[arg] += 1 - owner_task = self._object_table(arg)["TaskID"] - if owner_task in task_info: - owner_worker = (workers[task_info[owner_task][ - "worker_id"]]) - # Adding/subtracting 2 to the time associated - # with the beginning/ending of the flow event - # is necessary to make the flow events show up - # reliably. When these times are exact, this is - # presumably an edge case, and catapult doesn't - # recognize that there is a duration event at - # that exact point in time that the flow event - # should be bound to. This issue is solved by - # adding the 2 ms to the start/end time of the - # flow event, which guarantees overlap with the - # duration event that it's associated with, and - # the flow event therefore always gets drawn. - owner = { - "cat": "obj_dependency", - "pid": ("Node " + - owner_worker["node_ip_address"]), - "tid": task_info[owner_task]["worker_id"], - "ts": micros_rel(task_info[owner_task] - ["store_outputs_end"]) - - 2, - "ph": "s", - "name": "ObjectDependency", - "args": {}, - "bp": "e", - "cname": "cq_build_attempt_failed", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(owner) - - dependent = { - "cat": "obj_dependency", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]) + - 2, - "ph": "f", - "name": "ObjectDependency", - "args": {}, - "cname": "cq_build_attempt_failed", - "bp": "e", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(dependent) - - print("Creating JSON {}/{}".format(len(full_trace), len(task_info))) - with open(path, "w") as outfile: - json.dump(full_trace, outfile) - - def _get_times(self, data): - """Extract the numerical times from a task profile. - - This is a helper method for dump_catapult_trace. - - Args: - data: This must be a value in the dictionary returned by the - task_profiles function. - """ - all_times = [] - all_times.append(data["acquire_lock_start"]) - all_times.append(data["acquire_lock_end"]) - all_times.append(data["get_arguments_start"]) - all_times.append(data["get_arguments_end"]) - all_times.append(data["execute_start"]) - all_times.append(data["execute_end"]) - all_times.append(data["store_outputs_start"]) - all_times.append(data["store_outputs_end"]) - return all_times - - def _add_missing_timestamps(self, info): - """Fills in any missing timestamp values in a task info. - - Task timestamps may be missing if the task fails or is partially - executed. - """ - - keys = [ - "acquire_lock_start", "acquire_lock_end", "get_arguments_start", - "get_arguments_end", "execute_start", "execute_end", - "store_outputs_start", "store_outputs_end" - ] - - latest_timestamp = 0 - for key in keys: - cur = info.get(key, latest_timestamp) - info[key] = cur - latest_timestamp = cur - def workers(self): """Get a dictionary mapping worker ID to worker information.""" worker_keys = self.redis_client.keys("Worker*") diff --git a/python/ray/experimental/ui.py b/python/ray/experimental/ui.py index da4ee9e57..15a6fd05f 100644 --- a/python/ray/experimental/ui.py +++ b/python/ray/experimental/ui.py @@ -1,20 +1,23 @@ -import ipywidgets as widgets +import logging import numpy as np import os import pprint -import ray import shutil import tempfile import time +import ipywidgets as widgets from IPython.display import display, IFrame, clear_output +import ray + +logger = logging.getLogger(__name__) + + # Instances of this class maintains keep track of whether or not a # callback is currently executing. Since the execution of the callback # may trigger more calls to the callback, this is used to prevent infinite # recursions. - - class _EventRecursionContextManager(object): def __init__(self): self.should_recurse = True @@ -185,36 +188,6 @@ def get_sliders(update): range_slider.value = (100 + int( 100 * float(num_tasks_box.value) / num_tasks), 100) - if not update: - return - - diff = largest - smallest - - # Low and high are used to scale the times that are - # queried to be relative to the absolute time. - low, high = map(lambda x: x / 100., range_slider.value) - - # Queries to task_profiles based on the slider and text - # box values. - # (Querying based on the % total amount of time.) - if breakdown_opt.value == total_time_value: - tasks = _truncated_task_profiles( - start=(smallest + diff * low), - end=(smallest + diff * high)) - - # (Querying based on % of total number of tasks that were - # run.) - elif breakdown_opt.value == total_tasks_value: - if range_slider.value[0] == 0: - tasks = _truncated_task_profiles( - num_tasks=(int(num_tasks * high)), fwd=True) - else: - tasks = _truncated_task_profiles( - num_tasks=(int(num_tasks * (high - low))), - fwd=False) - - update(smallest, largest, num_tasks, tasks) - # Get updated values from a slider or text box, and update the rest of # them accordingly. range_slider.observe(update_wrapper, names="value") @@ -268,20 +241,6 @@ def task_search_bar(): MAX_TASKS_TO_VISUALIZE = 10000 -# Wrapper that enforces a limit on the number of tasks to visualize -def _truncated_task_profiles(start=None, end=None, num_tasks=None, fwd=True): - if num_tasks is None: - num_tasks = MAX_TASKS_TO_VISUALIZE - print("Warning: at most {} tasks will be fetched within this " - "time range.".format(MAX_TASKS_TO_VISUALIZE)) - elif num_tasks > MAX_TASKS_TO_VISUALIZE: - print("Warning: too many tasks to visualize, " - "fetching only the first {} of {}.".format( - MAX_TASKS_TO_VISUALIZE, num_tasks)) - num_tasks = MAX_TASKS_TO_VISUALIZE - return ray.global_state.task_profiles(num_tasks, start, end, fwd) - - # Helper function that guarantees unique and writeable temp files. # Prevents clashes in task trace files when multiple notebooks are running. def _get_temp_file_path(**kwargs): @@ -293,32 +252,6 @@ def _get_temp_file_path(**kwargs): def task_timeline(): - path_input = widgets.Button(description="View task timeline") - - breakdown_basic = "Basic" - breakdown_task = "Task Breakdowns" - - breakdown_opt = widgets.Dropdown( - options=["Basic", "Task Breakdowns"], - value="Task Breakdowns", - disabled=False, - ) - obj_dep = widgets.Checkbox( - value=True, disabled=False, layout=widgets.Layout(width='20px')) - task_dep = widgets.Checkbox( - value=True, disabled=False, layout=widgets.Layout(width='20px')) - # Labels to bypass width limitation for descriptions. - label_tasks = widgets.Label( - value='Task submissions', layout=widgets.Layout(width='110px')) - label_objects = widgets.Label( - value='Object dependencies', layout=widgets.Layout(width='130px')) - label_options = widgets.Label( - value='View options:', layout=widgets.Layout(width='100px')) - start_box, end_box, range_slider, time_opt = get_sliders(False) - display(widgets.HBox([task_dep, label_tasks, obj_dep, label_objects])) - display(widgets.HBox([label_options, breakdown_opt])) - display(path_input) - # Check that the trace viewer renderer file is present, and copy it to the # current working directory if it is not present. if not os.path.exists("trace_viewer_full.html"): @@ -328,76 +261,69 @@ def task_timeline(): "../core/src/catapult_files/trace_viewer_full.html"), "trace_viewer_full.html") - def handle_submit(sender): - json_tmp = tempfile.mktemp() + ".json" + trace_viewer_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/index.html") - # Determine whether task components should be displayed or not. - if breakdown_opt.value == breakdown_basic: - breakdown = False - elif breakdown_opt.value == breakdown_task: - breakdown = True - else: - raise ValueError("Unexpected breakdown value '{}'".format( - breakdown_opt.value)) + html_file_path = _get_temp_file_path(suffix=".html") + json_file_path = _get_temp_file_path(suffix=".json") - low, high = map(lambda x: x / 100., range_slider.value) + ray.global_state.chrome_tracing_dump(filename=json_file_path) - smallest, largest, num_tasks = ray.global_state._job_length() - diff = largest - smallest + with open(trace_viewer_path) as f: + data = f.read() - if time_opt.value == total_time_value: - tasks = _truncated_task_profiles( - start=smallest + diff * low, end=smallest + diff * high) - elif time_opt.value == total_tasks_value: - if range_slider.value[0] == 0: - tasks = _truncated_task_profiles( - num_tasks=int(num_tasks * high), fwd=True) - else: - tasks = _truncated_task_profiles( - num_tasks=int(num_tasks * (high - low)), fwd=False) - else: - raise ValueError("Unexpected time value '{}'".format( - time_opt.value)) - # Write trace to a JSON file - print("Collected profiles for {} tasks.".format(len(tasks))) - print("Dumping task profile data to {}, " - "this might take a while...".format(json_tmp)) - ray.global_state.dump_catapult_trace( - json_tmp, - tasks, - breakdowns=breakdown, - obj_dep=obj_dep.value, - task_dep=task_dep.value) - print("Opening html file in browser...") + # Replace the demo data path with our own + # https://github.com/catapult-project/catapult/blob/ + # 33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 + data = data.replace("../test_data/big_trace.json", json_file_path) - trace_viewer_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "../core/src/catapult_files/index.html") + with open(html_file_path, "w+") as f: + f.write(data) - html_file_path = _get_temp_file_path(suffix=".html") - json_file_path = _get_temp_file_path(suffix=".json") + # Display the task trace within the Jupyter notebook + clear_output(wait=True) + logger.info("To view fullscreen, open chrome://tracing in Google Chrome " + "and load `{}`".format(os.path.abspath(json_file_path))) + display(IFrame(html_file_path, 900, 800)) - print("Pointing to {} named {}".format(json_tmp, json_file_path)) - shutil.copy(json_tmp, json_file_path) - with open(trace_viewer_path) as f: - data = f.read() +def object_transfer_timeline(): + # Check that the trace viewer renderer file is present, and copy it to the + # current working directory if it is not present. + if not os.path.exists("trace_viewer_full.html"): + shutil.copy( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/trace_viewer_full.html"), + "trace_viewer_full.html") - # Replace the demo data path with our own - # https://github.com/catapult-project/catapult/blob/ - # 33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 - data = data.replace("../test_data/big_trace.json", json_file_path) + trace_viewer_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/index.html") - with open(html_file_path, "w+") as f: - f.write(data) + html_file_path = _get_temp_file_path(suffix=".html") + json_file_path = _get_temp_file_path(suffix=".json") - # Display the task trace within the Jupyter notebook - clear_output(wait=True) - print("To view fullscreen, open chrome://tracing in Google Chrome " - "and load `{}`".format(json_tmp)) - display(IFrame(html_file_path, 900, 800)) + ray.global_state.chrome_tracing_object_transfer_dump( + filename=json_file_path) - path_input.on_click(handle_submit) + with open(trace_viewer_path) as f: + data = f.read() + + # Replace the demo data path with our own + # https://github.com/catapult-project/catapult/blob/ + # 33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 + data = data.replace("../test_data/big_trace.json", json_file_path) + + with open(html_file_path, "w+") as f: + f.write(data) + + # Display the task trace within the Jupyter notebook + clear_output(wait=True) + logger.info("To view fullscreen, open chrome://tracing in Google Chrome " + "and load `{}`".format(os.path.abspath(json_file_path))) + display(IFrame(html_file_path, 900, 800)) def task_completion_time_distribution(): @@ -562,12 +488,7 @@ def cpu_usage(): output_notebook(resources=CDN) # Parse the client table to determine how many CPUs are available - num_cpus = 0 - client_table = ray.global_state.client_table() - for node_ip, client_list in client_table.items(): - for client in client_list: - if "CPU" in client: - num_cpus += client["CPU"] + num_cpus = ray.global_state.cluster_resources()["CPU"] # Update the plot based on the sliders def plot_utilization():