From 178346fa167f5ce9994195ca7f74744c1aa17e5a Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 2 Jul 2018 16:10:57 -0700 Subject: [PATCH] Printing messages to stderr (#2312) Move core python code onto logging module. Addressing #1884. --- python/ray/log_monitor.py | 12 ++++---- python/ray/services.py | 62 +++++++++++++++++++++++---------------- python/ray/signature.py | 5 ++-- python/ray/worker.py | 47 +++++++++++++++-------------- 4 files changed, 72 insertions(+), 54 deletions(-) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index b0f268cd4..34ecfc68d 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -9,6 +9,7 @@ import time from ray.services import get_ip_address from ray.services import get_port +from ray.services import logger class LogMonitor(object): @@ -43,7 +44,7 @@ class LogMonitor(object): "LOG_FILENAMES:{}".format(self.node_ip_address), num_current_log_files, -1) for log_filename in new_log_filenames: - print("Beginning to track file {}".format(log_filename)) + logger.info("Beginning to track file {}".format(log_filename)) assert log_filename not in self.log_files self.log_files[log_filename] = [] @@ -83,11 +84,12 @@ class LogMonitor(object): log_filename, "r") except IOError as e: if e.errno == os.errno.EMFILE: - print("Warning: Ignoring {} because there are too " - "many open files.".format(log_filename)) + logger.warning( + "Warning: Ignoring {} because there are too " + "many open files.".format(log_filename)) elif e.errno == os.errno.ENOENT: - print("Warning: The file {} was not " - "found.".format(log_filename)) + logger.warning("Warning: The file {} was not " + "found.".format(log_filename)) else: raise e diff --git a/python/ray/services.py b/python/ray/services.py index 2bbe21644..4d4624753 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import binascii +import logging import json import os import random @@ -27,6 +28,12 @@ import ray.global_scheduler as global_scheduler import ray.local_scheduler import ray.plasma +logger = logging.getLogger("ray") +logger.setLevel(logging.INFO) +ch = logging.StreamHandler(sys.stderr) +ch.setLevel(logging.INFO) +logger.addHandler(ch) + PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_WORKER = "worker" @@ -178,7 +185,7 @@ def cleanup(): # Reset the list of processes of this type. all_processes[process_type] = [] if not successfully_shut_down: - print("Ray did not shut down properly.") + logger.warning("Ray did not shut down properly.") def all_processes_alive(exclude=[]): @@ -193,7 +200,8 @@ def all_processes_alive(exclude=[]): # alive. processes_alive = [p.poll() is None for p in processes] if (not all(processes_alive) and process_type not in exclude): - print("A process of type {} has died.".format(process_type)) + logger.warning( + "A process of type {} has died.".format(process_type)) return False return True @@ -303,13 +311,14 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5): while counter < num_retries: try: # Run some random command and see if it worked. - print("Waiting for redis server at {}:{} to respond...".format( - redis_ip_address, redis_port)) + logger.info( + "Waiting for redis server at {}:{} to respond...".format( + redis_ip_address, redis_port)) redis_client.client_list() except redis.ConnectionError as e: # Wait a little bit. time.sleep(1) - print("Failed to connect to the redis server, retrying.") + logger.info("Failed to connect to the redis server, retrying.") counter += 1 else: break @@ -393,7 +402,7 @@ def check_version_info(redis_client): if version_info[:2] != true_version_info[:2]: raise Exception(error_message) else: - print(error_message) + logger.warning(error_message) def start_redis(node_ip_address, @@ -601,7 +610,7 @@ def _start_redis_instance(node_ip_address="127.0.0.1", while counter < num_retries: if counter > 0: - print("Redis failed to start, retrying now.") + logger.warning("Redis failed to start, retrying now.") command = [executable, "--port", str(port), "--loglevel", "warning"] + load_module_args p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) @@ -786,16 +795,16 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): stdout=stdout_file, stderr=stderr_file) except Exception: - print("Failed to start the UI, you may need to run " - "'pip install jupyter'.") + logger.warning("Failed to start the UI, you may need to run " + "'pip install jupyter'.") else: if cleanup: all_processes[PROCESS_TYPE_WEB_UI].append(ui_process) webui_url = ("http://localhost:{}/notebooks/ray_ui{}.ipynb?token={}" .format(port, random_ui_id, token)) - print("\n" + "=" * 70) - print("View the web UI at {}".format(webui_url)) - print("=" * 70 + "\n") + logger.info("\n" + "=" * 70) + logger.info("View the web UI at {}".format(webui_url)) + logger.info("=" * 70 + "\n") return webui_url @@ -846,8 +855,8 @@ def check_and_update_resources(resources, use_raylet): if (use_raylet and resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY): - raise ValueError("Resource quantities must be at most {}." - .format(ray.ray_constants.MAX_RESOURCE_QUANTITY)) + raise ValueError("Resource quantities must be at most {}.".format( + ray.ray_constants.MAX_RESOURCE_QUANTITY)) return resources @@ -892,8 +901,8 @@ def start_local_scheduler(redis_address, """ resources = check_and_update_resources(resources, False) - print("Starting local scheduler with the following resources: {}." - .format(resources)) + logger.info("Starting local scheduler with the following resources: {}." + .format(resources)) local_scheduler_name, p = ray.local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name, @@ -1049,12 +1058,13 @@ def start_objstore(node_ip_address, # blocks. shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail if objstore_memory > shm_avail: - print("Warning: Reducing object store memory because " - "/dev/shm has only {} bytes available. You may be " - "able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, " - "you may need to pass an argument with the flag " - "'--shm-size' to 'docker run'.".format(shm_avail)) + logger.warning( + "Warning: Reducing object store memory because " + "/dev/shm has only {} bytes available. You may be " + "able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, " + "you may need to pass an argument with the flag " + "'--shm-size' to 'docker run'.".format(shm_avail)) objstore_memory = int(shm_avail * 0.8) finally: os.close(shm_fd) @@ -1295,7 +1305,8 @@ def start_ray_processes(address_info=None, A dictionary of the address information for the processes that were started. """ - print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.") + logger.info( + "Process STDOUT and STDERR is being redirected to /tmp/raylogs/.") if resources is None: resources = {} @@ -1712,8 +1723,9 @@ def try_to_create_directory(directory_path): except OSError as e: if e.errno != os.errno.EEXIST: raise e - print("Attempted to create '{}', but the directory already " - "exists.".format(directory_path)) + logger.warning( + "Attempted to create '{}', but the directory already " + "exists.".format(directory_path)) # Change the log directory permissions so others can use it. This is # important when multiple people are using the same machine. os.chmod(directory_path, 0o0777) diff --git a/python/ray/signature.py b/python/ray/signature.py index 9bd9a881a..4e771284a 100644 --- a/python/ray/signature.py +++ b/python/ray/signature.py @@ -6,6 +6,7 @@ from collections import namedtuple import funcsigs from funcsigs import Parameter +from ray.services import logger from ray.utils import is_cython FunctionSignature = namedtuple("FunctionSignature", [ @@ -99,7 +100,7 @@ def check_signature_supported(func, warn=False): message = ("The function {} has a **kwargs argument, which is " "currently not supported.".format(function_name)) if warn: - print(message) + logger.warning(message) else: raise Exception(message) @@ -108,7 +109,7 @@ def check_signature_supported(func, warn=False): "(defined after * or *args), which is currently " "not supported.".format(function_name)) if warn: - print(message) + logger.warning(message) else: raise Exception(message) diff --git a/python/ray/worker.py b/python/ray/worker.py index 3dd78cfd7..515d65d16 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -26,6 +26,7 @@ import ray.gcs_utils import ray.remote_function import ray.serialization as serialization import ray.services as services +from ray.services import logger import ray.signature import ray.local_scheduler import ray.plasma @@ -310,7 +311,7 @@ class Worker(object): "of their fields. This behavior may " "be incorrect in some cases.".format( type(e.example_object))) - print(warning_message) + logger.warning(warning_message) except (serialization.RayNotDictionarySerializable, serialization.CloudPickleError, pickle.pickle.PicklingError, Exception): @@ -324,7 +325,7 @@ class Worker(object): "using pickle. This may be " "inefficient.".format( type(e.example_object))) - print(warning_message) + logger.warning(warning_message) except serialization.CloudPickleError: register_custom_serializer( type(e.example_object), @@ -335,7 +336,7 @@ class Worker(object): "and only registering the class " "locally.".format( type(e.example_object))) - print(warning_message) + logger.warning(warning_message) def put_object(self, object_id, value): """Put value in the local object store with object id objectid. @@ -371,8 +372,9 @@ class Worker(object): # and make sure that the objects are in fact the same. We also # should return an error code to the caller instead of printing a # message. - print("The object with ID {} already exists in the object store." - .format(object_id)) + logger.info( + "The object with ID {} already exists in the object store." + .format(object_id)) def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10): start_time = time.time() @@ -1153,7 +1155,7 @@ def print_failed_task(task_status): task_status (Dict): A dictionary containing the name, operationid, and error message for a failed task. """ - print(""" + logger.error(""" Error: Task failed Function Name: {} Task ID: {} @@ -1381,9 +1383,10 @@ def get_address_info_from_redis(redis_address, raise # Some of the information may not be in Redis yet, so wait a little # bit. - print("Some processes that the driver needs to connect to have " - "not registered with Redis, so retrying. Have you run " - "'ray start' on this node?") + logger.warning( + "Some processes that the driver needs to connect to have " + "not registered with Redis, so retrying. Have you run " + "'ray start' on this node?") time.sleep(1) counter += 1 @@ -1516,7 +1519,7 @@ def _init(address_info=None, if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. - print("Detected environment variable 'RAY_USE_XRAY'.") + logger.info("Detected environment variable 'RAY_USE_XRAY'.") use_raylet = True # Get addresses of existing services. @@ -1716,7 +1719,7 @@ def init(redis_address=None, """ if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. - print("Detected environment variable 'RAY_USE_XRAY'.") + logger.info("Detected environment variable 'RAY_USE_XRAY'.") use_raylet = True # Convert hostnames to numerical IP address. @@ -1837,10 +1840,10 @@ def print_error_messages_raylet(worker): error_messages = global_state.error_messages(worker.task_driver_id) for error_message in error_messages: if error_message not in old_error_messages: - print(error_message) + logger.error(error_message) old_error_messages.add(error_message) else: - print("Suppressing duplicate error message.") + logger.error("Suppressing duplicate error message.") try: for msg in worker.error_message_pubsub_client.listen(): @@ -1858,10 +1861,10 @@ def print_error_messages_raylet(worker): error_message = error_data.ErrorMessage().decode("ascii") if error_message not in old_error_messages: - print(error_message) + logger.error(error_message) old_error_messages.add(error_message) else: - print("Suppressing duplicate error message.") + logger.error("Suppressing duplicate error message.") except redis.ConnectionError: # When Redis terminates the listen call will throw a ConnectionError, @@ -1900,10 +1903,10 @@ def print_error_messages(worker): error_message = worker.redis_client.hget( error_key, "message").decode("ascii") if error_message not in old_error_messages: - print(error_message) + logger.error(error_message) old_error_messages.add(error_message) else: - print("Suppressing duplicate error message.") + logger.error("Suppressing duplicate error message.") num_errors_received += 1 try: @@ -1915,10 +1918,11 @@ def print_error_messages(worker): error_message = worker.redis_client.hget( error_key, "message").decode("ascii") if error_message not in old_error_messages: - print(error_message) + logger.error(error_message) old_error_messages.add(error_message) else: - print("Suppressing duplicate error message.") + logger.error( + "Suppressing duplicate error message.") num_errors_received += 1 except redis.ConnectionError: # When Redis terminates the listen call will throw a ConnectionError, @@ -2414,10 +2418,9 @@ def _try_to_compute_deterministic_class_id(cls, depth=5): # We have not reached a fixed point, so we may end up with a different # class ID for this custom class on each worker, which could lead to the # same class definition being exported many many times. - print( + logger.warning( "WARNING: Could not produce a deterministic class ID for class " - "{}".format(cls), - file=sys.stderr) + "{}".format(cls)) return hashlib.sha1(new_class_id).digest()