From abaa4211d49760503f3fa6611cbec6ec3e2d2405 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 24 Jun 2016 13:22:29 -0700 Subject: [PATCH] better printing of errors (#153) --- lib/python/ray/__init__.py | 7 +++++ lib/python/ray/services.py | 6 +++-- lib/python/ray/worker.py | 53 ++++++++++++++++++++++++-------------- test/runtest.py | 2 +- 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index ec5faea2e..5a43889bd 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -1,3 +1,10 @@ +# These three constants are used to define the mode that a worker is running in. +# Right now, this is only used for determining how to print information about +# task failures. +SCRIPT_MODE = 0 +WORKER_MODE = 1 +SHELL_MODE = 2 + import libraylib as lib import serialization from worker import scheduler_info, task_info, register_module, connect, disconnect, get, put, remote diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 899902182..264ed63c2 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -104,7 +104,9 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.5) -def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, print_task_info=False): +# driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and +# ray.SHELL_MODE if it is being used interactively in a shell. +def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE): global drivers if num_workers_per_objstore > 0 and worker_path is None: raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore)) @@ -134,5 +136,5 @@ def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_ time.sleep(0.5) return driver_workers else: - ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port()), print_task_info=print_task_info) + ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port()), mode=driver_mode) time.sleep(0.5) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 11d9fa9fe..db384e48e 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -29,8 +29,8 @@ class Worker(object): self.functions = {} self.handle = None - def set_print_task_info(self, print_task_info): - self.print_task_info = print_task_info + def set_mode(self, mode): + self.mode = mode colorama.init() def put_object(self, objref, value): @@ -93,27 +93,41 @@ class Worker(object): """Tell the scheduler to schedule the execution of the function with name `func_name` with arguments `args`. Retrieve object references for the outputs of the function from the scheduler and immediately return them.""" task_capsule = serialization.serialize_task(self.handle, func_name, args) objrefs = ray.lib.submit_task(self.handle, task_capsule) - if self.print_task_info: - print_task_info(ray.lib.task_info(self.handle)) + if self.mode == ray.SHELL_MODE or self.mode == ray.SCRIPT_MODE: + print_task_info(ray.lib.task_info(self.handle), self.mode) return objrefs # We make `global_worker` a global variable so that there is one worker per worker process. global_worker = Worker() # This is a helper method. It should not be called by users. -def print_task_info(task_data): +def print_failed_task(task_status): + print """ + Error: Task failed + Function Name: {} + Task ID: {} + Error Message: {} + """.format(task_status["function_name"], task_status["operationid"], task_status["error_message"]) + +# This is a helper method. It should not be called by users. +def print_task_info(task_data, mode): num_tasks_succeeded = task_data["num_succeeded"] num_tasks_in_progress = len(task_data["running_tasks"]) num_tasks_failed = len(task_data["failed_tasks"]) - info_strings = [] - if num_tasks_succeeded > 0: - info_strings.append("{}{} task{} succeeded{}".format(colorama.Fore.BLUE, num_tasks_succeeded, "s" if num_tasks_succeeded > 1 else "", colorama.Fore.RESET)) - if num_tasks_in_progress > 0: - info_strings.append("{}{} task{} in progress{}".format(colorama.Fore.GREEN, num_tasks_in_progress, "s" if num_tasks_in_progress > 1 else "", colorama.Fore.RESET)) if num_tasks_failed > 0: - info_strings.append("{}{} task{} failed{}".format(colorama.Fore.RED, num_tasks_failed, "s" if num_tasks_failed > 1 else "", colorama.Fore.RESET)) - if len(info_strings) > 0: - print ", ".join(info_strings) + for task_status in task_data["failed_tasks"]: + print_failed_task(task_status) + print "Error: {} task{} failed.".format(num_tasks_failed, "s" if num_tasks_failed > 1 else "") + if mode == ray.SHELL_MODE: + info_strings = [] + if num_tasks_succeeded > 0: + info_strings.append("{}{} task{} succeeded{}".format(colorama.Fore.BLUE, num_tasks_succeeded, "s" if num_tasks_succeeded > 1 else "", colorama.Fore.RESET)) + if num_tasks_in_progress > 0: + info_strings.append("{}{} task{} in progress{}".format(colorama.Fore.GREEN, num_tasks_in_progress, "s" if num_tasks_in_progress > 1 else "", colorama.Fore.RESET)) + if num_tasks_failed > 0: + info_strings.append("{}{} task{} failed{}".format(colorama.Fore.RED, num_tasks_failed, "s" if num_tasks_failed > 1 else "", colorama.Fore.RESET)) + if len(info_strings) > 0: + print ", ".join(info_strings) def scheduler_info(worker=global_worker): return ray.lib.scheduler_info(worker.handle); @@ -132,7 +146,7 @@ def register_module(module, recursive=False, worker=global_worker): # elif recursive and isinstance(val, ModuleType): # register_module(val, recursive, worker) -def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, print_task_info=False): +def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, mode=ray.WORKER_MODE): if hasattr(worker, "handle"): del worker.handle worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) @@ -140,22 +154,22 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, pr log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr)) logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log") ray.lib.set_log_config(log_basename + "-c++.log") - worker.set_print_task_info(print_task_info) + worker.set_mode(mode) def disconnect(worker=global_worker): ray.lib.disconnect(worker.handle) def get(objref, worker=global_worker): ray.lib.request_object(worker.handle, objref) - if worker.print_task_info: - print_task_info(ray.lib.task_info(worker.handle)) + if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE: + print_task_info(ray.lib.task_info(worker.handle), worker.mode) return worker.get_object(objref) def put(value, worker=global_worker): objref = ray.lib.get_objref(worker.handle) worker.put_object(objref, value) - if worker.print_task_info: - print_task_info(ray.lib.task_info(worker.handle)) + if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE: + print_task_info(ray.lib.task_info(worker.handle), worker.mode) return objref def main_loop(worker=global_worker): @@ -169,6 +183,7 @@ def main_loop(worker=global_worker): outputs = worker.functions[func_name].executor(arguments) # execute the function except Exception as e: ray.lib.notify_task_completed(worker.handle, False, str(e)) # notify the scheduler that the task threw an exception + logging.info("Worker through exception with message: {}, while running function {}.".format(str(e), func_name)) else: store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store ray.lib.notify_task_completed(worker.handle, True, "") # notify the scheduler that the task completed successfully diff --git a/test/runtest.py b/test/runtest.py index 5a3f82c0d..c0c1a0c5a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -241,7 +241,7 @@ class APITest(unittest.TestCase): class TaskStatusTest(unittest.TestCase): def testFailedTask(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path, driver_mode=ray.WORKER_MODE) test_functions.test_alias_f() test_functions.throw_exception_fct() test_functions.throw_exception_fct()