Separate python logger module-wise (#2703)

## What do these changes do?
1. Separate the log related code to logger.py from services.py.
2. Allow users to modify logging formatter in `ray start`.

## Related issue number
https://github.com/ray-project/ray/pull/2664
This commit is contained in:
Yuhong Guo
2018-08-27 04:46:15 +08:00
committed by Robert Nishihara
parent 26d3c0655c
commit 0b6e08ebee
9 changed files with 169 additions and 63 deletions
+23 -1
View File
@@ -3,15 +3,21 @@ from __future__ import division
from __future__ import print_function
import argparse
import logging
import os
import redis
import time
import ray.ray_constants as ray_constants
from ray.services import get_ip_address
from ray.services import get_port
from ray.services import logger
import ray.utils
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
class LogMonitor(object):
"""A monitor process for monitoring Ray log files.
@@ -124,7 +130,23 @@ if __name__ == "__main__":
required=True,
type=str,
help="The IP address of the node this process is on.")
parser.add_argument(
"--logging-level",
required=False,
type=str,
default=ray_constants.LOGGER_LEVEL,
choices=ray_constants.LOGGER_LEVEL_CHOICES,
help=ray_constants.LOGGER_LEVEL_HELP)
parser.add_argument(
"--logging-format",
required=False,
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
args = parser.parse_args()
logging.basicConfig(
level=logging.getLevelName(args.logging_level.upper()),
format=args.logging_format)
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
+46 -30
View File
@@ -54,9 +54,7 @@ LOCAL_SCHEDULER_CLIENT_TYPE = b"local_scheduler"
PLASMA_MANAGER_CLIENT_TYPE = b"plasma_manager"
# Set up logging.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
class Monitor(object):
@@ -125,8 +123,8 @@ class Monitor(object):
# that redis server.
addr_port = self.redis.lrange("RedisShards", 0, -1)
if len(addr_port) > 1:
log.warning("TODO: if launching > 1 redis shard, flushing "
"needs to touch shards in parallel.")
logger.warning("TODO: if launching > 1 redis shard, flushing "
"needs to touch shards in parallel.")
self.issue_gcs_flushes = False
else:
addr_port = addr_port[0].split(b":")
@@ -135,7 +133,7 @@ class Monitor(object):
try:
self.redis_shard.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info(
logger.info(
"Turning off flushing due to exception: {}".format(
str(e)))
self.issue_gcs_flushes = False
@@ -194,8 +192,8 @@ class Monitor(object):
dummy_object_id, "RAY.OBJECT_TABLE_REMOVE",
dummy_object_id.id(), hex_to_binary(manager))
if ok != b"OK":
log.warn("Failed to remove object location for "
"dead plasma manager.")
logger.warn("Failed to remove object location for "
"dead plasma manager.")
# If the task is scheduled on a dead local scheduler, mark the
# task as lost.
@@ -205,11 +203,11 @@ class Monitor(object):
ray.experimental.state.TASK_STATUS_LOST, NIL_ID,
task["ExecutionDependenciesString"], task["SpillbackCount"])
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
logger.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
if num_tasks_updated > 0:
log.warn("Marked {} tasks as lost.".format(num_tasks_updated))
logger.warn("Marked {} tasks as lost.".format(num_tasks_updated))
def cleanup_object_table(self):
"""Clean up global state for failed plasma managers.
@@ -234,11 +232,12 @@ class Monitor(object):
object_id, "RAY.OBJECT_TABLE_REMOVE", object_id.id(),
hex_to_binary(manager))
if ok != b"OK":
log.warn("Failed to remove object location for dead "
"plasma manager.")
logger.warn("Failed to remove object location for "
"dead plasma manager.")
num_objects_removed += 1
if num_objects_removed > 0:
log.warn("Marked {} objects as lost.".format(num_objects_removed))
logger.warn("Marked {} objects as lost."
.format(num_objects_removed))
def scan_db_client_table(self):
"""Scan the database client table for dead clients.
@@ -285,7 +284,8 @@ class Monitor(object):
# If the update was a deletion, add them to our accounting for dead
# local schedulers and plasma managers.
log.warn("Removed {}, client ID {}".format(client_type, db_client_id))
logger.warn("Removed {}, client ID {}".format(client_type,
db_client_id))
if client_type == LOCAL_SCHEDULER_CLIENT_TYPE:
if db_client_id not in self.dead_local_schedulers:
self.dead_local_schedulers.add(db_client_id)
@@ -429,11 +429,11 @@ class Monitor(object):
return
# Remove with best effort.
num_deleted = redis.delete(*keys)
log.info(
logger.info(
"Removed {} dead redis entries of the driver from redis shard {}.".
format(num_deleted, shard_index))
if num_deleted != len(keys):
log.warning(
logger.warning(
"Failed to remove {} relevant redis entries"
" from redis shard {}.".format(len(keys) - num_deleted))
@@ -494,7 +494,7 @@ class Monitor(object):
message = ray.gcs_utils.DriverTableMessage.GetRootAsDriverTableMessage(
data, 0)
driver_id = message.DriverId()
log.info("Driver {} has been removed.".format(
logger.info("Driver {} has been removed.".format(
binary_to_hex(driver_id)))
self._clean_up_entries_for_driver(driver_id)
@@ -556,12 +556,12 @@ class Monitor(object):
continue
redis = self.state.redis_clients[shard_index]
num_deleted = redis.delete(*keys)
log.info("Removed {} dead redis entries of the driver"
" from redis shard {}.".format(num_deleted, shard_index))
logger.info("Removed {} dead redis entries of the driver from"
" redis shard {}.".format(num_deleted, shard_index))
if num_deleted != len(keys):
log.warning("Failed to remove {} relevant redis entries"
" from redis shard {}.".format(
len(keys) - num_deleted, shard_index))
logger.warning("Failed to remove {} relevant redis entries"
" from redis shard {}.".format(
len(keys) - num_deleted, shard_index))
def xray_driver_removed_handler(self, unused_channel, data):
"""Handle a notification that a driver has been removed.
@@ -576,7 +576,7 @@ class Monitor(object):
message = ray.gcs_utils.DriverTableData.GetRootAsDriverTableData(
driver_data, 0)
driver_id = message.DriverId()
log.info("XRay Driver {} has been removed.".format(
logger.info("XRay Driver {} has been removed.".format(
binary_to_hex(driver_id)))
self._xray_clean_up_entries_for_driver(driver_id)
@@ -616,7 +616,7 @@ class Monitor(object):
message_handler = self.db_client_notification_handler
elif channel == DRIVER_DEATH_CHANNEL:
# The message was a notification that a driver was removed.
log.info("message-handler: driver_removed_handler")
logger.info("message-handler: driver_removed_handler")
message_handler = self.driver_removed_handler
elif channel == XRAY_HEARTBEAT_CHANNEL:
# Similar functionality as local scheduler info channel
@@ -669,7 +669,7 @@ class Monitor(object):
max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush()
num_flushed = self.redis_shard.execute_command(
"HEAD.FLUSH {}".format(max_entries_to_flush))
log.info("num_flushed {}".format(num_flushed))
logger.info("num_flushed {}".format(num_flushed))
# This flushes event log and log files.
ray.experimental.flush_redis_unsafe(self.redis)
@@ -706,10 +706,10 @@ class Monitor(object):
num_plasma_managers = len(self.live_plasma_managers) + len(
self.dead_plasma_managers)
log.debug("{} dead local schedulers, {} plasma managers total, {} "
"dead plasma managers".format(
len(self.dead_local_schedulers), num_plasma_managers,
len(self.dead_plasma_managers)))
logger.debug("{} dead local schedulers, {} plasma managers total, {} "
"dead plasma managers".format(
len(self.dead_local_schedulers), num_plasma_managers,
len(self.dead_plasma_managers)))
# Handle messages from the subscription channels.
while True:
@@ -743,7 +743,8 @@ class Monitor(object):
for plasma_manager_id in plasma_manager_ids:
if ((self.live_plasma_managers[plasma_manager_id]) >=
ray._config.num_heartbeats_timeout()):
log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE))
logger.warn("Timed out {}"
.format(PLASMA_MANAGER_CLIENT_TYPE))
# Remove the plasma manager from the managers whose
# heartbeats we're tracking.
del self.live_plasma_managers[plasma_manager_id]
@@ -782,7 +783,22 @@ if __name__ == "__main__":
required=False,
type=str,
help="the path to the autoscaling config file")
parser.add_argument(
"--logging-level",
required=False,
type=str,
default=ray_constants.LOGGER_LEVEL,
choices=ray_constants.LOGGER_LEVEL_CHOICES,
help=ray_constants.LOGGER_LEVEL_HELP)
parser.add_argument(
"--logging-format",
required=False,
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
args = parser.parse_args()
level = logging.getLevelName(args.logging_level.upper())
logging.basicConfig(level=level, format=args.logging_format)
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
+8
View File
@@ -66,3 +66,11 @@ AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
# Max number of retries to AWS (default is 5, time increases exponentially)
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
# Default logger format: only contains the message.
LOGGER_FORMAT = "%(message)s"
LOGGER_FORMAT_HELP = "The logging format. default='%(message)s'"
LOGGER_LEVEL = "info"
LOGGER_LEVEL_CHOICES = ['debug', 'info', 'warning', 'error', 'critical']
LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info',"
" 'warning', 'error', 'critical'], default='info'")
+42 -21
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import click
import json
import logging
import os
import subprocess
@@ -11,6 +12,7 @@ import ray.services as services
from ray.autoscaler.commands import (attach_cluster, exec_cluster,
create_or_update_cluster, rsync,
teardown_cluster, get_head_node_ip)
import ray.ray_constants as ray_constants
import ray.utils
@@ -149,20 +151,36 @@ def cli():
is_flag=True,
default=None,
help="use the raylet code path")
@click.option(
"--logging-level",
required=False,
default=ray_constants.LOGGER_LEVEL,
type=str,
help=ray_constants.LOGGER_LEVEL_HELP)
@click.option(
"--logging-format",
required=False,
default=ray_constants.LOGGER_FORMAT,
type=str,
help=ray_constants.LOGGER_FORMAT_HELP)
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_shard_ports, object_manager_port,
object_store_memory, num_workers, num_cpus, num_gpus, resources,
head, no_ui, block, plasma_directory, huge_pages, autoscaling_config,
use_raylet):
use_raylet, logging_level, logging_format):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
if redis_address is not None:
redis_address = services.address_to_ip(redis_address)
level = logging.getLevelName(logging_level.upper())
logging.basicConfig(level=level, format=logging_format)
logger = logging.getLogger(__name__)
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
print("Detected environment variable 'RAY_USE_XRAY'.")
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
try:
@@ -204,7 +222,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address()
print("Using IP address {} for this node.".format(node_ip_address))
logger.info("Using IP address {} for this node."
.format(node_ip_address))
address_info = {}
# Use the provided object manager port if there is one.
@@ -232,19 +251,20 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
huge_pages=huge_pages,
autoscaling_config=autoscaling_config,
use_raylet=use_raylet)
print(address_info)
print("\nStarted Ray on this node. You can add additional nodes to "
"the cluster by calling\n\n"
" ray start --redis-address {}\n\n"
"from the node you wish to add. You can connect a driver to the "
"cluster from Python by running\n\n"
" import ray\n"
" ray.init(redis_address=\"{}\")\n\n"
"If you have trouble connecting from a different machine, check "
"that your firewall is configured properly. If you wish to "
"terminate the processes that have been started, run\n\n"
" ray stop".format(address_info["redis_address"],
address_info["redis_address"]))
logger.info(address_info)
logger.info(
"\nStarted Ray on this node. You can add additional nodes to "
"the cluster by calling\n\n"
" ray start --redis-address {}\n\n"
"from the node you wish to add. You can connect a driver to the "
"cluster from Python by running\n\n"
" import ray\n"
" ray.init(redis_address=\"{}\")\n\n"
"If you have trouble connecting from a different machine, check "
"that your firewall is configured properly. If you wish to "
"terminate the processes that have been started, run\n\n"
" ray stop".format(address_info["redis_address"],
address_info["redis_address"]))
else:
# Start Ray on a non-head node.
if redis_port is not None:
@@ -281,7 +301,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
print("Using IP address {} for this node.".format(node_ip_address))
logger.info("Using IP address {} for this node."
.format(node_ip_address))
# Check that there aren't already Redis clients with the same IP
# address connected with this Redis instance. This raises an exception
# if the Redis server already has clients on this node.
@@ -299,10 +320,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
use_raylet=use_raylet)
print(address_info)
print("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
" ray stop")
logger.info(address_info)
logger.info("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
" ray stop")
if block:
import time
+6 -7
View File
@@ -3,8 +3,8 @@ from __future__ import division
from __future__ import print_function
import binascii
import logging
import json
import logging
import os
import random
import resource
@@ -28,12 +28,6 @@ import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
logger = logging.getLogger("ray")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.INFO)
logger.addHandler(ch)
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
@@ -98,6 +92,11 @@ RAYLET_EXECUTABLE = os.path.join(
ObjectStoreAddress = namedtuple("ObjectStoreAddress",
["name", "manager_name", "manager_port"])
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
def address(ip_address, port):
return ip_address + ":" + str(port)
+6 -1
View File
@@ -5,10 +5,15 @@ from __future__ import print_function
from collections import namedtuple
import funcsigs
from funcsigs import Parameter
import logging
from ray.services import logger
from ray.utils import is_cython
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
FunctionSignature = namedtuple("FunctionSignature", [
"arg_names", "arg_defaults", "arg_is_positionals", "keyword_names",
"function_name"
+1 -1
View File
@@ -141,7 +141,7 @@ def wait_for_pid_to_exit(pid, timeout=20):
def run_and_get_output(command):
with tempfile.NamedTemporaryFile() as tmp:
p = subprocess.Popen(command, stdout=tmp)
p = subprocess.Popen(command, stdout=tmp, stderr=tmp)
if p.wait() != 0:
raise RuntimeError("ray start did not terminate properly")
with open(tmp.name, 'r') as f:
+18 -2
View File
@@ -7,6 +7,7 @@ import collections
import colorama
import hashlib
import inspect
import logging
import numpy as np
import os
import redis
@@ -25,7 +26,6 @@ import ray.gcs_utils
import ray.remote_function
import ray.serialization as serialization
import ray.services as services
from ray.services import logger
import ray.signature
import ray.local_scheduler
import ray.plasma
@@ -73,6 +73,11 @@ DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0
DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
class RayTaskError(Exception):
"""An object used internally to represent a task that threw an exception.
@@ -1730,7 +1735,10 @@ def init(redis_address=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
use_raylet=None):
use_raylet=None,
configure_logging=True,
logging_level=logging.INFO,
logging_format=ray_constants.LOGGER_FORMAT):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1790,6 +1798,11 @@ def init(redis_address=None,
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used.
configure_logging: True if allow the logging cofiguration here.
Otherwise, the users may want to configure it by their own.
logging_level: Logging level, default will be loging.INFO.
logging_format: Logging format, default will be "%(message)s"
which means only contains the message.
Returns:
Address information about the started processes.
@@ -1798,6 +1811,9 @@ def init(redis_address=None,
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
if configure_logging:
logging.basicConfig(level=logging_level, format=logging_format)
if global_worker.connected:
if ignore_reinit_error:
logger.error("Calling ray.init() again after it has already been "
+19
View File
@@ -3,10 +3,12 @@ from __future__ import division
from __future__ import print_function
import argparse
import logging
import traceback
import ray
import ray.actor
import ray.ray_constants as ray_constants
parser = argparse.ArgumentParser(
description=("Parse addresses for the worker "
@@ -38,6 +40,19 @@ parser.add_argument(
help="the local scheduler's name")
parser.add_argument(
"--raylet-name", required=False, type=str, help="the raylet's name")
parser.add_argument(
"--logging-level",
required=False,
type=str,
default=ray_constants.LOGGER_LEVEL,
choices=ray_constants.LOGGER_LEVEL_CHOICES,
help=ray_constants.LOGGER_LEVEL_HELP)
parser.add_argument(
"--logging-format",
required=False,
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
if __name__ == "__main__":
args = parser.parse_args()
@@ -51,6 +66,10 @@ if __name__ == "__main__":
"raylet_socket_name": args.raylet_name
}
logging.basicConfig(
level=logging.getLevelName(args.logging_level.upper()),
format=args.logging_format)
ray.worker.connect(
info, mode=ray.WORKER_MODE, use_raylet=(args.raylet_name is not None))