From 352e5e1dd22a606fbd91defe895cf8fd804d1ab8 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 29 Jul 2016 12:40:45 -0700 Subject: [PATCH] use Ray specific logger so logging does not interfere with other python modules that use the logging module (#321) --- lib/python/ray/worker.py | 41 ++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index afc7c84cd..df16fb7d6 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -397,7 +397,7 @@ class Worker(object): Args: function (Callable): The remote function that this worker can execute. """ - logging.info("Registering function {}.".format(function.func_name)) + _logger().info("Registering function {}.".format(function.func_name)) ray.lib.register_function(self.handle, function.func_name, len(function.return_types)) self.functions[function.func_name] = function @@ -437,6 +437,9 @@ reinitialize these variables after they are used so that changes to their state made by one task do not affect other tasks. """ +logger = logging.getLogger("ray") +"""Logger: The logging object for the Python worker code.""" + def check_connected(worker=global_worker): """Check if the worker is connected. @@ -542,11 +545,11 @@ def register_module(module, worker=global_worker): module (module): The module of functions to register. 
""" check_connected(worker) - logging.info("registering functions in module {}.".format(module.__name__)) + _logger().info("registering functions in module {}.".format(module.__name__)) for name in dir(module): val = getattr(module, name) if hasattr(val, "is_remote") and val.is_remote: - logging.info("registering {}.".format(val.func_name)) + _logger().info("registering {}.".format(val.func_name)) worker.register_function(val) def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE): @@ -569,7 +572,16 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False worker.handle = ray.lib.create_worker(worker.scheduler_address, worker.objstore_address, worker.worker_address, is_driver) worker.set_mode(mode) FORMAT = "%(asctime)-15s %(message)s" - logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=ray.config.get_log_file_path("-".join(["worker", worker_address]) + ".log")) + # Configure the Python logging module. Note that if we do not provide our own + # logger, then our logging will interfere with other Python modules that also + # use the logging module. + log_handler = logging.FileHandler(ray.config.get_log_file_path("-".join(["worker", worker_address]) + ".log")) + log_handler.setLevel(logging.DEBUG) + log_handler.setFormatter(logging.Formatter(FORMAT)) + _logger().addHandler(log_handler) + _logger().setLevel(logging.DEBUG) + _logger().propagate = False + # Configure the logging from the worker C++ code. 
ray.lib.set_log_config(ray.config.get_log_file_path("-".join(["worker", worker_address, "c++"]) + ".log")) if mode in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: for function_to_export in worker.cached_remote_functions: @@ -721,7 +733,7 @@ def main_loop(worker=global_worker): failure_objects = [RayFailedObject(exception_message) for _ in range(len(return_objrefs))] store_outputs_in_objstore(return_objrefs, failure_objects, worker) ray.lib.notify_task_completed(worker.handle, False, exception_message) # notify the scheduler that the task threw an exception - logging.info("Worker threw exception with message: \n\n{}\n, while running function {}.".format(exception_message, func_name)) + _logger().info("Worker threw exception with message: \n\n{}\n, while running function {}.".format(exception_message, func_name)) else: store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store ray.lib.notify_task_completed(worker.handle, True, "") # notify the scheduler that the task completed successfully @@ -773,6 +785,15 @@ def _mode(worker=global_worker): """ return worker.mode +def _logger(): + """Return the logger object. + + We use this wrapper so that functions which do logging can be pickled. + Normally a logger object is specific to a machine (it opens a local file), and + so cannot be pickled. + """ + return logger + def _export_reusable_variable(name, reusable, worker=global_worker): """Export a reusable variable to the workers. This is only called by a driver. 
@@ -811,12 +832,12 @@ def remote(arg_types, return_types, worker=global_worker): return objrefs def func_executor(arguments): """This gets run when the remote function is executed.""" - logging.info("Calling function {}".format(func.__name__)) + _logger().info("Calling function {}".format(func.__name__)) start_time = time.time() result = func(*arguments) end_time = time.time() check_return_values(func_call, result) # throws an exception if result is invalid - logging.info("Finished executing function {}, it took {} seconds".format(func.__name__, end_time - start_time)) + _logger().info("Finished executing function {}, it took {} seconds".format(func.__name__, end_time - start_time)) return result func_call.executor = func_executor func_call.arg_types = arg_types @@ -1009,9 +1030,9 @@ def get_arguments_for_execution(function, args, worker=global_worker): if isinstance(arg, ray.lib.ObjRef): # get the object from the local object store - logging.info("Getting argument {} for function {}.".format(i, function.__name__)) + _logger().info("Getting argument {} for function {}.".format(i, function.__name__)) argument = worker.get_object(arg) - logging.info("Successfully retrieved argument {} for function {}.".format(i, function.__name__)) + _logger().info("Successfully retrieved argument {} for function {}.".format(i, function.__name__)) else: # pass the argument by value argument = arg @@ -1043,7 +1064,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): for i in range(len(objrefs)): if isinstance(outputs[i], ray.lib.ObjRef): # An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to - logging.info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val)) + _logger().info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val)) worker.alias_objrefs(objrefs[i], outputs[i]) pass else: