diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 667c8451c..34cb3d837 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -13,6 +13,6 @@ import config import serialization from worker import scheduler_info, visualize_computation_graph, task_info, register_module, init, connect, disconnect, get, put, remote, kill_workers, restart_workers_local from worker import Reusable, reusables -from worker import SCRIPT_MODE, WORKER_MODE, SHELL_MODE, PYTHON_MODE, SILENT_MODE +from worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE from libraylib import ObjectID import internal diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index e6d293d2c..593f91581 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -25,9 +25,8 @@ import libraylib as raylib # task failures. SCRIPT_MODE = 0 WORKER_MODE = 1 -SHELL_MODE = 2 -PYTHON_MODE = 3 -SILENT_MODE = 4 # This is only used during testing. +PYTHON_MODE = 2 +SILENT_MODE = 3 # This is only used during testing. class RayFailedObject(object): """An object used internally to represent a task that threw an exception. @@ -235,7 +234,7 @@ class RayReusables(object): raise Exception("To set a reusable variable, you must pass in a Reusable object") self._names.add(name) self._reusables[name] = reusable - if _mode() in [SHELL_MODE, SCRIPT_MODE, SILENT_MODE]: + if _mode() in [SCRIPT_MODE, SILENT_MODE]: _export_reusable_variable(name, reusable) elif _mode() is None: self._cached_reusables.append((name, reusable)) @@ -261,13 +260,16 @@ class Worker(object): function to the remote function itself. This is the set of remote functions that can be executed by this worker. handle (worker capsule): A Python object wrapping a C++ Worker object. - mode: The mode of the worker. One of SCRIPT_MODE, SHELL_MODE, PYTHON_MODE, - SILENT_MODE, and WORKER_MODE. + mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE, SILENT_MODE, + and WORKER_MODE. 
cached_remote_functions (List[str]): A list of serialized remote functions that were defined before the worker called connect. When the worker eventually does call connect, if it is a driver, it will export these functions to the scheduler. If cached_remote_functions is None, that means that connect has been called. + num_failed_tasks (int): The number of tasks that have failed and whose error + messages have been displayed to the user. We use this value to know when + a failed task hasn't been seen by the user and should be displayed. """ def __init__(self): @@ -276,16 +278,14 @@ class Worker(object): self.handle = None self.mode = None self.cached_remote_functions = [] + self.num_failed_tasks = 0 def set_mode(self, mode): """Set the mode of the worker. The mode SCRIPT_MODE should be used if this Worker is a driver that is being - run as a Python script. It will print information about task failures. - - The mode SHELL_MODE should be used if this Worker is a driver that is being - run interactively in a Python shell. It will print information about task - failures and successes. + run as a Python script or interactively in a shell. It will print + information about task failures. The mode WORKER_MODE should be used if this Worker is not a driver. It will not print information about tasks. @@ -299,8 +299,7 @@ class Worker(object): any information about errors because some of the tests intentionally fail. args: - mode: One of SCRIPT_MODE, WORKER_MODE, SHELL_MODE, PYTHON_MODE, and - SILENT_MODE. + mode: One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. 
""" self.mode = mode colorama.init() @@ -428,10 +427,23 @@ class Worker(object): """ task_capsule = serialization.serialize_task(self.handle, func_name, args) objectids = raylib.submit_task(self.handle, task_capsule) - if self.mode in [SHELL_MODE, SCRIPT_MODE]: - print_task_info(raylib.task_info(self.handle), self.mode) + if self.mode == SCRIPT_MODE: + self.print_new_failures() return objectids + def print_new_failures(self): + """Print the failed tasks that have not yet been displayed and record the new total in self.num_failed_tasks.""" + task_data = raylib.task_info(self.handle) + num_tasks_succeeded = task_data["num_succeeded"] # NOTE(review): assigned but never read in this method. + num_tasks_in_progress = len(task_data["running_tasks"]) # NOTE(review): assigned but never read in this method. + num_new_tasks_failed = len(task_data["failed_tasks"]) - self.num_failed_tasks + if num_new_tasks_failed > 0: + # Print only the failures past the ones already shown (index self.num_failed_tasks onward). + for task_status in task_data["failed_tasks"][self.num_failed_tasks:]: + print_failed_task(task_status) + print "{}Error: {} new task{} failed.{}".format(colorama.Fore.RED, num_new_tasks_failed, "s" if num_new_tasks_failed > 1 else "", colorama.Fore.RESET) + self.num_failed_tasks = len(task_data["failed_tasks"]) + global_worker = Worker() """Worker: The global Worker object for this worker process. @@ -475,32 +487,6 @@ def print_failed_task(task_status): Error Message: \n{} """.format(task_status["function_name"], task_status["operationid"], task_status["error_message"]) -def print_task_info(task_data, mode): - """Print information about tasks. - - Args: - task_data (Dict): A dictionary containing information about tasks that have - failed, succeeded, or are still running. - mode: The mode of the Worker object. 
- """ - num_tasks_succeeded = task_data["num_succeeded"] - num_tasks_in_progress = len(task_data["running_tasks"]) - num_tasks_failed = len(task_data["failed_tasks"]) - if num_tasks_failed > 0: - for task_status in task_data["failed_tasks"]: - print_failed_task(task_status) - print "Error: {} task{} failed.".format(num_tasks_failed, "s" if num_tasks_failed > 1 else "") - if mode == SHELL_MODE: - info_strings = [] - if num_tasks_succeeded > 0: - info_strings.append("{}{} task{} succeeded{}".format(colorama.Fore.BLUE, num_tasks_succeeded, "s" if num_tasks_succeeded > 1 else "", colorama.Fore.RESET)) - if num_tasks_in_progress > 0: - info_strings.append("{}{} task{} in progress{}".format(colorama.Fore.GREEN, num_tasks_in_progress, "s" if num_tasks_in_progress > 1 else "", colorama.Fore.RESET)) - if num_tasks_failed > 0: - info_strings.append("{}{} task{} failed{}".format(colorama.Fore.RED, num_tasks_failed, "s" if num_tasks_failed > 1 else "", colorama.Fore.RESET)) - if len(info_strings) > 0: - print ", ".join(info_strings) - def scheduler_info(worker=global_worker): """Return information about the state of the scheduler.""" check_connected(worker) @@ -587,7 +573,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_add driver_address (Optional[str]): The address of this driver if start_ray_local is False. driver_mode (Optional[bool]): The mode in which to start the driver. This - should be one of SCRIPT_MODE, SHELL_MODE, PYTHON_MODE, and SILENT_MODE. + should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE. raises: Exception: An exception is raised if an inappropriate combination of @@ -598,8 +584,8 @@ def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_add # and we connect to them. 
if (scheduler_address is not None) or (objstore_address is not None) or (driver_address is not None): raise Exception("If start_ray_local=True, then you cannot pass in a scheduler_address, objstore_address, or worker_address.") - if driver_mode not in [SCRIPT_MODE, SHELL_MODE, PYTHON_MODE, SILENT_MODE]: - raise Exception("If start_ray_local=True, then driver_mode must be in [SCRIPT_MODE, SHELL_MODE, PYTHON_MODE, SILENT_MODE].") + if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: + raise Exception("If start_ray_local=True, then driver_mode must be in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE].") num_workers = 1 if num_workers is None else num_workers # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. @@ -641,8 +627,8 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False worker_address (str): The ip address and port of this worker. The port can be chosen arbitrarily. is_driver (bool): True if this worker is a driver and false otherwise. - mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, SHELL_MODE, - PYTHON_MODE, and SILENT_MODE. + mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, + and SILENT_MODE. """ if hasattr(worker, "handle"): del worker.handle @@ -663,7 +649,7 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False _logger().propagate = False # Configure the logging from the worker C++ code. 
raylib.set_log_config(config.get_log_file_path("-".join(["worker", worker_address, "c++"]) + ".log")) - if mode in [SHELL_MODE, SCRIPT_MODE, SILENT_MODE]: + if mode in [SCRIPT_MODE, SILENT_MODE]: for function_to_export in worker.cached_remote_functions: raylib.export_function(worker.handle, function_to_export) for name, reusable_variable in reusables._cached_reusables: @@ -700,8 +686,8 @@ def get(objectid, worker=global_worker): if worker.mode == PYTHON_MODE: return objectid # In PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid) raylib.request_object(worker.handle, objectid) - if worker.mode in [SHELL_MODE, SCRIPT_MODE]: - print_task_info(raylib.task_info(worker.handle), worker.mode) + if worker.mode == SCRIPT_MODE: + worker.print_new_failures() value = worker.get_object(objectid) if isinstance(value, RayFailedObject): raise Exception("The task that created this object ID failed with error message:\n{}".format(value.error_message)) @@ -721,8 +707,8 @@ def put(value, worker=global_worker): return value # In PYTHON_MODE, ray.put is the identity operation objectid = raylib.get_objectid(worker.handle) worker.put_object(objectid, value) - if worker.mode in [SHELL_MODE, SCRIPT_MODE]: - print_task_info(raylib.task_info(worker.handle), worker.mode) + if worker.mode == SCRIPT_MODE: + worker.print_new_failures() return objectid def kill_workers(worker=global_worker): @@ -882,7 +868,7 @@ def _export_reusable_variable(name, reusable, worker=global_worker): reusable (Reusable): The reusable object containing code for initializing and reinitializing the variable. 
""" - if _mode(worker) not in [SHELL_MODE, SCRIPT_MODE, SILENT_MODE]: + if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: raise Exception("_export_reusable_variable can only be called on a driver.") raylib.export_reusable_variable(worker.handle, name, pickling.dumps(reusable.initializer), pickling.dumps(reusable.reinitializer)) @@ -938,7 +924,7 @@ def remote(arg_types, return_types, worker=global_worker): check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, func_name) # Everything ready - export the function - if worker.mode in [None, SHELL_MODE, SCRIPT_MODE, SILENT_MODE]: + if worker.mode in [None, SCRIPT_MODE, SILENT_MODE]: func_name_global_valid = func.__name__ in func.__globals__ func_name_global_value = func.__globals__.get(func.__name__) # Set the function globally to make it refer to itself @@ -949,7 +935,7 @@ def remote(arg_types, return_types, worker=global_worker): # Undo our changes if func_name_global_valid: func.__globals__[func.__name__] = func_name_global_value else: del func.__globals__[func.__name__] - if worker.mode in [SHELL_MODE, SCRIPT_MODE, SILENT_MODE]: + if worker.mode in [SCRIPT_MODE, SILENT_MODE]: raylib.export_function(worker.handle, to_export) elif worker.mode is None: worker.cached_remote_functions.append(to_export)