[Core] Log output from different jobs to different drivers. (#8885)

* .

* .

* Correct now

* No interactivity errors

* format

* Filtering

* lint

* .

* No more filtering

* Removed interactivity

* .

* .

* .

* .

* .

* .

* Redirection works

* formatting

* something broken?

* .

* Works

* formatting

* redirect output

* formatting

* formatting

* Fix file descriptor leakage

* format

* .

* .

* .

* .

* .

* Refactor

* .

* Only run on job switch

* .

* cleanup

* .

* ...

* Review

* .

* .

* .

* .

* whoops

* .

* Should fix bug

* .

* .

* addressed comments

* formatting

* formatting

* Fix typo

* .

* .

* .

* .

Co-authored-by: Ubuntu <ubuntu@ip-172-31-14-33.us-west-2.compute.internal>
This commit is contained in:
Alex Wu
2020-06-23 18:45:32 -07:00
committed by GitHub
parent acb3280235
commit c152730e4a
5 changed files with 213 additions and 67 deletions
+14
View File
@@ -308,6 +308,19 @@ cdef prepare_args(
core_worker.put_serialized_object(serialized_arg)))))
def switch_worker_log_if_needed(worker, next_job_id):
    """Redirect this worker's output to the log files of the next job.

    Nothing happens unless the worker runs in ``ray.WORKER_MODE``, and
    nothing happens when ``next_job_id`` is already the job whose log files
    are in use.

    Args:
        worker: The worker object. Its ``current_logging_job_id`` records
            the job currently being logged for, and its ``node`` provides
            the per-job log file names.
        next_job_id: ID of the job whose task is about to run.
    """
    if worker.mode != ray.WORKER_MODE:
        return

    active_job_id = worker.current_logging_job_id
    needs_switch = active_job_id is None or active_job_id != next_job_id
    if not needs_switch:
        return

    node = worker.node
    out_path, err_path = node.get_job_redirected_log_file(
        worker.worker_id, next_job_id.binary())
    ray.worker.set_log_file(out_path, err_path)
    # Remember the job so the switch only happens when the job changes.
    worker.current_logging_job_id = next_job_id
cdef execute_task(
CTaskType task_type,
const CRayFunction &ray_function,
@@ -444,6 +457,7 @@ cdef execute_task(
with core_worker.profile_event(b"task:execute"):
task_exception = True
try:
switch_worker_log_if_needed(worker, job_id)
with ray.worker._changeproctitle(title, next_title):
outputs = function_executor(*args, **kwargs)
task_exception = False
+82 -24
View File
@@ -17,7 +17,7 @@ import ray.ray_constants as ray_constants
import ray.services
import ray.utils
from ray.resource_spec import ResourceSpec
from ray.utils import try_to_create_directory, try_to_symlink
from ray.utils import try_to_create_directory, try_to_symlink, open_log
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
@@ -383,14 +383,16 @@ class Node:
raise FileExistsError(errno.EEXIST,
"No usable temporary filename found")
def new_log_files(self, name):
def get_log_file_names(self, name, unique=False):
"""Generate partially randomized filenames for log files.
Args:
name (str): descriptive string for this log file.
unique (bool): if true, a counter will be attached to `name` to
ensure the returned filename is not already used.
Returns:
A tuple of two file handles for redirecting (stdout, stderr).
A tuple of two file names for redirecting (stdout, stderr).
"""
redirect_output = self._ray_params.redirect_output
@@ -401,14 +403,15 @@ class Node:
if not redirect_output:
return None, None
log_stdout = self._make_inc_temp(
suffix=".out", prefix=name, directory_name=self._logs_dir)
log_stderr = self._make_inc_temp(
suffix=".err", prefix=name, directory_name=self._logs_dir)
# Line-buffer the output (mode 1).
log_stdout_file = open(log_stdout, "a", buffering=1)
log_stderr_file = open(log_stderr, "a", buffering=1)
return log_stdout_file, log_stderr_file
if unique:
log_stdout = self._make_inc_temp(
suffix=".out", prefix=name, directory_name=self._logs_dir)
log_stderr = self._make_inc_temp(
suffix=".err", prefix=name, directory_name=self._logs_dir)
else:
log_stdout = os.path.join(self._logs_dir, "{}.out".format(name))
log_stderr = os.path.join(self._logs_dir, "{}.err".format(name))
return log_stdout, log_stderr
def _get_unused_port(self, close_on_exit=True):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -487,9 +490,15 @@ class Node:
def start_redis(self):
"""Start the Redis servers."""
assert self._redis_address is None
redis_log_files = [self.new_log_files("redis")]
redis_out_name, redis_err_name = self.get_log_file_names(
"redis", unique=True)
redis_log_files = [(open_log(redis_out_name),
open_log(redis_err_name))]
for i in range(self._ray_params.num_redis_shards):
redis_log_files.append(self.new_log_files("redis-shard_" + str(i)))
shard_out_name, shard_err_name = self.get_log_file_names(
"redis-shard_{}".format(i), unique=True)
redis_log_files.append((open_log(shard_out_name),
open_log(shard_err_name)))
(self._redis_address, redis_shards,
process_infos) = ray.services.start_redis(
@@ -511,7 +520,10 @@ class Node:
def start_log_monitor(self):
"""Start the log monitor."""
stdout_file, stderr_file = self.new_log_files("log_monitor")
log_out_name, log_err_name = self.get_log_file_names(
"log_monitor", unique=True)
stdout_file, stderr_file = open_log(log_out_name), open_log(
log_err_name)
process_info = ray.services.start_log_monitor(
self.redis_address,
self._logs_dir,
@@ -526,7 +538,10 @@ class Node:
def start_reporter(self):
"""Start the reporter."""
stdout_file, stderr_file = self.new_log_files("reporter")
reporter_out_name, reporter_err_name = self.get_log_file_names(
"reporter", unique=True)
stdout_file, stderr_file = (open_log(reporter_out_name),
open_log(reporter_err_name))
process_info = ray.services.start_reporter(
self.redis_address,
stdout_file=stdout_file,
@@ -547,7 +562,10 @@ class Node:
if we fail to start the dashboard. Otherwise it will print
a warning if we fail to start the dashboard.
"""
stdout_file, stderr_file = self.new_log_files("dashboard")
dashboard_out_name, dashboard_err_name = self.get_log_file_names(
"dashboard", unique=True)
stdout_file, stderr_file = (open_log(dashboard_out_name),
open_log(dashboard_err_name))
self._webui_url, process_info = ray.services.start_dashboard(
require_dashboard,
self._ray_params.dashboard_host,
@@ -568,7 +586,10 @@ class Node:
def start_plasma_store(self):
"""Start the plasma store."""
stdout_file, stderr_file = self.new_log_files("plasma_store")
plasma_out_name, plasma_err_name = self.get_log_file_names(
"plasma_store", unique=True)
stdout_file, stderr_file = (open_log(plasma_out_name),
open_log(plasma_err_name))
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
self._plasma_store_socket_name,
@@ -587,7 +608,10 @@ class Node:
def start_gcs_server(self):
"""Start the gcs server.
"""
stdout_file, stderr_file = self.new_log_files("gcs_server")
gcs_out_name, gcs_err_name = self.get_log_file_names(
"gcs_server", unique=True)
stdout_file, stderr_file = (open_log(gcs_out_name),
open_log(gcs_err_name))
process_info = ray.services.start_gcs_server(
self._redis_address,
stdout_file=stdout_file,
@@ -610,7 +634,9 @@ class Node:
use_profiler (bool): True if we should start the process in the
valgrind profiler.
"""
stdout_file, stderr_file = self.new_log_files("raylet")
raylet_out_name, raylet_err_name = self.get_log_file_names("raylet")
stdout_file, stderr_file = (open_log(raylet_out_name),
open_log(raylet_err_name))
process_info = ray.services.start_raylet(
self._redis_address,
self._node_ip_address,
@@ -640,10 +666,39 @@ class Node:
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
def new_worker_redirected_log_file(self, worker_id):
"""Create new logging files for workers to redirect its output."""
worker_stdout_file, worker_stderr_file = (
self.new_log_files("worker-" + ray.utils.binary_to_hex(worker_id)))
def get_job_redirected_log_file(self,
                                worker_id: bytes,
                                job_id: bytes = None):
    """Work out which log files a worker should redirect its output to.

    Only the names are computed here; no file is created.

    Args:
        worker_id (bytes): A byte representation of the worker id.
        job_id (bytes): A byte representation of the job id. When None,
            the names of the worker's generic log files are returned.

    Returns:
        (tuple) The stdout and stderr file names to redirect to, or
            (None, None) when output redirection is disabled.
    """
    should_redirect = self._ray_params.redirect_output
    if should_redirect is None:
        # Not configured explicitly: follow glog, i.e. redirect unless the
        # GLOG_logtostderr environment variable is set to "1".
        should_redirect = os.getenv("GLOG_logtostderr") != "1"
    if not should_redirect:
        return None, None

    worker_hex = ray.utils.binary_to_hex(worker_id)
    if job_id is None:
        name = "worker-{}".format(worker_hex)
    else:
        name = "worker-{}-{}".format(worker_hex,
                                     ray.utils.binary_to_hex(job_id))

    # unique=False: the same worker/job pair always maps to the same names.
    stdout_name, stderr_name = self.get_log_file_names(name, unique=False)
    return stdout_name, stderr_name
def start_worker(self):
@@ -652,7 +707,10 @@ class Node:
def start_monitor(self):
"""Start the monitor."""
stdout_file, stderr_file = self.new_log_files("monitor")
monitor_out_name, monitor_err_name = self.get_log_file_names(
"monitor", unique=True)
stdout_file, stderr_file = (open_log(monitor_out_name),
open_log(monitor_err_name))
process_info = ray.services.start_monitor(
self._redis_address,
stdout_file=stdout_file,
+4 -3
View File
@@ -150,11 +150,12 @@ def test_raylet_tempfiles(shutdown_only):
"log_monitor.out", "log_monitor.err", "plasma_store.out",
"plasma_store.err", "monitor.out", "monitor.err", "redis-shard_0.out",
"redis-shard_0.err", "redis.out", "redis.err", "raylet.out",
"raylet.err"
"raylet.err", "gcs_server.out", "gcs_server.err"
}
log_files_expected.update({"gcs_server.out", "gcs_server.err"})
for expected in log_files_expected:
assert expected in log_files
assert log_files_expected.issubset(log_files)
assert log_files.issuperset(log_files_expected)
socket_files = set(os.listdir(node.get_sockets_dir_path()))
+30
View File
@@ -390,6 +390,36 @@ def setup_logger(logging_level, logging_format):
logger.propagate = False
def open_log(path, **kwargs):
    """Open a log file, defaulting to line-buffered UTF-8 append mode.

    Args:
        path: File name (or file descriptor) handed straight to ``open``.
        **kwargs: Extra keyword arguments for ``open``. Values given here
            take precedence over the defaults ``buffering=1``, ``mode="a"``
            and ``encoding="utf-8"``.

    Returns:
        The file object returned by the builtin ``open``.
    """
    options = {"buffering": 1, "mode": "a", "encoding": "utf-8"}
    options.update(kwargs)
    return open(path, **options)
def create_and_init_new_worker_log(path, worker_pid):
    """Open a worker log and stamp it with the pid if the file is empty.

    Args:
        path (str): The name/path of the log file; it is opened through
            `open_log`.
        worker_pid (int): Process id written as the header line of an
            empty log file.

    Returns:
        A file-like object which can be written to.
    """
    # TODO (Alex): We should eventually be able to replace this with
    # named-pipes.
    log_file = open_log(path)

    # A position of zero means nothing has been written yet. No one else
    # should ever write to this file, so TOCTOU is not a concern here.
    is_empty = log_file.tell() == 0
    if is_empty:
        # This should always be the first message to appear in the worker's
        # stdout and stderr log files. The string "Ray worker pid:" is
        # parsed in the log monitor process.
        print("Ray worker pid: {}".format(worker_pid), file=log_file)
    return log_file
def get_system_memory():
"""Return the total amount of system memory in bytes.
+83 -40
View File
@@ -46,12 +46,8 @@ from ray.exceptions import (
ObjectStoreFullError,
)
from ray.function_manager import FunctionActorManager
from ray.utils import (
_random_string,
check_oversized_pickle,
is_cython,
setup_logger,
)
from ray.utils import (_random_string, check_oversized_pickle, is_cython,
setup_logger, create_and_init_new_worker_log, open_log)
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -895,6 +891,68 @@ last_task_error_raise_time = 0
UNCAUGHT_ERROR_GRACE_PERIOD = 5
def _set_log_file(file_name, worker_pid, old_obj, setter_func):
    """Point one of the process's standard streams at ``file_name``.

    The redirection happens at the file descriptor level (so that output
    written from C++ follows it) and the Python-level stream object is
    replaced as well.

    Args:
        file_name (str): Log file that the stream should be written to.
        worker_pid (int): Pid recorded in the header of a new log file.
        old_obj: The stream object currently in use (``sys.stdout`` or
            ``sys.stderr``); its file descriptor is the one redirected.
        setter_func: Callable that installs the replacement stream object.

    Returns:
        The absolute path of the log file now backing the stream.
    """
    # The temporary handle is only needed until its descriptor has been
    # duplicated, so close it deterministically instead of relying on GC.
    with create_and_init_new_worker_log(file_name, worker_pid) as f:
        # Write out whatever is still buffered in the old stream object while
        # its descriptor still refers to the previous destination. Otherwise
        # a pending partial line would be emitted into the new file when the
        # old object is eventually flushed or collected.
        old_obj.flush()
        # TODO (Alex): Flush the c/c++ userspace buffers if necessary.
        # `fflush(stdout); cout.flush();`

        fileno = old_obj.fileno()

        # C++ logging requires redirecting the stdout file descriptor. Note
        # that dup2 will automatically close the old file descriptor before
        # overriding it.
        os.dup2(f.fileno(), fileno)
        log_path = os.path.abspath(f.name)

    # We also manually set sys.stdout and sys.stderr because that seems to
    # have an effect on the output buffering. Without doing this, stdout
    # and stderr are heavily buffered resulting in seemingly lost logging
    # statements. We never want to close the stdout file descriptor, dup2 will
    # close it when necessary and we don't want python's GC to close it.
    setter_func(open_log(fileno, closefd=False))

    return log_path
def set_log_file(stdout_name, stderr_name):
    """Sets up logging for the current worker, creating the (fd backed) file
    and flushing buffers as is necessary.

    Args:
        stdout_name (str): The file name that stdout should be written to.
            A falsy value leaves stdout untouched.
        stderr_name (str): The file name that stderr should be written to.
            A falsy value leaves stderr untouched.

    Returns:
        (tuple) The absolute paths of the files that stdout and stderr will be
            written to. The entry for a stream that was not redirected is
            the empty string.
    """
    stdout_path = ""
    stderr_path = ""
    worker_pid = os.getpid()

    # lambda cannot contain assignment
    def stdout_setter(x):
        sys.stdout = x

    def stderr_setter(x):
        sys.stderr = x

    if stdout_name:
        # Keep the returned path: it is what this function reports back to
        # the caller.
        stdout_path = _set_log_file(stdout_name, worker_pid, sys.stdout,
                                    stdout_setter)

    # The stderr case should be analogous to the stdout case
    if stderr_name:
        stderr_path = _set_log_file(stderr_name, worker_pid, sys.stderr,
                                    stderr_setter)

    return stdout_path, stderr_path
def print_logs(redis_client, threads_stopped):
"""Prints log messages from workers on all of the nodes.
@@ -1151,8 +1209,8 @@ def connect(node,
worker.lock = threading.RLock()
driver_name = ""
log_stdout_file_name = ""
log_stderr_file_name = ""
log_stdout_file_path = ""
log_stderr_file_path = ""
if mode == SCRIPT_MODE:
import __main__ as main
driver_name = (main.__file__
@@ -1164,39 +1222,24 @@ def connect(node,
redirect_worker_output_val = worker.redis_client.get("RedirectOutput")
if (redirect_worker_output_val is not None
and int(redirect_worker_output_val) == 1):
log_stdout_file, log_stderr_file = (
node.new_worker_redirected_log_file(worker.worker_id))
# Redirect stdout/stderr at the file descriptor level. If we simply
# set sys.stdout and sys.stderr, then logging from C++ can fail to
# be redirected.
if log_stdout_file is not None:
os.dup2(log_stdout_file.fileno(), sys.stdout.fileno())
if log_stderr_file is not None:
os.dup2(log_stderr_file.fileno(), sys.stderr.fileno())
# We also manually set sys.stdout and sys.stderr because that seems
# to have an affect on the output buffering. Without doing this,
# stdout and stderr are heavily buffered resulting in seemingly
# lost logging statements.
if log_stdout_file is not None:
sys.stdout = log_stdout_file
if log_stderr_file is not None:
sys.stderr = log_stderr_file
# This should always be the first message to appear in the worker's
# stdout and stderr log files. The string "Ray worker pid:" is
# parsed in the log monitor process.
print("Ray worker pid: {}".format(os.getpid()))
print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
log_stdout_file_name = os.path.abspath(
(log_stdout_file
if log_stdout_file is not None else sys.stdout).name)
log_stderr_file_name = os.path.abspath(
(log_stderr_file
if log_stderr_file is not None else sys.stderr).name)
log_stdout_file_name, log_stderr_file_name = (
node.get_job_redirected_log_file(worker.worker_id))
try:
log_stdout_file_path, log_stderr_file_path = \
set_log_file(log_stdout_file_name, log_stderr_file_name)
except IOError:
raise IOError(
"Workers must be able to redirect their output at"
"the file descriptor level.")
elif not LOCAL_MODE:
raise ValueError(
"Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")
# TODO (Alex): `current_logging_job_id` tracks the current job so that we
# know when to switch log files. If all logging functionality was moved to
# c++, the functionality in `_raylet.pyx::switch_worker_log_if_needed` could
# be moved to `CoreWorker::SetCurrentTaskId()`.
worker.current_logging_job_id = None
redis_address, redis_port = node.redis_address.split(":")
gcs_options = ray._raylet.GcsClientOptions(
redis_address,
@@ -1216,8 +1259,8 @@ def connect(node,
node.raylet_ip_address,
(mode == LOCAL_MODE),
driver_name,
log_stdout_file_name,
log_stderr_file_name,
log_stdout_file_path,
log_stderr_file_path,
)
# Create an object for interfacing with the global state.