diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 6d5b8f7e7..812f7111a 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -108,8 +108,6 @@ class Profiler(object): # This is to suppress errors that occur at shutdown. pass - # TODO(rkn): Support calling this function in the middle of a task, and - # also call this periodically in the background from the driver. def flush_profile_data(self): """Push the logged profiling data to the global control store. diff --git a/python/ray/worker.py b/python/ray/worker.py index 17fd411c6..014b12392 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1001,8 +1001,11 @@ class Worker(object): with profiling.profile("task", extra_data=extra_data, worker=self): self._process_task(task) - # Push all of the log events to the global state store. - self.profiler.flush_profile_data() + # In the non-raylet code path, push all of the log events to the global + # state store. In the raylet code path, this is done periodically in a + # background thread. + if not self.use_raylet: + self.profiler.flush_profile_data() # Increase the task execution counter. self.num_task_executions[driver_id][function_id.id()] += 1 @@ -2191,7 +2194,9 @@ def connect(info, t.daemon = True t.start() - if mode in [SCRIPT_MODE, SILENT_MODE] and worker.use_raylet: + # If we are using the raylet code path and we are not in local mode, start + # a background thread to periodically flush profiling data to the GCS. + if mode != LOCAL_MODE and worker.use_raylet: worker.profiler.start_flush_thread() if mode in [SCRIPT_MODE, SILENT_MODE]: