Propagate error messages from functions that run on all workers. (#410)

This commit is contained in:
Robert Nishihara
2016-09-06 10:06:43 -07:00
committed by Philipp Moritz
parent 327d7ff689
commit d5cb3ac090
9 changed files with 55 additions and 8 deletions
+11 -6
View File
@@ -462,7 +462,7 @@ class Worker(object):
not be used.
"""
# First run the function on the driver.
function()
function(self)
# Then run the function on all of the workers.
raylib.run_function_on_all_workers(self.handle, pickling.dumps(function))
@@ -649,6 +649,7 @@ def print_error_messages(worker=global_worker):
num_failed_remote_function_imports = 0
num_failed_reusable_variable_imports = 0
num_failed_reusable_variable_reinitializations = 0
num_failed_function_to_runs = 0
while True:
try:
info = task_info(worker=worker)
@@ -668,6 +669,9 @@ def print_error_messages(worker=global_worker):
for error in info["failed_reinitialize_reusable_variables"][num_failed_reusable_variable_reinitializations:]:
print error["error_message"]
num_failed_reusable_variable_reinitializations = len(info["failed_reinitialize_reusable_variables"])
for error in info["failed_function_to_runs"][num_failed_function_to_runs:]:
print error["error_message"]
num_failed_function_to_runs = len(info["failed_function_to_runs"])
except RayConnectionError:
# When the driver is exiting, we set worker.handle to None, which will cause
# the check_connected call inside of task_info to raise an exception. We use
@@ -731,8 +735,8 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, worker=gl
# the same.
script_directory = os.path.abspath(os.path.dirname(sys.argv[0]))
current_directory = os.path.abspath(os.path.curdir)
worker.run_function_on_all_workers(lambda : sys.path.insert(1, script_directory))
worker.run_function_on_all_workers(lambda : sys.path.insert(1, current_directory))
worker.run_function_on_all_workers(lambda worker : sys.path.insert(1, script_directory))
worker.run_function_on_all_workers(lambda worker : sys.path.insert(1, current_directory))
# Export cached remote functions to the workers.
for function_name, function_to_export in worker.cached_remote_functions:
raylib.export_remote_function(worker.handle, function_name, function_to_export)
@@ -986,14 +990,15 @@ def main_loop(worker=global_worker):
# Deserialize the function.
function = pickling.loads(serialized_function)
# Run the function.
function()
function(worker)
except:
# If an exception was thrown when the function was run, we record the
# traceback and notify the scheduler of the failure.
traceback_str = format_error_message(traceback.format_exc())
traceback_str = traceback.format_exc()
_logger().info("Failed to run function on worker. Failed with message: \n\n{}\n".format(traceback_str))
# Notify the scheduler that running the function failed.
# TODO(rkn): Notify the scheduler.
name = function.__name__ if "function" in locals() and hasattr(function, "__name__") else ""
raylib.notify_failure(worker.handle, name, traceback_str, raylib.FailedFunctionToRun)
else:
_logger().info("Successfully ran function on worker.")