From 8a3e180156c4d1fd2d611f464231f25ba7b3e08b Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 16 Jul 2018 09:09:52 +0800 Subject: [PATCH] Move profiling code to a new file and fix thread safety (#2397) --- python/ray/__init__.py | 13 +- python/ray/import_thread.py | 16 ++- python/ray/profiling.py | 271 ++++++++++++++++++++++++++++++++++++ python/ray/worker.py | 259 ++-------------------------------- 4 files changed, 301 insertions(+), 258 deletions(-) create mode 100644 python/ray/profiling.py diff --git a/python/ray/__init__.py b/python/ray/__init__.py index da38977d1..e689eb9fc 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -47,9 +47,9 @@ except ImportError as e: raise from ray.local_scheduler import ObjectID, _config # noqa: E402 +from ray.profiling import profile # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, - remote, profile, flush_profile_data, get_gpu_ids, - get_resource_ids, get_webui_url, + remote, get_gpu_ids, get_resource_ids, get_webui_url, register_custom_serializer, shutdown) # noqa: E402 from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, SILENT_MODE) # noqa: E402 @@ -65,11 +65,10 @@ __version__ = "0.5.0" __all__ = [ "error_info", "init", "connect", "disconnect", "get", "put", "wait", - "remote", "profile", "flush_profile_data", "actor", "method", - "get_gpu_ids", "get_resource_ids", "get_webui_url", - "register_custom_serializer", "shutdown", "SCRIPT_MODE", "WORKER_MODE", - "LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID", "_config", - "__version__" + "remote", "profile", "actor", "method", "get_gpu_ids", "get_resource_ids", + "get_webui_url", "register_custom_serializer", "shutdown", "SCRIPT_MODE", + "WORKER_MODE", "LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID", + "_config", "__version__" ] import ctypes # noqa: E402 diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 0207d5659..6d41f78eb 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -10,6 +10,7 @@ import redis import ray from ray import ray_constants from ray import cloudpickle as pickle +from ray import profiling from ray import utils @@ -74,21 +75,23 @@ class ImportThread(object): def _process_key(self, key): """Process the given export key from redis.""" - from ray.worker import profile, WORKER_MODE # Handle the driver case first. - if self.mode != WORKER_MODE: + if self.mode != ray.WORKER_MODE: if key.startswith(b"FunctionsToRun"): - with profile("fetch_and_run_function", worker=self.worker): + with profiling.profile( + "fetch_and_run_function", worker=self.worker): self.fetch_and_execute_function_to_run(key) # Return because FunctionsToRun are the only things that # the driver should import. return if key.startswith(b"RemoteFunction"): - with profile("register_remote_function", worker=self.worker): + with profiling.profile( + "register_remote_function", worker=self.worker): self.fetch_and_register_remote_function(key) elif key.startswith(b"FunctionsToRun"): - with profile("fetch_and_run_function", worker=self.worker): + with profiling.profile( + "fetch_and_run_function", worker=self.worker): self.fetch_and_execute_function_to_run(key) elif key.startswith(b"ActorClass"): # Keep track of the fact that this actor class has been @@ -154,11 +157,10 @@ class ImportThread(object): def fetch_and_execute_function_to_run(self, key): """Run on arbitrary function on the worker.""" - from ray.worker import SCRIPT_MODE, SILENT_MODE driver_id, serialized_function = self.redis_client.hmget( key, ["driver_id", "function"]) - if (self.worker.mode in [SCRIPT_MODE, SILENT_MODE] + if (self.worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE] and driver_id != self.worker.task_driver_id.id()): # This export was from a different driver and there's no need for # this driver to import it. diff --git a/python/ray/profiling.py b/python/ray/profiling.py new file mode 100644 index 000000000..6d5b8f7e7 --- /dev/null +++ b/python/ray/profiling.py @@ -0,0 +1,271 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import json +import time +import threading +import traceback + +import ray + +LOG_POINT = 0 +LOG_SPAN_START = 1 +LOG_SPAN_END = 2 + + +class _NullLogSpan(object): + """A log span context manager that does nothing""" + + def __enter__(self): + pass + + def __exit__(self, type, value, tb): + pass + + +NULL_LOG_SPAN = _NullLogSpan() + + +def profile(event_type, extra_data=None, worker=None): + """Profile a span of time so that it appears in the timeline visualization. + + Note that this only works in the raylet code path. + + This function can be used as follows (both on the driver or within a task). + + .. code-block:: python + + with ray.profile("custom event", extra_data={'key': 'value'}): + # Do some computation here. + + Optionally, a dictionary can be passed as the "extra_data" argument, and + it can have keys "name" and "cname" if you want to override the default + timeline display text and box color. Other values will appear at the bottom + of the chrome tracing GUI when you click on the box corresponding to this + profile span. + + Args: + event_type: A string describing the type of the event. + extra_data: This must be a dictionary mapping strings to strings. This + data will be added to the json objects that are used to populate + the timeline, so if you want to set a particular color, you can + simply set the "cname" attribute to an appropriate color. + Similarly, if you set the "name" attribute, then that will set the + text displayed on the box in the timeline. + + Returns: + An object that can profile a span of time via a "with" statement. + """ + if worker is None: + worker = ray.worker.global_worker + if not worker.use_raylet: + # Log the event if this is a worker and not a driver, since the + # driver's event log never gets flushed. + if worker.mode == ray.WORKER_MODE: + return RayLogSpanNonRaylet( + worker.profiler, event_type, contents=extra_data) + else: + return NULL_LOG_SPAN + else: + return RayLogSpanRaylet( + worker.profiler, event_type, extra_data=extra_data) + + +class Profiler(object): + """A class that holds the profiling states. + + Attributes: + worker: the worker to profile. + events: the buffer of events. + lock: the lock to protect access of events. + """ + + def __init__(self, worker): + self.worker = worker + self.events = [] + self.lock = threading.Lock() + + def start_flush_thread(self): + t = threading.Thread(target=self._periodically_flush_profile_events) + # Making the thread a daemon causes it to exit when the main thread + # exits. + t.daemon = True + t.start() + + def _periodically_flush_profile_events(self): + """Drivers run this as a thread to flush profile data in the + background.""" + # Note(rkn): This is run on a background thread in the driver. It uses + # the local scheduler client. This should be ok because it doesn't read + # from the local scheduler client and we have the GIL here. However, + # if either of those things changes, then we could run into issues. + try: + while True: + time.sleep(1) + self.flush_profile_data() + except AttributeError: + # This is to suppress errors that occur at shutdown. + pass + + # TODO(rkn): Support calling this function in the middle of a task, and + # also call this periodically in the background from the driver. + def flush_profile_data(self): + """Push the logged profiling data to the global control store. + + By default, profiling information for a given task won't appear in the + timeline until after the task has completed. For very long-running + tasks, we may want profiling information to appear more quickly. + In such cases, this function can be called. Note that as an + aalternative, we could start thread in the background on workers that + calls this automatically. + """ + with self.lock: + events = self.events + self.events = [] + + if not self.worker.use_raylet: + event_log_key = b"event_log:" + self.worker.worker_id + event_log_value = json.dumps(events) + self.worker.local_scheduler_client.log_event( + event_log_key, event_log_value, time.time()) + else: + if self.worker.mode == ray.WORKER_MODE: + component_type = "worker" + else: + component_type = "driver" + + self.worker.local_scheduler_client.push_profile_events( + component_type, ray.ObjectID(self.worker.worker_id), + self.worker.node_ip_address, events) + + def add_event(self, event): + with self.lock: + self.events.append(event) + + +class RayLogSpanNonRaylet(object): + """An object used to enable logging a span of events with a with statement. + + Attributes: + event_type (str): The type of the event being logged. + contents: Additional information to log. + """ + + def __init__(self, profiler, event_type, contents=None): + """Initialize a RayLogSpanNonRaylet object.""" + self.profiler = profiler + self.event_type = event_type + self.contents = contents + + def _log(self, event_type, kind, contents=None): + """Log an event to the global state store. + + This adds the event to a buffer of events locally. The buffer can be + flushed and written to the global state store by calling + flush_profile_data(). + + Args: + event_type (str): The type of the event. + contents: More general data to store with the event. + kind (int): Either LOG_POINT, LOG_SPAN_START, or LOG_SPAN_END. This + is LOG_POINT if the event being logged happens at a single + point in time. It is LOG_SPAN_START if we are starting to log a + span of time, and it is LOG_SPAN_END if we are finishing + logging a span of time. + """ + # TODO(rkn): This code currently takes around half a microsecond. Since + # we call it tens of times per task, this adds up. We will need to redo + # the logging code, perhaps in C. + contents = {} if contents is None else contents + assert isinstance(contents, dict) + # Make sure all of the keys and values in the dictionary are strings. + contents = {str(k): str(v) for k, v in contents.items()} + self.profiler.add_event((time.time(), event_type, kind, contents)) + + def __enter__(self): + """Log the beginning of a span event.""" + self._log( + event_type=self.event_type, + contents=self.contents, + kind=LOG_SPAN_START) + + def __exit__(self, type, value, tb): + """Log the end of a span event. Log any exception that occurred.""" + if type is None: + self._log(event_type=self.event_type, kind=LOG_SPAN_END) + else: + self._log( + event_type=self.event_type, + contents={ + "type": str(type), + "value": value, + "traceback": traceback.format_exc() + }, + kind=LOG_SPAN_END) + + +class RayLogSpanRaylet(object): + """An object used to enable logging a span of events with a with statement. + + Attributes: + event_type (str): The type of the event being logged. + contents: Additional information to log. + """ + + def __init__(self, profiler, event_type, extra_data=None): + """Initialize a RayLogSpanRaylet object.""" + self.profiler = profiler + self.event_type = event_type + self.extra_data = extra_data if extra_data is not None else {} + + def set_attribute(self, key, value): + """Add a key-value pair to the extra_data dict. + + This can be used to add attributes that are not available when + ray.profile was called. + + Args: + key: The attribute name. + value: The attribute value. + """ + if not isinstance(key, str) or not isinstance(value, str): + raise ValueError("The extra_data argument must be a " + "dictionary mapping strings to strings.") + self.extra_data[key] = value + + def __enter__(self): + """Log the beginning of a span event. + + Returns: + The object itself is returned so that if the block is opened using + "with ray.profile(...) as prof:", we can call + "prof.set_attribute" inside the block. + """ + self.start_time = time.time() + return self + + def __exit__(self, type, value, tb): + """Log the end of a span event. Log any exception that occurred.""" + for key, value in self.extra_data.items(): + if not isinstance(key, str) or not isinstance(value, str): + raise ValueError("The extra_data argument must be a " + "dictionary mapping strings to strings.") + + if type is not None: + extra_data = json.dumps({ + "type": str(type), + "value": str(value), + "traceback": str(traceback.format_exc()), + }) + else: + extra_data = json.dumps(self.extra_data) + + event = { + "event_type": self.event_type, + "start_time": self.start_time, + "end_time": time.time(), + "extra_data": extra_data, + } + + self.profiler.add_event(event) diff --git a/python/ray/worker.py b/python/ray/worker.py index 46cc27f8f..17fd411c6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -7,7 +7,6 @@ import collections import colorama import hashlib import inspect -import json import numpy as np import os import redis @@ -32,6 +31,7 @@ import ray.local_scheduler import ray.plasma import ray.ray_constants as ray_constants from ray import import_thread +from ray import profiling from ray.utils import ( binary_to_hex, check_oversized_pickle, @@ -44,10 +44,6 @@ WORKER_MODE = 1 LOCAL_MODE = 2 SILENT_MODE = 3 -LOG_POINT = 0 -LOG_SPAN_START = 1 -LOG_SPAN_END = 2 - ERROR_KEY_PREFIX = b"Error:" DRIVER_ID_LENGTH = 20 ERROR_ID_LENGTH = 20 @@ -203,6 +199,7 @@ class Worker(object): that connect has been called already. cached_functions_to_run (List): A list of functions to run on all of the workers that should be exported as soon as connect is called. + profiler: the profiler used to aggregate profiling information. """ def __init__(self): @@ -238,6 +235,7 @@ class Worker(object): # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() + self.profiler = profiling.Profiler(self) def check_connected(self): """Check if the worker is connected. @@ -563,7 +561,7 @@ class Worker(object): Returns: The return object IDs for this task. """ - with profile("submit_task", worker=self): + with profiling.profile("submit_task", worker=self): check_main_thread() if actor_id is None: assert actor_handle_id is None @@ -868,7 +866,7 @@ class Worker(object): # Get task arguments from the object store. try: - with profile("task:deserialize_arguments", worker=self): + with profiling.profile("task:deserialize_arguments", worker=self): arguments = self._get_arguments_for_execution( function_name, args) except (RayGetError, RayGetArgumentError) as e: @@ -883,7 +881,7 @@ class Worker(object): # Execute the task. try: - with profile("task:execute", worker=self): + with profiling.profile("task:execute", worker=self): if task.actor_id().id() == NIL_ACTOR_ID: outputs = function_executor(*arguments) else: @@ -902,7 +900,7 @@ class Worker(object): # Store the outputs in the local object store. try: - with profile("task:store_outputs", worker=self): + with profiling.profile("task:store_outputs", worker=self): # If this is an actor task, then the last object ID returned by # the task is a dummy output, not returned by the function # itself. Decrement to get the correct number of return values. @@ -977,7 +975,7 @@ class Worker(object): # Wait until the function to be executed has actually been registered # on this worker. We will push warnings to the user if we spend too # long in this loop. - with profile("wait_for_function", worker=self): + with profiling.profile("wait_for_function", worker=self): self._wait_for_function(function_id, driver_id) # Execute the task. @@ -1000,11 +998,11 @@ class Worker(object): "name": function_name, "task_id": task.task_id().hex() } - with profile("task", extra_data=extra_data, worker=self): + with profiling.profile("task", extra_data=extra_data, worker=self): self._process_task(task) # Push all of the log events to the global state store. - flush_profile_data() + self.profiler.flush_profile_data() # Increase the task execution counter. self.num_task_executions[driver_id][function_id.id()] += 1 @@ -1022,7 +1020,7 @@ class Worker(object): Returns: A task from the local scheduler. """ - with profile("get_task", worker=self): + with profiling.profile("get_task", worker=self): task = self.local_scheduler_client.get_task() # Automatically restrict the GPUs available to this task. @@ -1847,21 +1845,6 @@ def custom_excepthook(type, value, tb): sys.excepthook = custom_excepthook -def _flush_profile_events(worker): - """Drivers run this as a thread to flush profile data in the background.""" - # Note(rkn): This is run on a background thread in the driver. It uses the - # local scheduler client. This should be ok because it doesn't read from - # the local scheduler client and we have the GIL here. However, if either - # of those things changes, then we could run into issues. - try: - while True: - time.sleep(1) - flush_profile_data(worker=worker) - except AttributeError: - # This is to suppress errors that occur at shutdown. - pass - - def print_error_messages_raylet(worker): """Print error messages in the background on the driver. @@ -2024,11 +2007,6 @@ def connect(info, worker.set_mode(mode) worker.use_raylet = use_raylet - # The worker.events field is used to aggregate logging information and - # display it in the web UI. Note that Python lists protected by the GIL, - # which is important because we will append to this field from multiple - # threads. - worker.events = [] # If running Ray in LOCAL_MODE, there is no need to create call # create_worker or to start the worker service. if mode == LOCAL_MODE: @@ -2214,11 +2192,7 @@ def connect(info, t.start() if mode in [SCRIPT_MODE, SILENT_MODE] and worker.use_raylet: - t = threading.Thread(target=_flush_profile_events, args=(worker, )) - # Making the thread a daemon causes it to exit when the main thread - # exits. - t.daemon = True - t.start() + worker.profiler.start_flush_thread() if mode in [SCRIPT_MODE, SILENT_MODE]: # Add the directory containing the script that is running to the Python @@ -2397,209 +2371,6 @@ def register_custom_serializer(cls, register_class_for_serialization({"worker": worker}) -class RayLogSpan(object): - """An object used to enable logging a span of events with a with statement. - - Attributes: - event_type (str): The type of the event being logged. - contents: Additional information to log. - """ - - def __init__(self, event_type, contents=None, worker=global_worker): - """Initialize a RayLogSpan object.""" - self.event_type = event_type - self.contents = contents - self.worker = worker - - def __enter__(self): - """Log the beginning of a span event.""" - _log( - event_type=self.event_type, - contents=self.contents, - kind=LOG_SPAN_START, - worker=self.worker) - - def __exit__(self, type, value, tb): - """Log the end of a span event. Log any exception that occurred.""" - if type is None: - _log( - event_type=self.event_type, - kind=LOG_SPAN_END, - worker=self.worker) - else: - _log( - event_type=self.event_type, - contents={ - "type": str(type), - "value": value, - "traceback": traceback.format_exc() - }, - kind=LOG_SPAN_END, - worker=self.worker) - - -class RayLogSpanRaylet(object): - """An object used to enable logging a span of events with a with statement. - - Attributes: - event_type (str): The type of the event being logged. - contents: Additional information to log. - """ - - def __init__(self, event_type, extra_data=None, worker=global_worker): - """Initialize a RayLogSpan object.""" - self.event_type = event_type - self.extra_data = extra_data if extra_data is not None else {} - self.worker = worker - - def set_attribute(self, key, value): - """Add a key-value pair to the extra_data dict. - - This can be used to add attributes that are not available when - ray.profile was called. - - Args: - key: The attribute name. - value: The attribute value. - """ - if not isinstance(key, str) or not isinstance(value, str): - raise ValueError("The extra_data argument must be a " - "dictionary mapping strings to strings.") - self.extra_data[key] = value - - def __enter__(self): - """Log the beginning of a span event. - - Returns: - The object itself is returned so that if the block is opened using - "with ray.profile(...) as prof:", we can call - "prof.set_attribute" inside the block. - """ - self.start_time = time.time() - return self - - def __exit__(self, type, value, tb): - """Log the end of a span event. Log any exception that occurred.""" - for key, value in self.extra_data.items(): - if not isinstance(key, str) or not isinstance(value, str): - raise ValueError("The extra_data argument must be a " - "dictionary mapping strings to strings.") - - event = { - "event_type": self.event_type, - "start_time": self.start_time, - "end_time": time.time(), - "extra_data": json.dumps(self.extra_data), - } - - if type is not None: - event["extra_data"] = json.dumps({ - "type": str(type), - "value": str(value), - "traceback": str(traceback.format_exc()), - }) - - self.worker.events.append(event) - - -def profile(event_type, extra_data=None, worker=global_worker): - """Profile a span of time so that it appears in the timeline visualization. - - Note that this only works in the raylet code path. - - This function can be used as follows (both on the driver or within a task). - - .. code-block:: python - - with ray.profile("custom event", extra_data={'key': 'value'}): - # Do some computation here. - - Optionally, a dictionary can be passed as the "extra_data" argument, and - it can have keys "name" and "cname" if you want to override the default - timeline display text and box color. Other values will appear at the bottom - of the chrome tracing GUI when you click on the box corresponding to this - profile span. - - Args: - event_type: A string describing the type of the event. - extra_data: This must be a dictionary mapping strings to strings. This - data will be added to the json objects that are used to populate - the timeline, so if you want to set a particular color, you can - simply set the "cname" attribute to an appropriate color. - Similarly, if you set the "name" attribute, then that will set the - text displayed on the box in the timeline. - - Returns: - An object that can profile a span of time via a "with" statement. - """ - if not worker.use_raylet: - return RayLogSpan(event_type, contents=extra_data, worker=worker) - else: - return RayLogSpanRaylet( - event_type, extra_data=extra_data, worker=worker) - - -def _log(event_type, kind, contents=None, worker=global_worker): - """Log an event to the global state store. - - This adds the event to a buffer of events locally. The buffer can be - flushed and written to the global state store by calling - flush_profile_data(). - - Args: - event_type (str): The type of the event. - contents: More general data to store with the event. - kind (int): Either LOG_POINT, LOG_SPAN_START, or LOG_SPAN_END. This is - LOG_POINT if the event being logged happens at a single point in - time. It is LOG_SPAN_START if we are starting to log a span of - time, and it is LOG_SPAN_END if we are finishing logging a span of - time. - """ - if worker.use_raylet: - raise Exception( - "This method is not supported in the raylet code path.") - # TODO(rkn): This code currently takes around half a microsecond. Since we - # call it tens of times per task, this adds up. We will need to redo the - # logging code, perhaps in C. - contents = {} if contents is None else contents - assert isinstance(contents, dict) - # Make sure all of the keys and values in the dictionary are strings. - contents = {str(k): str(v) for k, v in contents.items()} - # Log the event if this is a worker and not a driver, since the driver's - # event log never gets flushed. - if worker.mode == WORKER_MODE: - worker.events.append((time.time(), event_type, kind, contents)) - - -# TODO(rkn): Support calling this function in the middle of a task, and also -# call this periodically in the background from the driver. -def flush_profile_data(worker=global_worker): - """Push the logged profiling data to the global control store. - - By default, profiling information for a given task won't appear in the - timeline until after the task has completed. For very long-running tasks, - we may want profiling information to appear more quickly. In such cases, - this function can be called. Note that as an alternative, we could start - a thread in the background on workers that calls this automatically. - """ - if not worker.use_raylet: - event_log_key = b"event_log:" + worker.worker_id - event_log_value = json.dumps(worker.events) - worker.local_scheduler_client.log_event(event_log_key, event_log_value, - time.time()) - else: - if worker.mode == WORKER_MODE: - component_type = "worker" - else: - component_type = "driver" - - worker.local_scheduler_client.push_profile_events( - component_type, ray.ObjectID(worker.worker_id), - worker.node_ip_address, worker.events) - - worker.events = [] - - def get(object_ids, worker=global_worker): """Get a remote object or a list of remote objects from the object store. @@ -2621,7 +2392,7 @@ def get(object_ids, worker=global_worker): or that created one of the objects raised an exception. """ worker.check_connected() - with profile("ray.get", worker=worker): + with profiling.profile("ray.get", worker=worker): check_main_thread() if worker.mode == LOCAL_MODE: @@ -2654,7 +2425,7 @@ def put(value, worker=global_worker): The object ID assigned to this value. """ worker.check_connected() - with profile("ray.put", worker=worker): + with profiling.profile("ray.put", worker=worker): check_main_thread() if worker.mode == LOCAL_MODE: @@ -2713,7 +2484,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): type(object_id))) worker.check_connected() - with profile("ray.wait", worker=worker): + with profiling.profile("ray.wait", worker=worker): check_main_thread() # When Ray is run in LOCAL_MODE, all functions are run immediately,