Stream logs to driver by default. (#3892)

* Stream logs to driver by default.

* Fix from rebase

* Redirect raylet output independently of worker output.

* Fix.

* Create redis client with services.create_redis_client.

* Suppress Redis connection error at exit.

* Remove thread_safe_client from redis.

* Shutdown driver threads in ray.shutdown().

* Add warning for too many log messages.

* Only stop threads if worker is connected.

* Only stop threads if they exist.

* Remove unnecessary try/excepts.

* Fix

* Only add new logging handler once.

* Increase timeout.

* Fix tempfile test.

* Fix logging in cluster_utils.

* Revert "Increase timeout."

This reverts commit b3846b89040bcd8e583b2e18cb513cb040e71d95.

* Retry longer when connecting to plasma store from node manager and object manager.

* Close pubsub channels to avoid leaking file descriptors.

* Limit log monitor open files to 200.

* Increase plasma connect retries.

* Add comment.
This commit is contained in:
Robert Nishihara
2019-02-07 19:53:50 -08:00
committed by Philipp Moritz
parent 0aa74fb1fd
commit ef527f84ab
17 changed files with 511 additions and 344 deletions
+4 -28
View File
@@ -401,34 +401,6 @@ class GlobalState(object):
return parse_client_table(self.redis_client)
def log_files(self):
    """Fetch and return a dictionary of log file names to outputs.

    Every Redis key matching "LOGFILE*" is read as a Redis list. The
    second ":"-separated component of the decoded key is used as the IP
    address, and the full decoded key is used as the log file name.

    Returns:
        IP address to log file name to log file contents mappings. The
        contents are a list of decoded list entries.
    """
    ip_filename_file = {}
    for redis_key in self.redis_client.keys("LOGFILE*"):
        filename = decode(redis_key)
        ip_addr = filename.split(":")[1]
        # Deliberately not named `file`, which would shadow the Python 2
        # builtin of the same name.
        entries = self.redis_client.lrange(filename, 0, -1)
        ip_filename_file.setdefault(ip_addr, {})[filename] = [
            decode(entry) for entry in entries
        ]
    return ip_filename_file
def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.
@@ -869,6 +841,10 @@ class GlobalState(object):
for resource_id, num_available in available_resources.items():
total_available_resources[resource_id] += num_available
# Close the pubsub clients to avoid leaking file descriptors.
for subscribe_client in subscribe_clients:
subscribe_client.close()
return dict(total_available_resources)
def _error_messages(self, job_id):
+1
View File
@@ -27,6 +27,7 @@ __all__ = [
]
FUNCTION_PREFIX = "RemoteFunction:"
LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL"
# xray heartbeats
XRAY_HEARTBEAT_CHANNEL = str(TablePubsub.HEARTBEAT).encode("ascii")
+34 -21
View File
@@ -5,8 +5,6 @@ from __future__ import print_function
import threading
import traceback
import redis
import ray
from ray import ray_constants
from ray import cloudpickle as pickle
@@ -17,29 +15,35 @@ from ray import utils
class ImportThread(object):
"""A thread used to import exports from the driver or other workers.
Note:
The driver also has an import thread, which is used only to
import custom class definitions from calls to register_custom_serializer
that happen under the hood on workers.
Note: The driver also has an import thread, which is used only to import
custom class definitions from calls to register_custom_serializer that
happen under the hood on workers.
Attributes:
worker: the worker object in this process.
mode: worker mode
redis_client: the redis client used to query exports.
threads_stopped (threading.Event): A threading event used to signal to
the thread that it should exit.
"""
def __init__(self, worker, mode):
def __init__(self, worker, mode, threads_stopped):
self.worker = worker
self.mode = mode
self.redis_client = worker.redis_client
self.threads_stopped = threads_stopped
def start(self):
"""Start the import thread."""
t = threading.Thread(target=self._run, name="ray_import_thread")
self.t = threading.Thread(target=self._run, name="ray_import_thread")
# Making the thread a daemon causes it to exit
# when the main thread exits.
t.daemon = True
t.start()
self.t.daemon = True
self.t.start()
def join_import_thread(self):
"""Wait for the import thread to exit.

This blocks on the thread object that start() stores in self.t.
"""
self.t.join()
def _run(self):
import_pubsub_client = self.redis_client.pubsub()
@@ -50,14 +54,24 @@ class ImportThread(object):
# Keep track of the number of imports that we've imported.
num_imported = 0
# Get the exports that occurred before the call to subscribe.
with self.worker.lock:
export_keys = self.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
num_imported += 1
self._process_key(key)
try:
for msg in import_pubsub_client.listen():
# Get the exports that occurred before the call to subscribe.
with self.worker.lock:
export_keys = self.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
num_imported += 1
self._process_key(key)
while True:
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
return
msg = import_pubsub_client.get_message()
if msg is None:
self.threads_stopped.wait(timeout=0.01)
continue
with self.worker.lock:
if msg["type"] == "subscribe":
continue
@@ -68,10 +82,9 @@ class ImportThread(object):
num_imported += 1
key = self.redis_client.lindex("Exports", i)
self._process_key(key)
except redis.ConnectionError:
# When Redis terminates the listen call will throw a
# ConnectionError, which we catch here.
pass
finally:
# Close the pubsub client to avoid leaking file descriptors.
import_pubsub_client.close()
def _process_key(self, key):
"""Process the given export key from redis."""
+186 -85
View File
@@ -3,14 +3,14 @@ from __future__ import division
from __future__ import print_function
import argparse
import errno
import logging
import os
import redis
import time
import traceback
import colorama
import ray.ray_constants as ray_constants
from ray.services import get_ip_address
from ray.services import get_port
import ray.utils
# Logger for this module. It should be configured at the entry point
@@ -19,91 +19,185 @@ import ray.utils
logger = logging.getLogger(__name__)
class LogFileInfo(object):
    """Bookkeeping record for one log file.

    Attributes:
        filename: Name of the log file.
        size_when_last_opened: File size associated with the last time the
            file was opened.
        file_position: Position in the file to resume from.
        file_handle: The open file object, or None.
        worker_pid: Always initialized to None by the constructor.
    """

    def __init__(self,
                 filename=None,
                 size_when_last_opened=None,
                 file_position=None,
                 file_handle=None):
        # Validate explicitly instead of with `assert`, which is stripped
        # when Python runs with -O.
        if (filename is None or size_when_last_opened is None
                or file_position is None):
            raise ValueError(
                "filename, size_when_last_opened and file_position are "
                "all required.")
        self.filename = filename
        self.size_when_last_opened = size_when_last_opened
        self.file_position = file_position
        self.file_handle = file_handle
        self.worker_pid = None

    def __repr__(self):
        return ("{}(filename={!r}, size_when_last_opened={!r}, "
                "file_position={!r}, file_handle={!r}, "
                "worker_pid={!r})".format(
                    type(self).__name__, self.filename,
                    self.size_when_last_opened, self.file_position,
                    self.file_handle, self.worker_pid))
class LogMonitor(object):
"""A monitor process for monitoring Ray log files.
This class maintains a list of open files and a list of closed log files. We
can't simply leave all files open because we'll run out of file
descriptors.
The "run" method of this class will cycle between doing several things:
1. First, it will check if any new files have appeared in the log
directory. If so, they will be added to the list of closed files.
2. Then, if we are unable to open any new files, we will close all of the
files.
3. Then, we will open as many closed files as we can that may have new
lines (judged by an increase in file size since the last time the file
was opened).
4. Then we will loop through the open files and see if there are any new
lines in the file. If so, we will publish them to Redis.
Attributes:
node_ip_address: The IP address of the node that the log monitor
process is running on. This will be used to determine which log
files to track.
host (str): The hostname of this machine. Used to improve the log
messages published to Redis.
logs_dir (str): The directory that the log files are in.
redis_client: A client used to communicate with the Redis server.
log_files: A dictionary mapping the name of a log file to a list of
strings representing its contents.
log_file_handles: A dictionary mapping the name of a log file to a file
handle for that file.
log_filenames (set): This is the set of filenames of all files in
open_file_infos and closed_file_infos.
open_file_infos (list[LogFileInfo]): Info for all of the open files.
closed_file_infos (list[LogFileInfo]): Info for all of the closed
files.
can_open_more_files (bool): True if we can still open more files and
false otherwise.
"""
def __init__(self,
redis_ip_address,
redis_port,
node_ip_address,
redis_password=None):
def __init__(self, logs_dir, redis_address, redis_password=None):
"""Initialize the log monitor object."""
self.node_ip_address = node_ip_address
self.redis_client = redis.StrictRedis(
host=redis_ip_address, port=redis_port, password=redis_password)
self.log_files = {}
self.log_file_handles = {}
self.files_to_ignore = set()
self.host = os.uname()[1]
self.logs_dir = logs_dir
self.redis_client = ray.services.create_redis_client(
redis_address, password=redis_password)
self.log_filenames = set()
self.open_file_infos = []
self.closed_file_infos = []
self.can_open_more_files = True
def close_all_files(self):
    """Close all open files (so that we can open more).

    Every entry of self.open_file_infos has its handle closed and reset to
    None and is appended, in its current order, to self.closed_file_infos.
    Afterwards self.open_file_infos is empty and self.can_open_more_files
    is True.
    """
    # Walk the list once instead of repeatedly calling pop(0), which
    # shifts the whole list on every call.
    for file_info in self.open_file_infos:
        file_info.file_handle.close()
        file_info.file_handle = None
        self.closed_file_infos.append(file_info)
    # Empty the list in place so that other references to it stay valid.
    del self.open_file_infos[:]
    self.can_open_more_files = True
def update_log_filenames(self):
"""Get the most up-to-date list of log files to monitor from Redis."""
num_current_log_files = len(self.log_files)
new_log_filenames = self.redis_client.lrange(
"LOG_FILENAMES:{}".format(self.node_ip_address),
num_current_log_files, -1)
for log_filename in new_log_filenames:
logger.info("Beginning to track file {}".format(log_filename))
assert log_filename not in self.log_files
self.log_files[log_filename] = []
"""Update the list of log files to monitor."""
log_filenames = os.listdir(self.logs_dir)
def check_log_files_and_push_updates(self):
"""Get any changes to the log files and push updates to Redis."""
for log_filename in self.log_files:
if log_filename in self.log_file_handles:
# Get any updates to the file.
new_lines = []
while True:
current_position = (
self.log_file_handles[log_filename].tell())
next_line = self.log_file_handles[log_filename].readline()
if next_line != "":
new_lines.append(next_line)
else:
self.log_file_handles[log_filename].seek(
current_position)
break
for log_filename in log_filenames:
full_path = os.path.join(self.logs_dir, log_filename)
if full_path not in self.log_filenames:
self.log_filenames.add(full_path)
self.closed_file_infos.append(
LogFileInfo(
filename=full_path,
size_when_last_opened=0,
file_position=0,
file_handle=None))
logger.info("Beginning to track file {}".format(log_filename))
# If there are any new lines, cache them and also push them to
# Redis.
if len(new_lines) > 0:
self.log_files[log_filename] += new_lines
redis_key = "LOGFILE:{}:{}".format(
self.node_ip_address, ray.utils.decode(log_filename))
self.redis_client.rpush(redis_key, *new_lines)
def open_closed_files(self):
"""Open some closed files if they may have new lines.
# Pass if we already failed to open the log file.
elif log_filename in self.files_to_ignore:
pass
Opening more files may require us to close some of the already open
files.
"""
if not self.can_open_more_files:
# If we can't open any more files, close all of the files.
self.close_all_files()
# Try to open this file for the first time.
else:
files_with_no_updates = []
while len(self.closed_file_infos) > 0:
if (len(self.open_file_infos) >=
ray_constants.LOG_MONITOR_MAX_OPEN_FILES):
self.can_open_more_files = False
break
file_info = self.closed_file_infos.pop(0)
assert file_info.file_handle is None
# Get the file size to see if it has gotten bigger since we last
# opened it.
try:
file_size = os.path.getsize(file_info.filename)
except (IOError, OSError) as e:
# Catch "file not found" errors.
if e.errno == errno.ENOENT:
logger.warning("Warning: The file {} was not "
"found.".format(file_info.filename))
self.log_filenames.remove(file_info.filename)
continue
raise e
# If some new lines have been added to this file, try to reopen the
# file.
if file_size > file_info.size_when_last_opened:
try:
self.log_file_handles[log_filename] = open(
log_filename, "r")
except IOError as e:
if e.errno == os.errno.EMFILE:
logger.warning(
"Warning: Ignoring {} because there are too "
"many open files.".format(log_filename))
elif e.errno == os.errno.ENOENT:
f = open(file_info.filename, "r")
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
logger.warning("Warning: The file {} was not "
"found.".format(log_filename))
"found.".format(file_info.filename))
self.log_filenames.remove(file_info.filename)
continue
else:
raise e
# Don't try to open this file any more.
self.files_to_ignore.add(log_filename)
f.seek(file_info.file_position)
file_info.filesize_when_last_opened = file_size
file_info.file_handle = f
self.open_file_infos.append(file_info)
else:
files_with_no_updates.append(file_info)
# Add the files with no changes back to the list of closed files.
self.closed_file_infos += files_with_no_updates
def check_log_files_and_publish_updates(self):
    """Get any changes to the log files and push updates to Redis.

    For every entry in self.open_file_infos, at most 100 new lines are read
    per call and the file position is recorded. Lines are published only
    for worker output files, i.e. files whose base name starts with
    "worker" and ends with "out" or "err". A leading "Ray worker pid: "
    line at the start of such a file is consumed to set the worker pid and
    is not published.
    """
    max_num_lines_to_read = 100

    for file_info in self.open_file_infos:
        assert not file_info.file_handle.closed

        lines_to_publish = []
        for _ in range(max_num_lines_to_read):
            next_line = file_info.file_handle.readline()
            if next_line == "":
                # Nothing more to read from this file right now.
                break
            if next_line.endswith("\n"):
                next_line = next_line[:-1]
            lines_to_publish.append(next_line)

        # Publish the lines if this is a worker process.
        filename = os.path.basename(file_info.filename)
        is_worker = (filename.startswith("worker")
                     and filename.endswith(("out", "err")))
        output_type = "stdout" if filename.endswith("out") else "stderr"

        # The pid line is only looked for on the first read of the file.
        if is_worker and file_info.file_position == 0:
            if (lines_to_publish and
                    lines_to_publish[0].startswith("Ray worker pid: ")):
                file_info.worker_pid = int(
                    lines_to_publish[0].split(" ")[-1])
                lines_to_publish = lines_to_publish[1:]

        # Record the current position in the file.
        file_info.file_position = file_info.file_handle.tell()

        if lines_to_publish and is_worker:
            # Prefix the batch with a header naming the source of the lines.
            lines_to_publish.insert(
                0, "{}{}{} (pid={}, host={})".format(
                    colorama.Fore.CYAN, "worker ({})".format(output_type),
                    colorama.Fore.RESET, file_info.worker_pid, self.host))

            self.redis_client.publish(ray.gcs_utils.LOG_FILE_CHANNEL,
                                      "\n".join(lines_to_publish))
def run(self):
"""Run the log monitor.
@@ -113,8 +207,8 @@ class LogMonitor(object):
"""
while True:
self.update_log_filenames()
self.check_log_files_and_push_updates()
time.sleep(1)
self.open_closed_files()
self.check_log_files_and_publish_updates()
if __name__ == "__main__":
@@ -127,11 +221,6 @@ if __name__ == "__main__":
required=True,
type=str,
help="The address to use for Redis.")
parser.add_argument(
"--node-ip-address",
required=True,
type=str,
help="The IP address of the node this process is on.")
parser.add_argument(
"--redis-password",
required=False,
@@ -151,15 +240,27 @@ if __name__ == "__main__":
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
parser.add_argument(
"--logs-dir",
required=True,
type=str,
help="Specify the path of the temporary directory used by Ray "
"processes.")
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
log_monitor = LogMonitor(
redis_ip_address,
redis_port,
args.node_ip_address,
redis_password=args.redis_password)
log_monitor.run()
args.logs_dir, args.redis_address, redis_password=args.redis_password)
try:
log_monitor.run()
except Exception as e:
# Something went wrong, so push an error to all drivers.
redis_client = ray.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = ("The log monitor on node {} failed with the following "
"error:\n{}".format(os.uname()[1], traceback_str))
ray.utils.push_error_to_driver_through_redis(
redis_client, ray_constants.LOG_MONITOR_DIED_ERROR, message)
raise e
+14 -17
View File
@@ -36,17 +36,15 @@ class Monitor(object):
receive notifications about failed components.
"""
def __init__(self,
redis_address,
redis_port,
autoscaling_config,
redis_password=None):
def __init__(self, redis_address, autoscaling_config, redis_password=None):
# Initialize the Redis clients.
self.state = ray.experimental.state.GlobalState()
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
self.state._initialize_global_state(
redis_address, redis_port, redis_password=redis_password)
self.redis = redis.StrictRedis(
host=redis_address, port=redis_port, db=0, password=redis_password)
redis_ip_address, redis_port, redis_password=redis_password)
self.redis = ray.services.create_redis_client(
redis_address, password=redis_password)
# Setup subscriptions to the primary Redis server and the Redis shards.
self.primary_subscribe_client = self.redis.pubsub(
ignore_subscribe_messages=True)
@@ -88,6 +86,11 @@ class Monitor(object):
str(e)))
self.issue_gcs_flushes = False
def __del__(self):
"""Destruct the monitor object."""
# We close the pubsub client to avoid leaking file descriptors.
self.primary_subscribe_client.close()
def subscribe(self, channel):
"""Subscribe to the given channel on the primary Redis shard.
@@ -366,17 +369,13 @@ if __name__ == "__main__":
args = parser.parse_args()
setup_logger(args.logging_level, args.logging_format)
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
if args.autoscaling_config:
autoscaling_config = os.path.expanduser(args.autoscaling_config)
else:
autoscaling_config = None
monitor = Monitor(
redis_ip_address,
redis_port,
args.redis_address,
autoscaling_config,
redis_password=args.redis_password)
@@ -384,10 +383,8 @@ if __name__ == "__main__":
monitor.run()
except Exception as e:
# Something went wrong, so push an error to all drivers.
redis_client = redis.StrictRedis(
host=redis_ip_address,
port=redis_port,
password=args.redis_password)
redis_client = ray.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = "The monitor failed with the following error:\n{}".format(
traceback_str)
+2 -4
View File
@@ -149,7 +149,6 @@ class Node(object):
stdout_file, stderr_file = new_log_monitor_log_file()
process_info = ray.services.start_log_monitor(
self.redis_address,
self._node_ip_address,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password)
@@ -189,8 +188,7 @@ class Node(object):
object_store_memory=self._ray_params.object_store_memory,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
plasma_store_socket_name=self._plasma_store_socket_name,
redis_password=self._ray_params.redis_password)
plasma_store_socket_name=self._plasma_store_socket_name)
assert (
ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] = [
@@ -212,7 +210,7 @@ class Node(object):
or get_raylet_socket_name())
self.prepare_socket_file(self._raylet_socket_name)
stdout_file, stderr_file = new_raylet_log_file(
redirect_output=self._ray_params.redirect_worker_output)
redirect_output=self._ray_params.redirect_output)
process_info = ray.services.start_raylet(
self._redis_address,
self._node_ip_address,
+1 -1
View File
@@ -93,7 +93,7 @@ class RayParams(object):
num_workers=None,
local_mode=False,
driver_mode=None,
redirect_worker_output=False,
redirect_worker_output=True,
redirect_output=True,
num_redis_shards=None,
redis_max_clients=None,
+21 -23
View File
@@ -69,21 +69,28 @@ class Profiler(object):
worker: the worker to profile.
events: the buffer of events.
lock: the lock to protect access of events.
threads_stopped (threading.Event): A threading event used to signal to
the thread that it should exit.
"""
def __init__(self, worker):
def __init__(self, worker, threads_stopped):
self.worker = worker
self.events = []
self.lock = threading.Lock()
self.threads_stopped = threads_stopped
def start_flush_thread(self):
t = threading.Thread(
self.t = threading.Thread(
target=self._periodically_flush_profile_events,
name="ray_push_profiling_information")
# Making the thread a daemon causes it to exit when the main thread
# exits.
t.daemon = True
t.start()
self.t.daemon = True
self.t.start()
def join_flush_thread(self):
"""Wait for the flush thread to exit.

This blocks on the thread object that start_flush_thread() stores in
self.t.
"""
self.t.join()
def _periodically_flush_profile_events(self):
"""Drivers run this as a thread to flush profile data in the
@@ -92,28 +99,19 @@ class Profiler(object):
# the local scheduler client. This should be ok because it doesn't read
# from the local scheduler client and we have the GIL here. However,
# if either of those things changes, then we could run into issues.
try:
while True:
time.sleep(1)
self.flush_profile_data()
except AttributeError:
# TODO(suquark): It is a bad idea to ignore "AttributeError".
# It has caused some very unexpected behaviors when implementing
# new features (related to AttributeError).
while True:
# Sleep for 1 second. This will be interrupted if
# self.threads_stopped is set.
self.threads_stopped.wait(timeout=1)
# This is to suppress errors that occur at shutdown.
pass
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
return
self.flush_profile_data()
def flush_profile_data(self):
"""Push the logged profiling data to the global control store.
By default, profiling information for a given task won't appear in the
timeline until after the task has completed. For very long-running
tasks, we may want profiling information to appear more quickly.
In such cases, this function can be called. Note that as an
alternative, we could start a thread in the background on workers that
calls this automatically.
"""
"""Push the logged profiling data to the global control store."""
with self.lock:
events = self.events
self.events = []
+3
View File
@@ -53,6 +53,7 @@ PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
INFEASIBLE_TASK_ERROR = "infeasible_task"
REMOVED_NODE_ERROR = "node_removed"
MONITOR_DIED_ERROR = "monitor_died"
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
@@ -100,3 +101,5 @@ PROCESS_TYPE_RAYLET = "raylet"
PROCESS_TYPE_PLASMA_STORE = "plasma_store"
PROCESS_TYPE_REDIS_SERVER = "redis_server"
PROCESS_TYPE_WEB_UI = "web_ui"
LOG_MONITOR_MAX_OPEN_FILES = 200
+4 -66
View File
@@ -188,34 +188,6 @@ def get_node_ip_address(address="8.8.8.8:53"):
return node_ip_address
def record_log_files_in_redis(redis_address,
                              node_ip_address,
                              log_files,
                              password=None):
    """Record in Redis that a new log file has been created.

    This is used so that each log monitor can check Redis and figure out which
    log files it is responsible for monitoring.

    Args:
        redis_address: The address of the redis server.
        node_ip_address: The IP address of the node that the log file exists
            on.
        log_files: A list of file handles for the log files. If one of the file
            handles is None, we ignore it.
        password (str): The password of the redis server.
    """
    log_file_names = [
        log_file.name for log_file in log_files if log_file is not None
    ]
    if not log_file_names:
        # Nothing to record, so don't open a connection at all.
        return

    # Create a single client for the whole call rather than one per file.
    redis_ip_address, redis_port = redis_address.split(":")
    redis_client = redis.StrictRedis(
        host=redis_ip_address, port=redis_port, password=password)
    # The name of the key storing the list of log filenames for this IP
    # address.
    log_file_list_key = "LOG_FILENAMES:{}".format(node_ip_address)
    # Push all names with one command; the list order matches log_files.
    redis_client.rpush(log_file_list_key, *log_file_names)
def create_redis_client(redis_address, password=None):
"""Create a Redis client.
@@ -584,12 +556,6 @@ def start_redis(node_ip_address,
processes.append(p)
redis_address = address(node_ip_address, port)
# Record the log files in Redis.
record_log_files_in_redis(
redis_address,
node_ip_address, [redis_stdout_file, redis_stderr_file],
password=password)
# Register the number of Redis shards in the primary shard, so that clients
# know how many redis shards to expect under RedisShards.
primary_redis_client = redis.StrictRedis(
@@ -650,11 +616,6 @@ def start_redis(node_ip_address,
# Store redis shard information in the primary redis shard.
primary_redis_client.rpush("RedisShards", shard_address)
record_log_files_in_redis(
redis_address,
node_ip_address, [redis_stdout_file, redis_stderr_file],
password=password)
if use_credis:
# Configure the chain state. The way it is intended to work is
# the following:
@@ -834,7 +795,6 @@ def _start_redis_instance(executable,
def start_log_monitor(redis_address,
node_ip_address,
stdout_file=None,
stderr_file=None,
redis_password=None):
@@ -842,8 +802,6 @@ def start_log_monitor(redis_address,
Args:
redis_address (str): The address of the Redis instance.
node_ip_address (str): The IP address of the node that this log monitor
is running on.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
@@ -856,8 +814,9 @@ def start_log_monitor(redis_address,
log_monitor_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "log_monitor.py")
command = [
sys.executable, "-u", log_monitor_filepath, "--redis-address",
redis_address, "--node-ip-address", node_ip_address
sys.executable, "-u", log_monitor_filepath,
"--redis-address={}".format(redis_address), "--logs-dir={}".format(
get_logs_dir_path())
]
if redis_password:
command += ["--redis-password", redis_password]
@@ -866,10 +825,6 @@ def start_log_monitor(redis_address,
ray_constants.PROCESS_TYPE_LOG_MONITOR,
stdout_file=stdout_file,
stderr_file=stderr_file)
record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return process_info
@@ -1127,10 +1082,6 @@ def start_raylet(redis_address,
use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ),
stdout_file=stdout_file,
stderr_file=stderr_file)
record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return process_info
@@ -1322,8 +1273,7 @@ def start_plasma_store(node_ip_address,
object_store_memory=None,
plasma_directory=None,
huge_pages=False,
plasma_store_socket_name=None,
redis_password=None):
plasma_store_socket_name=None):
"""This method starts an object store process.
Args:
@@ -1340,7 +1290,6 @@ def start_plasma_store(node_ip_address,
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
redis_password (str): The password of the redis server.
Returns:
ProcessInfo for the process that was started.
@@ -1368,11 +1317,6 @@ def start_plasma_store(node_ip_address,
huge_pages=huge_pages,
socket_name=plasma_store_socket_name)
record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return process_info
@@ -1414,8 +1358,6 @@ def start_worker(node_ip_address,
ray_constants.PROCESS_TYPE_WORKER,
stdout_file=stdout_file,
stderr_file=stderr_file)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
return process_info
@@ -1456,10 +1398,6 @@ def start_monitor(redis_address,
ray_constants.PROCESS_TYPE_MONITOR,
stdout_file=stdout_file,
stderr_file=stderr_file)
record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return process_info
+4 -14
View File
@@ -276,21 +276,11 @@ def setup_logger(logging_level, logging_format):
logging_level = logging.getLevelName(logging_level.upper())
logger.setLevel(logging_level)
global _default_handler
_default_handler = logging.StreamHandler()
_default_handler.setFormatter(logging.Formatter(logging_format))
logger.addHandler(_default_handler)
logger.propagate = False
def try_update_handler(new_stream):
global _default_handler
logger = logging.getLogger("ray")
if _default_handler:
new_handler = logging.StreamHandler(stream=new_stream)
new_handler.setFormatter(_default_handler.formatter)
_default_handler.close()
_default_handler = new_handler
if _default_handler is None:
_default_handler = logging.StreamHandler()
logger.addHandler(_default_handler)
_default_handler.setFormatter(logging.Formatter(logging_format))
logger.propagate = False
# This function is copied and modified from
+154 -45
View File
@@ -39,7 +39,7 @@ from ray import profiling
from ray.function_manager import (FunctionActorManager, FunctionDescriptor)
import ray.parameter
from ray.utils import (check_oversized_pickle, is_cython, random_string,
thread_safe_client, setup_logger, try_update_handler)
thread_safe_client, setup_logger)
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -158,6 +158,9 @@ class Worker(object):
# It is a DriverID.
self.task_driver_id = DriverID.nil()
self._task_context = threading.local()
# This event is checked regularly by all of the threads so that they
# know when to exit.
self.threads_stopped = threading.Event()
@property
def task_context(self):
@@ -1219,12 +1222,13 @@ def init(redis_address=None,
resources=None,
object_store_memory=None,
redis_max_memory=None,
log_to_driver=True,
node_ip_address=None,
object_id_seed=None,
num_workers=None,
local_mode=False,
driver_mode=None,
redirect_worker_output=False,
redirect_worker_output=True,
redirect_output=True,
ignore_reinit_error=False,
num_redis_shards=None,
@@ -1281,6 +1285,8 @@ def init(redis_address=None,
LRU eviction of entries. This only applies to the sharded redis
tables (task, object, and profile tables). By default, this is
capped at 10GB but can be set higher.
log_to_driver (bool): If true, then output from all of the worker
processes on all nodes will be directed to the driver.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
@@ -1490,6 +1496,7 @@ def init(redis_address=None,
redis_password=redis_password,
object_id_seed=object_id_seed,
mode=driver_mode,
log_to_driver=log_to_driver,
worker=global_worker,
driver_id=driver_id)
@@ -1523,10 +1530,6 @@ def shutdown(worker=global_worker):
will need to reload the module.
"""
disconnect(worker)
if hasattr(worker, "raylet_client"):
del worker.raylet_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.disconnect()
# Shut down the Ray processes.
global _global_node
@@ -1564,18 +1567,72 @@ last_task_error_raise_time = 0
UNCAUGHT_ERROR_GRACE_PERIOD = 5
def print_error_messages_raylet(task_error_queue):
def print_logs(redis_client, threads_stopped):
    """Print worker log messages from every node to this process's stderr.

    Subscribes to the log file channel on the given Redis client and prints
    each message received until threads_stopped is set.

    Args:
        redis_client: A client to the primary Redis shard.
        threads_stopped (threading.Event): A threading event used to signal to
            the thread that it should exit.
    """
    log_subscriber = redis_client.pubsub(ignore_subscribe_messages=True)
    log_subscriber.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL)

    # Number of messages received back to back without an empty poll in
    # between. If this keeps growing, the driver is probably not able to
    # process the log messages as rapidly as they are coming in.
    backlog = 0
    try:
        # Keep polling until we are signalled to stop.
        while not threads_stopped.is_set():
            message = log_subscriber.get_message()
            if message is None:
                backlog = 0
                threads_stopped.wait(timeout=0.01)
                continue

            backlog += 1
            print(ray.utils.decode(message["data"]), file=sys.stderr)

            if backlog > 0 and backlog % 100 == 0:
                logger.warning(
                    "The driver may not be able to keep up with the "
                    "stdout/stderr of the workers. To avoid forwarding logs "
                    "to the driver, use 'ray.init(log_to_driver=False)'.")
    finally:
        # Close the pubsub client to avoid leaking file descriptors.
        log_subscriber.close()
def print_error_messages_raylet(task_error_queue, threads_stopped):
"""Prints message received in the given output queue.
This checks periodically if any un-raised errors occurred in the background.
Args:
task_error_queue (queue.Queue): A queue used to receive errors from the
thread that listens to Redis.
threads_stopped (threading.Event): A threading event used to signal to
the thread that it should exit.
"""
while True:
error, t = task_error_queue.get()
# Exit if we received a signal that we should stop.
if threads_stopped.is_set():
return
try:
error, t = task_error_queue.get(block=False)
except queue.Empty:
threads_stopped.wait(timeout=0.01)
continue
# Delay errors a little bit of time to attempt to suppress redundant
# messages originating from the worker.
while t + UNCAUGHT_ERROR_GRACE_PERIOD > time.time():
time.sleep(1)
threads_stopped.wait(timeout=1)
if t < last_task_error_raise_time + UNCAUGHT_ERROR_GRACE_PERIOD:
logger.debug("Suppressing error from worker: {}".format(error))
else:
@@ -1583,11 +1640,18 @@ def print_error_messages_raylet(task_error_queue):
"Possible unhandled error from worker: {}".format(error))
def listen_error_messages_raylet(worker, task_error_queue):
def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
"""Listen to error messages in the background on the driver.
This runs in a separate thread on the driver and pushes (error, time)
tuples to the output queue.
Args:
worker: The worker class that this thread belongs to.
task_error_queue (queue.Queue): A queue used to communicate with the
thread that prints the errors found by this thread.
threads_stopped (threading.Event): A threading event used to signal to
the thread that it should exit.
"""
worker.error_message_pubsub_client = worker.redis_client.pubsub(
ignore_subscribe_messages=True)
@@ -1602,15 +1666,22 @@ def listen_error_messages_raylet(worker, task_error_queue):
worker.error_message_pubsub_client.subscribe(error_pubsub_channel)
# worker.error_message_pubsub_client.psubscribe("*")
# Get the exports that occurred before the call to subscribe.
with worker.lock:
error_messages = global_state.error_messages(worker.task_driver_id)
for error_message in error_messages:
logger.error(error_message)
try:
for msg in worker.error_message_pubsub_client.listen():
# Get the exports that occurred before the call to subscribe.
with worker.lock:
error_messages = global_state.error_messages(worker.task_driver_id)
for error_message in error_messages:
logger.error(error_message)
while True:
# Exit if we received a signal that we should stop.
if threads_stopped.is_set():
return
msg = worker.error_message_pubsub_client.get_message()
if msg is None:
threads_stopped.wait(timeout=0.01)
continue
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
msg["data"], 0)
assert gcs_entry.EntriesLength() == 1
@@ -1630,11 +1701,9 @@ def listen_error_messages_raylet(worker, task_error_queue):
task_error_queue.put((error_message, time.time()))
else:
logger.error(error_message)
except redis.ConnectionError:
# When Redis terminates the listen call will throw a ConnectionError,
# which we catch here.
pass
finally:
# Close the pubsub client to avoid leaking file descriptors.
worker.error_message_pubsub_client.close()
def is_initialized():
@@ -1650,6 +1719,7 @@ def connect(info,
redis_password=None,
object_id_seed=None,
mode=WORKER_MODE,
log_to_driver=False,
worker=global_worker,
driver_id=None):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
@@ -1665,6 +1735,8 @@ def connect(info,
manner. However, the same ID should not be used for different jobs.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and
LOCAL_MODE.
log_to_driver (bool): If true, then output from all of the worker
processes on all nodes will be directed to the driver.
worker: The ray.Worker instance.
driver_id: The ID of driver. If it's None, then we will generate one.
"""
@@ -1677,7 +1749,7 @@ def connect(info,
if not faulthandler.is_enabled():
faulthandler.enable(all_threads=False)
worker.profiler = profiling.Profiler(worker)
worker.profiler = profiling.Profiler(worker, worker.threads_stopped)
# Initialize some fields.
if mode is WORKER_MODE:
@@ -1717,11 +1789,11 @@ def connect(info,
# Create a Redis client.
redis_ip_address, redis_port = info["redis_address"].split(":")
worker.redis_client = thread_safe_client(
redis.StrictRedis(
host=redis_ip_address,
port=int(redis_port),
password=redis_password))
# The Redis client can safely be shared between threads. However, that is
# not true of Redis pubsub clients. See the documentation at
# https://github.com/andymccurdy/redis-py#thread-safety.
worker.redis_client = redis.StrictRedis(
host=redis_ip_address, port=int(redis_port), password=redis_password)
# For driver's check that the version information matches the version
# information that the Ray cluster was started with.
@@ -1777,13 +1849,19 @@ def connect(info,
log_stdout_file, log_stderr_file = (
tempfile_services.new_worker_redirected_log_file(
worker.worker_id))
sys.stdout = log_stdout_file
sys.stderr = log_stderr_file
try_update_handler(sys.stderr)
services.record_log_files_in_redis(
info["redis_address"],
info["node_ip_address"], [log_stdout_file, log_stderr_file],
password=redis_password)
# Redirect stdout/stderr at the file descriptor level. If we simply
# set sys.stdout and sys.stderr, then logging from C++ can fail to
# be redirected.
os.dup2(log_stdout_file.fileno(), sys.stdout.fileno())
os.dup2(log_stderr_file.fileno(), sys.stderr.fileno())
# This should always be the first message to appear in the worker's
# stdout and stderr log files. The string "Ray worker pid:" is
# parsed in the log monitor process.
print("Ray worker pid: {}".format(os.getpid()))
print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
# Register the worker with Redis.
worker_dict = {
"node_ip_address": worker.node_ip_address,
@@ -1799,7 +1877,7 @@ def connect(info,
# Create an object store client.
worker.plasma_client = thread_safe_client(
plasma.connect(info["store_socket_name"]))
plasma.connect(info["store_socket_name"], None, 0, 300))
raylet_socket = info["raylet_socket_name"]
@@ -1866,7 +1944,9 @@ def connect(info,
)
# Start the import thread
import_thread.ImportThread(worker, mode).start()
worker.import_thread = import_thread.ImportThread(worker, mode,
worker.threads_stopped)
worker.import_thread.start()
# If this is a driver running in SCRIPT_MODE, start a thread to print error
# messages asynchronously in the background. Ideally the scheduler would
@@ -1876,18 +1956,25 @@ def connect(info,
# scheduler for new error messages.
if mode == SCRIPT_MODE:
q = queue.Queue()
listener = threading.Thread(
worker.listener_thread = threading.Thread(
target=listen_error_messages_raylet,
name="ray_listen_error_messages",
args=(worker, q))
printer = threading.Thread(
args=(worker, q, worker.threads_stopped))
worker.printer_thread = threading.Thread(
target=print_error_messages_raylet,
name="ray_print_error_messages",
args=(q, ))
listener.daemon = True
listener.start()
printer.daemon = True
printer.start()
args=(q, worker.threads_stopped))
worker.listener_thread.daemon = True
worker.listener_thread.start()
worker.printer_thread.daemon = True
worker.printer_thread.start()
if log_to_driver:
worker.logger_thread = threading.Thread(
target=print_logs,
name="ray_print_logs",
args=(worker.redis_client, worker.threads_stopped))
worker.logger_thread.daemon = True
worker.logger_thread.start()
# If we are using the raylet code path and we are not in local mode, start
# a background thread to periodically flush profiling data to the GCS.
@@ -1928,11 +2015,33 @@ def disconnect(worker=global_worker):
# remote functions or actors are defined and then connect is called again,
# the remote functions will be exported. This is mostly relevant for the
# tests.
if worker.connected:
# Shutdown all of the threads that we've started. TODO(rkn): This
# should be handled cleanly in the worker object's destructor and not
# in this disconnect method.
worker.threads_stopped.set()
if hasattr(worker, "import_thread"):
worker.import_thread.join_import_thread()
if hasattr(worker, "profiler") and hasattr(worker.profiler, "t"):
worker.profiler.join_flush_thread()
if hasattr(worker, "listener_thread"):
worker.listener_thread.join()
if hasattr(worker, "printer_thread"):
worker.printer_thread.join()
if hasattr(worker, "logger_thread"):
worker.logger_thread.join()
worker.threads_stopped.clear()
worker.connected = False
worker.cached_functions_to_run = []
worker.function_actor_manager.reset_cache()
worker.serialization_context_map.clear()
if hasattr(worker, "raylet_client"):
del worker.raylet_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.disconnect()
@contextmanager
def _changeproctitle(title, next_title):