[core] Fix Ray service startup when logging redirection is disabled. (#9547)

This commit is contained in:
Clark Zinzow
2020-07-23 10:26:24 -06:00
committed by GitHub
parent 01d6edae9c
commit 9f969260e8
2 changed files with 43 additions and 39 deletions
+39 -39
View File
@@ -394,7 +394,33 @@ class Node:
raise FileExistsError(errno.EEXIST,
"No usable temporary filename found")
def get_log_file_names(self, name, unique=False):
def get_log_file_handles(self, name, unique=False):
    """Open the stdout/stderr log files for a process and return the handles.

    The filenames are produced by `_get_log_file_names`. If output
    redirection has been disabled, no files are opened and `(None, None)`
    is returned instead.

    Args:
        name (str): descriptive string for this log file.
        unique (bool): if true, a counter will be attached to `name` to
            ensure the returned filename is not already used.

    Returns:
        A tuple of two file handles for redirecting (stdout, stderr), or
            `(None, None)` if output redirection is disabled.
    """
    redirect_output = self._ray_params.redirect_output
    if redirect_output is None:
        # No explicit setting: make the default behavior match that of
        # glog, i.e. redirect unless GLOG_logtostderr is set to "1".
        redirect_output = os.getenv("GLOG_logtostderr") != "1"
    if not redirect_output:
        return None, None

    stdout_path, stderr_path = self._get_log_file_names(name, unique=unique)
    stdout_handle = open_log(stdout_path)
    try:
        stderr_handle = open_log(stderr_path)
    except BaseException:
        # Opening the second file failed; close the first handle so it is
        # not leaked, then propagate the original error unchanged.
        stdout_handle.close()
        raise
    return stdout_handle, stderr_handle
def _get_log_file_names(self, name, unique=False):
"""Generate partially randomized filenames for log files.
Args:
@@ -405,14 +431,6 @@ class Node:
Returns:
A tuple of two file names for redirecting (stdout, stderr).
"""
redirect_output = self._ray_params.redirect_output
if redirect_output is None:
# Make the default behavior match that of glog.
redirect_output = os.getenv("GLOG_logtostderr") != "1"
if not redirect_output:
return None, None
if unique:
log_stdout = self._make_inc_temp(
@@ -501,15 +519,11 @@ class Node:
def start_redis(self):
"""Start the Redis servers."""
assert self._redis_address is None
redis_out_name, redis_err_name = self.get_log_file_names(
"redis", unique=True)
redis_log_files = [(open_log(redis_out_name),
open_log(redis_err_name))]
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
for i in range(self._ray_params.num_redis_shards):
shard_out_name, shard_err_name = self.get_log_file_names(
"redis-shard_{}".format(i), unique=True)
redis_log_files.append((open_log(shard_out_name),
open_log(shard_err_name)))
redis_log_files.append(
self.get_log_file_handles(
"redis-shard_{}".format(i), unique=True))
(self._redis_address, redis_shards,
process_infos) = ray.services.start_redis(
@@ -531,10 +545,8 @@ class Node:
def start_log_monitor(self):
"""Start the log monitor."""
log_out_name, log_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"log_monitor", unique=True)
stdout_file, stderr_file = open_log(log_out_name), open_log(
log_err_name)
process_info = ray.services.start_log_monitor(
self.redis_address,
self._logs_dir,
@@ -549,10 +561,8 @@ class Node:
def start_reporter(self):
"""Start the reporter."""
reporter_out_name, reporter_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"reporter", unique=True)
stdout_file, stderr_file = (open_log(reporter_out_name),
open_log(reporter_err_name))
process_info = ray.services.start_reporter(
self.redis_address,
self._ray_params.metrics_agent_port,
@@ -574,10 +584,8 @@ class Node:
if we fail to start the dashboard. Otherwise it will print
a warning if we fail to start the dashboard.
"""
dashboard_out_name, dashboard_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"dashboard", unique=True)
stdout_file, stderr_file = (open_log(dashboard_out_name),
open_log(dashboard_err_name))
self._webui_url, process_info = ray.services.start_dashboard(
require_dashboard,
self._ray_params.dashboard_host,
@@ -598,10 +606,8 @@ class Node:
def start_plasma_store(self):
"""Start the plasma store."""
plasma_out_name, plasma_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"plasma_store", unique=True)
stdout_file, stderr_file = (open_log(plasma_out_name),
open_log(plasma_err_name))
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
self._plasma_store_socket_name,
@@ -620,10 +626,8 @@ class Node:
def start_gcs_server(self):
"""Start the gcs server.
"""
gcs_out_name, gcs_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"gcs_server", unique=True)
stdout_file, stderr_file = (open_log(gcs_out_name),
open_log(gcs_err_name))
process_info = ray.services.start_gcs_server(
self._redis_address,
stdout_file=stdout_file,
@@ -647,10 +651,8 @@ class Node:
use_profiler (bool): True if we should start the process in the
valgrind profiler.
"""
raylet_out_name, raylet_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"raylet", unique=True)
stdout_file, stderr_file = (open_log(raylet_out_name),
open_log(raylet_err_name))
process_info = ray.services.start_raylet(
self._redis_address,
self._node_ip_address,
@@ -713,7 +715,7 @@ class Node:
else:
name = "worker-{}".format(ray.utils.binary_to_hex(worker_id))
worker_stdout_file, worker_stderr_file = self.get_log_file_names(
worker_stdout_file, worker_stderr_file = self._get_log_file_names(
name, unique=False)
return worker_stdout_file, worker_stderr_file
@@ -723,10 +725,8 @@ class Node:
def start_monitor(self):
"""Start the monitor."""
monitor_out_name, monitor_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"monitor", unique=True)
stdout_file, stderr_file = (open_log(monitor_out_name),
open_log(monitor_err_name))
process_info = ray.services.start_monitor(
self._redis_address,
stdout_file=stdout_file,