Printing messages to stderr (#2312)

Move core python code onto logging module.

Addressing #1884.
This commit is contained in:
Richard Liaw
2018-07-02 16:10:57 -07:00
committed by GitHub
parent bb1d7eaece
commit 178346fa16
4 changed files with 72 additions and 54 deletions
+7 -5
View File
@@ -9,6 +9,7 @@ import time
from ray.services import get_ip_address
from ray.services import get_port
from ray.services import logger
class LogMonitor(object):
@@ -43,7 +44,7 @@ class LogMonitor(object):
"LOG_FILENAMES:{}".format(self.node_ip_address),
num_current_log_files, -1)
for log_filename in new_log_filenames:
print("Beginning to track file {}".format(log_filename))
logger.info("Beginning to track file {}".format(log_filename))
assert log_filename not in self.log_files
self.log_files[log_filename] = []
@@ -83,11 +84,12 @@ class LogMonitor(object):
log_filename, "r")
except IOError as e:
if e.errno == os.errno.EMFILE:
print("Warning: Ignoring {} because there are too "
"many open files.".format(log_filename))
logger.warning(
"Warning: Ignoring {} because there are too "
"many open files.".format(log_filename))
elif e.errno == os.errno.ENOENT:
print("Warning: The file {} was not "
"found.".format(log_filename))
logger.warning("Warning: The file {} was not "
"found.".format(log_filename))
else:
raise e
+37 -25
View File
@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import binascii
import logging
import json
import os
import random
@@ -27,6 +28,12 @@ import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
logger = logging.getLogger("ray")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.INFO)
logger.addHandler(ch)
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
@@ -178,7 +185,7 @@ def cleanup():
# Reset the list of processes of this type.
all_processes[process_type] = []
if not successfully_shut_down:
print("Ray did not shut down properly.")
logger.warning("Ray did not shut down properly.")
def all_processes_alive(exclude=[]):
@@ -193,7 +200,8 @@ def all_processes_alive(exclude=[]):
# alive.
processes_alive = [p.poll() is None for p in processes]
if (not all(processes_alive) and process_type not in exclude):
print("A process of type {} has died.".format(process_type))
logger.warning(
"A process of type {} has died.".format(process_type))
return False
return True
@@ -303,13 +311,14 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5):
while counter < num_retries:
try:
# Run some random command and see if it worked.
print("Waiting for redis server at {}:{} to respond...".format(
redis_ip_address, redis_port))
logger.info(
"Waiting for redis server at {}:{} to respond...".format(
redis_ip_address, redis_port))
redis_client.client_list()
except redis.ConnectionError as e:
# Wait a little bit.
time.sleep(1)
print("Failed to connect to the redis server, retrying.")
logger.info("Failed to connect to the redis server, retrying.")
counter += 1
else:
break
@@ -393,7 +402,7 @@ def check_version_info(redis_client):
if version_info[:2] != true_version_info[:2]:
raise Exception(error_message)
else:
print(error_message)
logger.warning(error_message)
def start_redis(node_ip_address,
@@ -601,7 +610,7 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
while counter < num_retries:
if counter > 0:
print("Redis failed to start, retrying now.")
logger.warning("Redis failed to start, retrying now.")
command = [executable, "--port",
str(port), "--loglevel", "warning"] + load_module_args
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
@@ -786,16 +795,16 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
stdout=stdout_file,
stderr=stderr_file)
except Exception:
print("Failed to start the UI, you may need to run "
"'pip install jupyter'.")
logger.warning("Failed to start the UI, you may need to run "
"'pip install jupyter'.")
else:
if cleanup:
all_processes[PROCESS_TYPE_WEB_UI].append(ui_process)
webui_url = ("http://localhost:{}/notebooks/ray_ui{}.ipynb?token={}"
.format(port, random_ui_id, token))
print("\n" + "=" * 70)
print("View the web UI at {}".format(webui_url))
print("=" * 70 + "\n")
logger.info("\n" + "=" * 70)
logger.info("View the web UI at {}".format(webui_url))
logger.info("=" * 70 + "\n")
return webui_url
@@ -846,8 +855,8 @@ def check_and_update_resources(resources, use_raylet):
if (use_raylet and
resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY):
raise ValueError("Resource quantities must be at most {}."
.format(ray.ray_constants.MAX_RESOURCE_QUANTITY))
raise ValueError("Resource quantities must be at most {}.".format(
ray.ray_constants.MAX_RESOURCE_QUANTITY))
return resources
@@ -892,8 +901,8 @@ def start_local_scheduler(redis_address,
"""
resources = check_and_update_resources(resources, False)
print("Starting local scheduler with the following resources: {}."
.format(resources))
logger.info("Starting local scheduler with the following resources: {}."
.format(resources))
local_scheduler_name, p = ray.local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
@@ -1049,12 +1058,13 @@ def start_objstore(node_ip_address,
# blocks.
shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail
if objstore_memory > shm_avail:
print("Warning: Reducing object store memory because "
"/dev/shm has only {} bytes available. You may be "
"able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, "
"you may need to pass an argument with the flag "
"'--shm-size' to 'docker run'.".format(shm_avail))
logger.warning(
"Warning: Reducing object store memory because "
"/dev/shm has only {} bytes available. You may be "
"able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, "
"you may need to pass an argument with the flag "
"'--shm-size' to 'docker run'.".format(shm_avail))
objstore_memory = int(shm_avail * 0.8)
finally:
os.close(shm_fd)
@@ -1295,7 +1305,8 @@ def start_ray_processes(address_info=None,
A dictionary of the address information for the processes that were
started.
"""
print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")
logger.info(
"Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")
if resources is None:
resources = {}
@@ -1712,8 +1723,9 @@ def try_to_create_directory(directory_path):
except OSError as e:
if e.errno != os.errno.EEXIST:
raise e
print("Attempted to create '{}', but the directory already "
"exists.".format(directory_path))
logger.warning(
"Attempted to create '{}', but the directory already "
"exists.".format(directory_path))
# Change the log directory permissions so others can use it. This is
# important when multiple people are using the same machine.
os.chmod(directory_path, 0o0777)
+3 -2
View File
@@ -6,6 +6,7 @@ from collections import namedtuple
import funcsigs
from funcsigs import Parameter
from ray.services import logger
from ray.utils import is_cython
FunctionSignature = namedtuple("FunctionSignature", [
@@ -99,7 +100,7 @@ def check_signature_supported(func, warn=False):
message = ("The function {} has a **kwargs argument, which is "
"currently not supported.".format(function_name))
if warn:
print(message)
logger.warning(message)
else:
raise Exception(message)
@@ -108,7 +109,7 @@ def check_signature_supported(func, warn=False):
"(defined after * or *args), which is currently "
"not supported.".format(function_name))
if warn:
print(message)
logger.warning(message)
else:
raise Exception(message)
+25 -22
View File
@@ -26,6 +26,7 @@ import ray.gcs_utils
import ray.remote_function
import ray.serialization as serialization
import ray.services as services
from ray.services import logger
import ray.signature
import ray.local_scheduler
import ray.plasma
@@ -310,7 +311,7 @@ class Worker(object):
"of their fields. This behavior may "
"be incorrect in some cases.".format(
type(e.example_object)))
print(warning_message)
logger.warning(warning_message)
except (serialization.RayNotDictionarySerializable,
serialization.CloudPickleError,
pickle.pickle.PicklingError, Exception):
@@ -324,7 +325,7 @@ class Worker(object):
"using pickle. This may be "
"inefficient.".format(
type(e.example_object)))
print(warning_message)
logger.warning(warning_message)
except serialization.CloudPickleError:
register_custom_serializer(
type(e.example_object),
@@ -335,7 +336,7 @@ class Worker(object):
"and only registering the class "
"locally.".format(
type(e.example_object)))
print(warning_message)
logger.warning(warning_message)
def put_object(self, object_id, value):
"""Put value in the local object store with object id objectid.
@@ -371,8 +372,9 @@ class Worker(object):
# and make sure that the objects are in fact the same. We also
# should return an error code to the caller instead of printing a
# message.
print("The object with ID {} already exists in the object store."
.format(object_id))
logger.info(
"The object with ID {} already exists in the object store."
.format(object_id))
def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10):
start_time = time.time()
@@ -1153,7 +1155,7 @@ def print_failed_task(task_status):
task_status (Dict): A dictionary containing the name, operationid, and
error message for a failed task.
"""
print("""
logger.error("""
Error: Task failed
Function Name: {}
Task ID: {}
@@ -1381,9 +1383,10 @@ def get_address_info_from_redis(redis_address,
raise
# Some of the information may not be in Redis yet, so wait a little
# bit.
print("Some processes that the driver needs to connect to have "
"not registered with Redis, so retrying. Have you run "
"'ray start' on this node?")
logger.warning(
"Some processes that the driver needs to connect to have "
"not registered with Redis, so retrying. Have you run "
"'ray start' on this node?")
time.sleep(1)
counter += 1
@@ -1516,7 +1519,7 @@ def _init(address_info=None,
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
print("Detected environment variable 'RAY_USE_XRAY'.")
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
# Get addresses of existing services.
@@ -1716,7 +1719,7 @@ def init(redis_address=None,
"""
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
print("Detected environment variable 'RAY_USE_XRAY'.")
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
# Convert hostnames to numerical IP address.
@@ -1837,10 +1840,10 @@ def print_error_messages_raylet(worker):
error_messages = global_state.error_messages(worker.task_driver_id)
for error_message in error_messages:
if error_message not in old_error_messages:
print(error_message)
logger.error(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
logger.error("Suppressing duplicate error message.")
try:
for msg in worker.error_message_pubsub_client.listen():
@@ -1858,10 +1861,10 @@ def print_error_messages_raylet(worker):
error_message = error_data.ErrorMessage().decode("ascii")
if error_message not in old_error_messages:
print(error_message)
logger.error(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
logger.error("Suppressing duplicate error message.")
except redis.ConnectionError:
# When Redis terminates the listen call will throw a ConnectionError,
@@ -1900,10 +1903,10 @@ def print_error_messages(worker):
error_message = worker.redis_client.hget(
error_key, "message").decode("ascii")
if error_message not in old_error_messages:
print(error_message)
logger.error(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
logger.error("Suppressing duplicate error message.")
num_errors_received += 1
try:
@@ -1915,10 +1918,11 @@ def print_error_messages(worker):
error_message = worker.redis_client.hget(
error_key, "message").decode("ascii")
if error_message not in old_error_messages:
print(error_message)
logger.error(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
logger.error(
"Suppressing duplicate error message.")
num_errors_received += 1
except redis.ConnectionError:
# When Redis terminates the listen call will throw a ConnectionError,
@@ -2414,10 +2418,9 @@ def _try_to_compute_deterministic_class_id(cls, depth=5):
# We have not reached a fixed point, so we may end up with a different
# class ID for this custom class on each worker, which could lead to the
# same class definition being exported many many times.
print(
logger.warning(
"WARNING: Could not produce a deterministic class ID for class "
"{}".format(cls),
file=sys.stderr)
"{}".format(cls))
return hashlib.sha1(new_class_id).digest()