Log profiling information from worker. (#178)

* Log timing events on workers.

* Have workers log to the event log through the local scheduler.

* Fixes and address comments.

* bug fix

* styling
This commit is contained in:
Robert Nishihara
2017-01-05 16:47:16 -08:00
committed by Philipp Moritz
parent 509685d240
commit 651aa6007a
12 changed files with 355 additions and 71 deletions
+1 -1
View File
@@ -15,6 +15,6 @@ if hasattr(ctypes, "windll"):
import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log
from ray.worker import Reusable, reusables
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
+176 -67
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import hashlib
import os
import sys
@@ -30,6 +31,10 @@ WORKER_MODE = 1
PYTHON_MODE = 2
SILENT_MODE = 3
LOG_POINT = 0
LOG_SPAN_START = 1
LOG_SPAN_END = 2
def random_string():
return np.random.bytes(20)
@@ -476,30 +481,31 @@ class Worker(object):
be object IDs or they can be values. If they are values, they
must be serializable objecs.
"""
check_main_thread()
# Put large or complex arguments that are passed by value in the object
# store first.
args_for_photon = []
for arg in args:
if isinstance(arg, photon.ObjectID):
args_for_photon.append(arg)
elif photon.check_simple_value(arg):
args_for_photon.append(arg)
else:
args_for_photon.append(put(arg))
with log_span("ray:submit_task", worker=self):
check_main_thread()
# Put large or complex arguments that are passed by value in the object
# store first.
args_for_photon = []
for arg in args:
if isinstance(arg, photon.ObjectID):
args_for_photon.append(arg)
elif photon.check_simple_value(arg):
args_for_photon.append(arg)
else:
args_for_photon.append(put(arg))
# Submit the task to Photon.
task = photon.Task(photon.ObjectID(function_id.id()),
args_for_photon,
self.num_return_vals[function_id.id()],
self.current_task_id,
self.task_index)
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
self.photon_client.submit(task)
# Submit the task to Photon.
task = photon.Task(photon.ObjectID(function_id.id()),
args_for_photon,
self.num_return_vals[function_id.id()],
self.current_task_id,
self.task_index)
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
self.photon_client.submit(task)
return task.returns()
return task.returns()
def run_function_on_all_workers(self, function):
"""Run arbitrary code on all of the workers.
@@ -1014,11 +1020,14 @@ def import_thread(worker):
for i in range(worker.worker_import_counter, num_imports):
key = worker.redis_client.lindex("Exports", i)
if key.startswith(b"RemoteFunction"):
fetch_and_register_remote_function(key, worker=worker)
with log_span("ray:import_remote_function", worker=worker):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith(b"ReusableVariables"):
fetch_and_register_reusable_variable(key, worker=worker)
with log_span("ray:import_reusable_variable", worker=worker):
fetch_and_register_reusable_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
with log_span("ray:import_function_to_run", worker=worker):
fetch_and_execute_function_to_run(key, worker=worker)
else:
raise Exception("This code should be unreachable.")
worker.redis_client.hincrby(worker_info_key, "export_counter", 1)
@@ -1044,6 +1053,10 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
worker.worker_id = random_string()
worker.connected = True
worker.set_mode(mode)
# The worker.events field is used to aggregate logging information and display
# it in the web UI. Note that Python lists protected by the GIL, which is
# important because we will append to this field from multiple threads.
worker.events = []
# If running Ray in PYTHON_MODE, there is no need to create call create_worker
# or to start the worker service.
if mode == PYTHON_MODE:
@@ -1061,9 +1074,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"])
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
worker.redis_client.rpush("Drivers", worker.worker_id)
worker.redis_client.hmset(b"Drivers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address})
elif mode == WORKER_MODE:
worker.redis_client.rpush("Workers", worker.worker_id)
worker.redis_client.hmset(b"Workers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address})
else:
raise Exception("This code should be unreachable.")
# If this is a driver, set the current task ID and set the task index to 0.
@@ -1175,6 +1188,74 @@ def register_class(cls, pickle=False, worker=global_worker):
serialization.add_class_to_whitelist(cls, pickle=pickle)
worker.run_function_on_all_workers(register_class_for_serialization)
class RayLogSpan(object):
"""An object used to enable logging a span of events with a with statement.
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
"""
def __init__(self, event_type, contents=None, worker=global_worker):
"""Initialize a RayLogSpan object."""
self.event_type = event_type
self.contents = contents
self.worker = worker
def __enter__(self):
"""Log the beginning of a span event."""
log(event_type=self.event_type,
contents=self.contents,
kind=LOG_SPAN_START,
worker=self.worker)
def __exit__(self, type, value, tb):
"""Log the end of a span event. Log any exception that occurred."""
if type is None:
log(event_type=self.event_type, kind=LOG_SPAN_END, worker=self.worker)
else:
log(event_type=self.event_type,
contents={"type": str(type),
"value": value,
"traceback": traceback.format_exc()},
kind=LOG_SPAN_END,
worker=self.worker)
def log_span(event_type, contents=None, worker=global_worker):
return RayLogSpan(event_type, contents=contents, worker=worker)
def log_event(event_type, contents=None, worker=global_worker):
log(event_type, kind=LOG_POINT, contents=contents, worker=worker)
def log(event_type, kind, contents=None, worker=global_worker):
"""Log an event to the global state store.
This adds the event to a buffer of events locally. The buffer can be flushed
and written to the global state store by calling flush_log().
Args:
event_type (str): The type of the event.
contents: More general data to store with the event.
kind (int): Either LOG_POINT, LOG_SPAN_START, or LOG_SPAN_END. This is
LOG_POINT if the event being logged happens at a single point in time. It
is LOG_SPAN_START if we are starting to log a span of time, and it is
LOG_SPAN_END if we are finishing logging a span of time.
"""
# TODO(rkn): This code currently takes around half a microsecond. Since we
# call it tens of times per task, this adds up. We will need to redo the
# logging code, perhaps in C.
contents = {} if contents is None else contents
assert isinstance(contents, dict)
# Make sure all of the keys and values in the dictionary are strings.
contents = {str(k): str(v) for k, v in contents.items()}
worker.events.append((time.time(), event_type, kind, contents))
def flush_log(worker=global_worker):
"""Send the logged worker events to the global state store."""
event_log_key = b"event_log:" + worker.worker_id + b":" + worker.current_task_id.id()
event_log_value = json.dumps(worker.events)
worker.photon_client.log_event(event_log_key, event_log_value)
worker.events = []
def get(objectid, worker=global_worker):
"""Get a remote object or a list of remote objects from the object store.
@@ -1190,22 +1271,26 @@ def get(objectid, worker=global_worker):
Returns:
A Python object or a list of Python objects.
"""
check_main_thread()
check_connected(worker)
if worker.mode == PYTHON_MODE:
return objectid # In PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid)
if isinstance(objectid, list):
values = [worker.get_object(x) for x in objectid]
for i, value in enumerate(values):
with log_span("ray:get", worker=worker):
check_main_thread()
check_connected(worker)
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid)
return objectid
if isinstance(objectid, list):
values = [worker.get_object(x) for x in objectid]
for i, value in enumerate(values):
if isinstance(value, RayTaskError):
raise RayGetError(objectid[i], value)
return values
else:
value = worker.get_object(objectid)
if isinstance(value, RayTaskError):
raise RayGetError(objectid[i], value)
return values
value = worker.get_object(objectid)
if isinstance(value, RayTaskError):
# If the result is a RayTaskError, then the task that created this object
# failed, and we should propagate the error message here.
raise RayGetError(objectid, value)
return value
# If the result is a RayTaskError, then the task that created this object
# failed, and we should propagate the error message here.
raise RayGetError(objectid, value)
return value
def put(value, worker=global_worker):
"""Store an object in the object store.
@@ -1216,14 +1301,17 @@ def put(value, worker=global_worker):
Returns:
The object ID assigned to this value.
"""
check_main_thread()
check_connected(worker)
if worker.mode == PYTHON_MODE:
return value # In PYTHON_MODE, ray.put is the identity operation
object_id = photon.compute_put_id(worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id
with log_span("ray:put", worker=worker):
check_main_thread()
check_connected(worker)
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.put is the identity operation
return value
object_id = photon.compute_put_id(worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id
def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
"""Return a list of IDs that are ready and a list of IDs that are not ready.
@@ -1247,14 +1335,15 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
Returns:
A list of object IDs that are ready and a list of the remaining object IDs.
"""
check_main_thread()
check_connected(worker)
object_id_strs = [object_id.id() for object_id in object_ids]
timeout = timeout if timeout is not None else 2 ** 30
ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns)
ready_ids = [photon.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [photon.ObjectID(object_id) for object_id in remaining_ids]
return ready_ids, remaining_ids
with log_span("ray:wait", worker=worker):
check_main_thread()
check_connected(worker)
object_id_strs = [object_id.id() for object_id in object_ids]
timeout = timeout if timeout is not None else 2 ** 30
ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns)
ready_ids = [photon.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [photon.ObjectID(object_id) for object_id in remaining_ids]
return ready_ids, remaining_ids
def wait_for_valid_import_counter(function_id, timeout=5, worker=global_worker):
"""Wait until this worker has imported enough to execute the function.
@@ -1335,14 +1424,20 @@ def main_loop(worker=global_worker):
args = task.arguments()
return_object_ids = task.returns()
function_name = worker.function_names[function_id.id()]
# Get task arguments from the object store.
arguments = get_arguments_for_execution(worker.functions[function_id.id()], args, worker)
with log_span("ray:task:get_arguments", worker=worker):
arguments = get_arguments_for_execution(worker.functions[function_id.id()], args, worker)
# Execute the task.
outputs = worker.functions[function_id.id()].executor(arguments)
with log_span("ray:task:execute", worker=worker):
outputs = worker.functions[function_id.id()].executor(arguments)
# Store the outputs in the local object store.
if len(return_object_ids) == 1:
outputs = (outputs,)
store_outputs_in_objstore(return_object_ids, outputs, worker)
with log_span("ray:task:store_outputs", worker=worker):
if len(return_object_ids) == 1:
outputs = (outputs,)
store_outputs_in_objstore(return_object_ids, outputs, worker)
except Exception as e:
# We determine whether the exception was caused by the call to
# get_arguments_for_execution or by the execution of the remote function
@@ -1370,7 +1465,8 @@ def main_loop(worker=global_worker):
try:
# Reinitialize the values of reusable variables that were used in the task
# above so that changes made to their state do not affect other tasks.
reusables._reinitialize()
with log_span("ray:task:reinitialize_reusables", worker=worker):
reusables._reinitialize()
except Exception as e:
# The attempt to reinitialize the reusable variables threw an exception.
# We record the traceback and notify the scheduler.
@@ -1384,19 +1480,32 @@ def main_loop(worker=global_worker):
check_main_thread()
while True:
task = worker.photon_client.get_task()
with log_span("ray:get_task", worker=worker):
task = worker.photon_client.get_task()
function_id = task.function_id()
# Check that the number of imports we have is at least as great as the
# export counter for the task. If not, wait until we have imported enough.
# We will push warnings to the user if we spend too long in this loop.
wait_for_valid_import_counter(function_id, worker=worker)
with log_span("ray:wait_for_import_counter", worker=worker):
wait_for_valid_import_counter(function_id, worker=worker)
# Execute the task.
# TODO(rkn): Consider acquiring this lock with a timeout and pushing a
# warning to the user if we are waiting too long to acquire the lock because
# that may indicate that the system is hanging, and it'd be good to know
# where the system is hanging.
log(event_type="ray:acquire_lock", kind=LOG_SPAN_START, worker=worker)
with worker.lock:
process_task(task)
log(event_type="ray:acquire_lock", kind=LOG_SPAN_END, worker=worker)
contents = {"function_name": worker.function_names[function_id.id()],
"task_id": task.task_id().hex()}
with log_span("ray:task", contents=contents, worker=worker):
process_task(task)
# Push all of the log events to the global state store.
flush_log()
def push_warning_to_user(message, worker=global_worker):
error_key = "GenericWarning:{}".format(random_string())