diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index fb577d902..13a62a98a 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -3,15 +3,21 @@ from __future__ import division from __future__ import print_function import argparse +import logging import os import redis import time +import ray.ray_constants as ray_constants from ray.services import get_ip_address from ray.services import get_port -from ray.services import logger import ray.utils +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray configures it by default automatically +# using logging.basicConfig in its entry/init points. +logger = logging.getLogger(__name__) + class LogMonitor(object): """A monitor process for monitoring Ray log files. @@ -124,7 +130,23 @@ if __name__ == "__main__": required=True, type=str, help="The IP address of the node this process is on.") + parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) + parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) args = parser.parse_args() + logging.basicConfig( + level=logging.getLevelName(args.logging_level.upper()), + format=args.logging_format) redis_ip_address = get_ip_address(args.redis_address) redis_port = get_port(args.redis_address) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 70743dfd7..ac06a3042 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -54,9 +54,7 @@ LOCAL_SCHEDULER_CLIENT_TYPE = b"local_scheduler" PLASMA_MANAGER_CLIENT_TYPE = b"plasma_manager" # Set up logging. -logging.basicConfig() -log = logging.getLogger() -log.setLevel(logging.INFO) +logger = logging.getLogger(__name__) class Monitor(object): @@ -125,8 +123,8 @@ class Monitor(object): # that redis server. addr_port = self.redis.lrange("RedisShards", 0, -1) if len(addr_port) > 1: - log.warning("TODO: if launching > 1 redis shard, flushing " - "needs to touch shards in parallel.") + logger.warning("TODO: if launching > 1 redis shard, flushing " + "needs to touch shards in parallel.") self.issue_gcs_flushes = False else: addr_port = addr_port[0].split(b":") @@ -135,7 +133,7 @@ class Monitor(object): try: self.redis_shard.execute_command("HEAD.FLUSH 0") except redis.exceptions.ResponseError as e: - log.info( + logger.info( "Turning off flushing due to exception: {}".format( str(e))) self.issue_gcs_flushes = False @@ -194,8 +192,8 @@ class Monitor(object): dummy_object_id, "RAY.OBJECT_TABLE_REMOVE", dummy_object_id.id(), hex_to_binary(manager)) if ok != b"OK": - log.warn("Failed to remove object location for " - "dead plasma manager.") + logger.warn("Failed to remove object location for " + "dead plasma manager.") # If the task is scheduled on a dead local scheduler, mark the # task as lost. @@ -205,11 +203,11 @@ class Monitor(object): ray.experimental.state.TASK_STATUS_LOST, NIL_ID, task["ExecutionDependenciesString"], task["SpillbackCount"]) if ok != b"OK": - log.warn("Failed to update lost task for dead scheduler.") + logger.warn("Failed to update lost task for dead scheduler.") num_tasks_updated += 1 if num_tasks_updated > 0: - log.warn("Marked {} tasks as lost.".format(num_tasks_updated)) + logger.warn("Marked {} tasks as lost.".format(num_tasks_updated)) def cleanup_object_table(self): """Clean up global state for failed plasma managers. @@ -234,11 +232,12 @@ class Monitor(object): object_id, "RAY.OBJECT_TABLE_REMOVE", object_id.id(), hex_to_binary(manager)) if ok != b"OK": - log.warn("Failed to remove object location for dead " - "plasma manager.") + logger.warn("Failed to remove object location for " + "dead plasma manager.") num_objects_removed += 1 if num_objects_removed > 0: - log.warn("Marked {} objects as lost.".format(num_objects_removed)) + logger.warn("Marked {} objects as lost." + .format(num_objects_removed)) def scan_db_client_table(self): """Scan the database client table for dead clients. @@ -285,7 +284,8 @@ class Monitor(object): # If the update was a deletion, add them to our accounting for dead # local schedulers and plasma managers. - log.warn("Removed {}, client ID {}".format(client_type, db_client_id)) + logger.warn("Removed {}, client ID {}".format(client_type, + db_client_id)) if client_type == LOCAL_SCHEDULER_CLIENT_TYPE: if db_client_id not in self.dead_local_schedulers: self.dead_local_schedulers.add(db_client_id) @@ -429,11 +429,11 @@ class Monitor(object): return # Remove with best effort. num_deleted = redis.delete(*keys) - log.info( + logger.info( "Removed {} dead redis entries of the driver from redis shard {}.". format(num_deleted, shard_index)) if num_deleted != len(keys): - log.warning( + logger.warning( "Failed to remove {} relevant redis entries" " from redis shard {}.".format(len(keys) - num_deleted)) @@ -494,7 +494,7 @@ class Monitor(object): message = ray.gcs_utils.DriverTableMessage.GetRootAsDriverTableMessage( data, 0) driver_id = message.DriverId() - log.info("Driver {} has been removed.".format( + logger.info("Driver {} has been removed.".format( binary_to_hex(driver_id))) self._clean_up_entries_for_driver(driver_id) @@ -556,12 +556,12 @@ class Monitor(object): continue redis = self.state.redis_clients[shard_index] num_deleted = redis.delete(*keys) - log.info("Removed {} dead redis entries of the driver" - " from redis shard {}.".format(num_deleted, shard_index)) + logger.info("Removed {} dead redis entries of the driver from" + " redis shard {}.".format(num_deleted, shard_index)) if num_deleted != len(keys): - log.warning("Failed to remove {} relevant redis entries" - " from redis shard {}.".format( - len(keys) - num_deleted, shard_index)) + logger.warning("Failed to remove {} relevant redis entries" + " from redis shard {}.".format( + len(keys) - num_deleted, shard_index)) def xray_driver_removed_handler(self, unused_channel, data): """Handle a notification that a driver has been removed. @@ -576,7 +576,7 @@ class Monitor(object): message = ray.gcs_utils.DriverTableData.GetRootAsDriverTableData( driver_data, 0) driver_id = message.DriverId() - log.info("XRay Driver {} has been removed.".format( + logger.info("XRay Driver {} has been removed.".format( binary_to_hex(driver_id))) self._xray_clean_up_entries_for_driver(driver_id) @@ -616,7 +616,7 @@ class Monitor(object): message_handler = self.db_client_notification_handler elif channel == DRIVER_DEATH_CHANNEL: # The message was a notification that a driver was removed. - log.info("message-handler: driver_removed_handler") + logger.info("message-handler: driver_removed_handler") message_handler = self.driver_removed_handler elif channel == XRAY_HEARTBEAT_CHANNEL: # Similar functionality as local scheduler info channel @@ -669,7 +669,7 @@ class Monitor(object): max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush() num_flushed = self.redis_shard.execute_command( "HEAD.FLUSH {}".format(max_entries_to_flush)) - log.info("num_flushed {}".format(num_flushed)) + logger.info("num_flushed {}".format(num_flushed)) # This flushes event log and log files. ray.experimental.flush_redis_unsafe(self.redis) @@ -706,10 +706,10 @@ class Monitor(object): num_plasma_managers = len(self.live_plasma_managers) + len( self.dead_plasma_managers) - log.debug("{} dead local schedulers, {} plasma managers total, {} " - "dead plasma managers".format( - len(self.dead_local_schedulers), num_plasma_managers, - len(self.dead_plasma_managers))) + logger.debug("{} dead local schedulers, {} plasma managers total, {} " + "dead plasma managers".format( + len(self.dead_local_schedulers), num_plasma_managers, + len(self.dead_plasma_managers))) # Handle messages from the subscription channels. while True: @@ -743,7 +743,8 @@ class Monitor(object): for plasma_manager_id in plasma_manager_ids: if ((self.live_plasma_managers[plasma_manager_id]) >= ray._config.num_heartbeats_timeout()): - log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE)) + logger.warn("Timed out {}" + .format(PLASMA_MANAGER_CLIENT_TYPE)) # Remove the plasma manager from the managers whose # heartbeats we're tracking. del self.live_plasma_managers[plasma_manager_id] @@ -782,7 +783,22 @@ if __name__ == "__main__": required=False, type=str, help="the path to the autoscaling config file") + parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) + parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) args = parser.parse_args() + level = logging.getLevelName(args.logging_level.upper()) + logging.basicConfig(level=level, format=args.logging_format) redis_ip_address = get_ip_address(args.redis_address) redis_port = get_port(args.redis_address) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 931075787..384afddd8 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -66,3 +66,11 @@ AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S", # Max number of retries to AWS (default is 5, time increases exponentially) BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) + +# Default logger format: only contains the message. +LOGGER_FORMAT = "%(message)s" +LOGGER_FORMAT_HELP = "The logging format. default='%(message)s'" +LOGGER_LEVEL = "info" +LOGGER_LEVEL_CHOICES = ['debug', 'info', 'warning', 'error', 'critical'] +LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info'," + " 'warning', 'error', 'critical'], default='info'") diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 9c1bb0c12..c973b2eff 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -4,6 +4,7 @@ from __future__ import print_function import click import json +import logging import os import subprocess @@ -11,6 +12,7 @@ import ray.services as services from ray.autoscaler.commands import (attach_cluster, exec_cluster, create_or_update_cluster, rsync, teardown_cluster, get_head_node_ip) +import ray.ray_constants as ray_constants import ray.utils @@ -149,20 +151,36 @@ def cli(): is_flag=True, default=None, help="use the raylet code path") +@click.option( + "--logging-level", + required=False, + default=ray_constants.LOGGER_LEVEL, + type=str, + help=ray_constants.LOGGER_LEVEL_HELP) +@click.option( + "--logging-format", + required=False, + default=ray_constants.LOGGER_FORMAT, + type=str, + help=ray_constants.LOGGER_FORMAT_HELP) def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_shard_ports, object_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, - use_raylet): + use_raylet, logging_level, logging_format): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) if redis_address is not None: redis_address = services.address_to_ip(redis_address) + level = logging.getLevelName(logging_level.upper()) + logging.basicConfig(level=level, format=logging_format) + logger = logging.getLogger(__name__) + if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. - print("Detected environment variable 'RAY_USE_XRAY'.") + logger.info("Detected environment variable 'RAY_USE_XRAY'.") use_raylet = True try: @@ -204,7 +222,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, # Get the node IP address if one is not provided. if node_ip_address is None: node_ip_address = services.get_node_ip_address() - print("Using IP address {} for this node.".format(node_ip_address)) + logger.info("Using IP address {} for this node." + .format(node_ip_address)) address_info = {} # Use the provided object manager port if there is one. @@ -232,19 +251,20 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, huge_pages=huge_pages, autoscaling_config=autoscaling_config, use_raylet=use_raylet) - print(address_info) - print("\nStarted Ray on this node. You can add additional nodes to " - "the cluster by calling\n\n" - " ray start --redis-address {}\n\n" - "from the node you wish to add. You can connect a driver to the " - "cluster from Python by running\n\n" - " import ray\n" - " ray.init(redis_address=\"{}\")\n\n" - "If you have trouble connecting from a different machine, check " - "that your firewall is configured properly. If you wish to " - "terminate the processes that have been started, run\n\n" - " ray stop".format(address_info["redis_address"], - address_info["redis_address"])) + logger.info(address_info) + logger.info( + "\nStarted Ray on this node. You can add additional nodes to " + "the cluster by calling\n\n" + " ray start --redis-address {}\n\n" + "from the node you wish to add. You can connect a driver to the " + "cluster from Python by running\n\n" + " import ray\n" + " ray.init(redis_address=\"{}\")\n\n" + "If you have trouble connecting from a different machine, check " + "that your firewall is configured properly. If you wish to " + "terminate the processes that have been started, run\n\n" + " ray stop".format(address_info["redis_address"], + address_info["redis_address"])) else: # Start Ray on a non-head node. if redis_port is not None: @@ -281,7 +301,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, # Get the node IP address if one is not provided. if node_ip_address is None: node_ip_address = services.get_node_ip_address(redis_address) - print("Using IP address {} for this node.".format(node_ip_address)) + logger.info("Using IP address {} for this node." + .format(node_ip_address)) # Check that there aren't already Redis clients with the same IP # address connected with this Redis instance. This raises an exception # if the Redis server already has clients on this node. @@ -299,10 +320,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, plasma_directory=plasma_directory, huge_pages=huge_pages, use_raylet=use_raylet) - print(address_info) - print("\nStarted Ray on this node. If you wish to terminate the " - "processes that have been started, run\n\n" - " ray stop") + logger.info(address_info) + logger.info("\nStarted Ray on this node. If you wish to terminate the " + "processes that have been started, run\n\n" + " ray stop") if block: import time diff --git a/python/ray/services.py b/python/ray/services.py index 2087e79e2..0850441ff 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -3,8 +3,8 @@ from __future__ import division from __future__ import print_function import binascii -import logging import json +import logging import os import random import resource @@ -28,12 +28,6 @@ import ray.global_scheduler as global_scheduler import ray.local_scheduler import ray.plasma -logger = logging.getLogger("ray") -logger.setLevel(logging.INFO) -ch = logging.StreamHandler(sys.stderr) -ch.setLevel(logging.INFO) -logger.addHandler(ch) - PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_WORKER = "worker" @@ -98,6 +92,11 @@ RAYLET_EXECUTABLE = os.path.join( ObjectStoreAddress = namedtuple("ObjectStoreAddress", ["name", "manager_name", "manager_port"]) +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray configures it by default automatically +# using logging.basicConfig in its entry/init points. +logger = logging.getLogger(__name__) + def address(ip_address, port): return ip_address + ":" + str(port) diff --git a/python/ray/signature.py b/python/ray/signature.py index 4e771284a..d155e55d8 100644 --- a/python/ray/signature.py +++ b/python/ray/signature.py @@ -5,10 +5,15 @@ from __future__ import print_function from collections import namedtuple import funcsigs from funcsigs import Parameter +import logging -from ray.services import logger from ray.utils import is_cython +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray configures it by default automatically +# using logging.basicConfig in its entry/init points. +logger = logging.getLogger(__name__) + FunctionSignature = namedtuple("FunctionSignature", [ "arg_names", "arg_defaults", "arg_is_positionals", "keyword_names", "function_name" diff --git a/python/ray/test/test_utils.py b/python/ray/test/test_utils.py index 14d627fb4..a29daa5a0 100644 --- a/python/ray/test/test_utils.py +++ b/python/ray/test/test_utils.py @@ -141,7 +141,7 @@ def wait_for_pid_to_exit(pid, timeout=20): def run_and_get_output(command): with tempfile.NamedTemporaryFile() as tmp: - p = subprocess.Popen(command, stdout=tmp) + p = subprocess.Popen(command, stdout=tmp, stderr=tmp) if p.wait() != 0: raise RuntimeError("ray start did not terminate properly") with open(tmp.name, 'r') as f: diff --git a/python/ray/worker.py b/python/ray/worker.py index a3dd96be1..6ecac4903 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -7,6 +7,7 @@ import collections import colorama import hashlib import inspect +import logging import numpy as np import os import redis @@ -25,7 +26,6 @@ import ray.gcs_utils import ray.remote_function import ray.serialization as serialization import ray.services as services -from ray.services import logger import ray.signature import ray.local_scheduler import ray.plasma @@ -73,6 +73,11 @@ DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0 DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0 DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1 +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray configures it by default automatically +# using logging.basicConfig in its entry/init points. +logger = logging.getLogger(__name__) + class RayTaskError(Exception): """An object used internally to represent a task that threw an exception. @@ -1730,7 +1735,10 @@ def init(redis_address=None, plasma_directory=None, huge_pages=False, include_webui=True, - use_raylet=None): + use_raylet=None, + configure_logging=True, + logging_level=logging.INFO, + logging_format=ray_constants.LOGGER_FORMAT): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1790,6 +1798,11 @@ def init(redis_address=None, include_webui: Boolean flag indicating whether to start the web UI, which is a Jupyter notebook. use_raylet: True if the new raylet code path should be used. + configure_logging: True if allow the logging cofiguration here. + Otherwise, the users may want to configure it by their own. + logging_level: Logging level, default will be loging.INFO. + logging_format: Logging format, default will be "%(message)s" + which means only contains the message. Returns: Address information about the started processes. @@ -1798,6 +1811,9 @@ def init(redis_address=None, Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ + if configure_logging: + logging.basicConfig(level=logging_level, format=logging_format) + if global_worker.connected: if ignore_reinit_error: logger.error("Calling ray.init() again after it has already been " diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 0bc21f2b2..cd7b3f4a4 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -3,10 +3,12 @@ from __future__ import division from __future__ import print_function import argparse +import logging import traceback import ray import ray.actor +import ray.ray_constants as ray_constants parser = argparse.ArgumentParser( description=("Parse addresses for the worker " @@ -38,6 +40,19 @@ parser.add_argument( help="the local scheduler's name") parser.add_argument( "--raylet-name", required=False, type=str, help="the raylet's name") +parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) +parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) if __name__ == "__main__": args = parser.parse_args() @@ -51,6 +66,10 @@ if __name__ == "__main__": "raylet_socket_name": args.raylet_name } + logging.basicConfig( + level=logging.getLevelName(args.logging_level.upper()), + format=args.logging_format) + ray.worker.connect( info, mode=ray.WORKER_MODE, use_raylet=(args.raylet_name is not None))