diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 339c4f3a1..102adf030 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -308,6 +308,19 @@ cdef prepare_args( core_worker.put_serialized_object(serialized_arg))))) +def switch_worker_log_if_needed(worker, next_job_id): + if worker.mode != ray.WORKER_MODE: + return + if (worker.current_logging_job_id is None) or \ + (worker.current_logging_job_id != next_job_id): + job_stdout_path, job_stderr_path = ( + worker.node.get_job_redirected_log_file( + worker.worker_id, next_job_id.binary()) + ) + ray.worker.set_log_file(job_stdout_path, job_stderr_path) + worker.current_logging_job_id = next_job_id + + cdef execute_task( CTaskType task_type, const CRayFunction &ray_function, @@ -444,6 +457,7 @@ cdef execute_task( with core_worker.profile_event(b"task:execute"): task_exception = True try: + switch_worker_log_if_needed(worker, job_id) with ray.worker._changeproctitle(title, next_title): outputs = function_executor(*args, **kwargs) task_exception = False diff --git a/python/ray/node.py b/python/ray/node.py index 5f6dbfab9..cf38e988d 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -17,7 +17,7 @@ import ray.ray_constants as ray_constants import ray.services import ray.utils from ray.resource_spec import ResourceSpec -from ray.utils import try_to_create_directory, try_to_symlink +from ray.utils import try_to_create_directory, try_to_symlink, open_log # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray configures it by default automatically @@ -383,14 +383,16 @@ class Node: raise FileExistsError(errno.EEXIST, "No usable temporary filename found") - def new_log_files(self, name): + def get_log_file_names(self, name, unique=False): """Generate partially randomized filenames for log files. Args: name (str): descriptive string for this log file. + unique (bool): if true, a counter will be attached to `name` to + ensure the returned filename is not already used. Returns: - A tuple of two file handles for redirecting (stdout, stderr). + A tuple of two file names for redirecting (stdout, stderr). """ redirect_output = self._ray_params.redirect_output @@ -401,14 +403,15 @@ class Node: if not redirect_output: return None, None - log_stdout = self._make_inc_temp( - suffix=".out", prefix=name, directory_name=self._logs_dir) - log_stderr = self._make_inc_temp( - suffix=".err", prefix=name, directory_name=self._logs_dir) - # Line-buffer the output (mode 1). - log_stdout_file = open(log_stdout, "a", buffering=1) - log_stderr_file = open(log_stderr, "a", buffering=1) - return log_stdout_file, log_stderr_file + if unique: + log_stdout = self._make_inc_temp( + suffix=".out", prefix=name, directory_name=self._logs_dir) + log_stderr = self._make_inc_temp( + suffix=".err", prefix=name, directory_name=self._logs_dir) + else: + log_stdout = os.path.join(self._logs_dir, "{}.out".format(name)) + log_stderr = os.path.join(self._logs_dir, "{}.err".format(name)) + return log_stdout, log_stderr def _get_unused_port(self, close_on_exit=True): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -487,9 +490,15 @@ class Node: def start_redis(self): """Start the Redis servers.""" assert self._redis_address is None - redis_log_files = [self.new_log_files("redis")] + redis_out_name, redis_err_name = self.get_log_file_names( + "redis", unique=True) + redis_log_files = [(open_log(redis_out_name), + open_log(redis_err_name))] for i in range(self._ray_params.num_redis_shards): - redis_log_files.append(self.new_log_files("redis-shard_" + str(i))) + shard_out_name, shard_err_name = self.get_log_file_names( + "redis-shard_{}".format(i), unique=True) + redis_log_files.append((open_log(shard_out_name), + open_log(shard_err_name))) (self._redis_address, redis_shards, process_infos) = ray.services.start_redis( @@ -511,7 +520,10 @@ class Node: def start_log_monitor(self): """Start the log monitor.""" - stdout_file, stderr_file = self.new_log_files("log_monitor") + log_out_name, log_err_name = self.get_log_file_names( + "log_monitor", unique=True) + stdout_file, stderr_file = open_log(log_out_name), open_log( + log_err_name) process_info = ray.services.start_log_monitor( self.redis_address, self._logs_dir, @@ -526,7 +538,10 @@ class Node: def start_reporter(self): """Start the reporter.""" - stdout_file, stderr_file = self.new_log_files("reporter") + reporter_out_name, reporter_err_name = self.get_log_file_names( + "reporter", unique=True) + stdout_file, stderr_file = (open_log(reporter_out_name), + open_log(reporter_err_name)) process_info = ray.services.start_reporter( self.redis_address, stdout_file=stdout_file, @@ -547,7 +562,10 @@ class Node: if we fail to start the dashboard. Otherwise it will print a warning if we fail to start the dashboard. """ - stdout_file, stderr_file = self.new_log_files("dashboard") + dashboard_out_name, dashboard_err_name = self.get_log_file_names( + "dashboard", unique=True) + stdout_file, stderr_file = (open_log(dashboard_out_name), + open_log(dashboard_err_name)) self._webui_url, process_info = ray.services.start_dashboard( require_dashboard, self._ray_params.dashboard_host, @@ -568,7 +586,10 @@ class Node: def start_plasma_store(self): """Start the plasma store.""" - stdout_file, stderr_file = self.new_log_files("plasma_store") + plasma_out_name, plasma_err_name = self.get_log_file_names( + "plasma_store", unique=True) + stdout_file, stderr_file = (open_log(plasma_out_name), + open_log(plasma_err_name)) process_info = ray.services.start_plasma_store( self.get_resource_spec(), self._plasma_store_socket_name, @@ -587,7 +608,10 @@ class Node: def start_gcs_server(self): """Start the gcs server. """ - stdout_file, stderr_file = self.new_log_files("gcs_server") + gcs_out_name, gcs_err_name = self.get_log_file_names( + "gcs_server", unique=True) + stdout_file, stderr_file = (open_log(gcs_out_name), + open_log(gcs_err_name)) process_info = ray.services.start_gcs_server( self._redis_address, stdout_file=stdout_file, @@ -610,7 +634,9 @@ class Node: use_profiler (bool): True if we should start the process in the valgrind profiler. """ - stdout_file, stderr_file = self.new_log_files("raylet") + raylet_out_name, raylet_err_name = self.get_log_file_names("raylet") + stdout_file, stderr_file = (open_log(raylet_out_name), + open_log(raylet_err_name)) process_info = ray.services.start_raylet( self._redis_address, self._node_ip_address, @@ -640,10 +666,39 @@ class Node: assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] - def new_worker_redirected_log_file(self, worker_id): - """Create new logging files for workers to redirect its output.""" - worker_stdout_file, worker_stderr_file = ( - self.new_log_files("worker-" + ray.utils.binary_to_hex(worker_id))) + def get_job_redirected_log_file(self, + worker_id: bytes, + job_id: bytes = None): + """Determines (but does not create) logging files for workers to + redirect its output. + + Args: + worker_id (bytes): A byte representation of the worker id. + job_id (bytes): A byte representation of the job id. If None, + provides a generic log file for the worker. + + Returns: + (tuple) The stdout and stderr file names that the job should be + redirected to. + """ + redirect_output = self._ray_params.redirect_output + + if redirect_output is None: + # Make the default behavior match that of glog. + redirect_output = os.getenv("GLOG_logtostderr") != "1" + + if not redirect_output: + return None, None + + if job_id is not None: + name = "worker-{}-{}".format( + ray.utils.binary_to_hex(worker_id), + ray.utils.binary_to_hex(job_id)) + else: + name = "worker-{}".format(ray.utils.binary_to_hex(worker_id)) + + worker_stdout_file, worker_stderr_file = self.get_log_file_names( + name, unique=False) return worker_stdout_file, worker_stderr_file def start_worker(self): @@ -652,7 +707,10 @@ class Node: def start_monitor(self): """Start the monitor.""" - stdout_file, stderr_file = self.new_log_files("monitor") + monitor_out_name, monitor_err_name = self.get_log_file_names( + "monitor", unique=True) + stdout_file, stderr_file = (open_log(monitor_out_name), + open_log(monitor_err_name)) process_info = ray.services.start_monitor( self._redis_address, stdout_file=stdout_file, diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 17f533e0d..b5036f342 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -150,11 +150,12 @@ def test_raylet_tempfiles(shutdown_only): "log_monitor.out", "log_monitor.err", "plasma_store.out", "plasma_store.err", "monitor.out", "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", "raylet.out", - "raylet.err" + "raylet.err", "gcs_server.out", "gcs_server.err" } - log_files_expected.update({"gcs_server.out", "gcs_server.err"}) - + for expected in log_files_expected: + assert expected in log_files + assert log_files_expected.issubset(log_files) assert log_files.issuperset(log_files_expected) socket_files = set(os.listdir(node.get_sockets_dir_path())) diff --git a/python/ray/utils.py b/python/ray/utils.py index 73aecc46f..692e819d2 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -390,6 +390,36 @@ def setup_logger(logging_level, logging_format): logger.propagate = False +def open_log(path, **kwargs): + kwargs.setdefault("buffering", 1) + kwargs.setdefault("mode", "a") + kwargs.setdefault("encoding", "utf-8") + return open(path, **kwargs) + + +def create_and_init_new_worker_log(path, worker_pid): + """ + Opens a path (or creates if necessary) for a log. + + Args: + path (str): The name/path of the file to be opened. + + Returns: + A file-like object which can be written to. + """ + # TODO (Alex): We should eventually be able to replace this with + # named-pipes. + f = open_log(path) + # Check to see if we're creating this file. No one else should ever write + # to this file, so we don't have to worry about TOCTOU. + if f.tell() == 0: + # This should always be the first message to appear in the worker's + # stdout and stderr log files. The string "Ray worker pid:" is + # parsed in the log monitor process. + print("Ray worker pid: {}".format(worker_pid), file=f) + return f + + def get_system_memory(): """Return the total amount of system memory in bytes. diff --git a/python/ray/worker.py b/python/ray/worker.py index 999c1a36f..4b71b10fa 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -46,12 +46,8 @@ from ray.exceptions import ( ObjectStoreFullError, ) from ray.function_manager import FunctionActorManager -from ray.utils import ( - _random_string, - check_oversized_pickle, - is_cython, - setup_logger, -) +from ray.utils import (_random_string, check_oversized_pickle, is_cython, + setup_logger, create_and_init_new_worker_log, open_log) SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -895,6 +891,68 @@ last_task_error_raise_time = 0 UNCAUGHT_ERROR_GRACE_PERIOD = 5 +def _set_log_file(file_name, worker_pid, old_obj, setter_func): + # Line-buffer the output (mode 1). + f = create_and_init_new_worker_log(file_name, worker_pid) + + # TODO (Alex): Python seems to always flush when writing. If that is no + # longer true, then we need to manually flush the old buffer. + # old_obj.flush() + + # TODO (Alex): Flush the c/c++ userspace buffers if necessary. + # `fflush(stdout); cout.flush();` + + fileno = old_obj.fileno() + + # C++ logging requires redirecting the stdout file descriptor. Note that + # dup2 will automatically close the old file descriptor before overriding + # it. + os.dup2(f.fileno(), fileno) + + # We also manually set sys.stdout and sys.stderr because that seems to + # have an effect on the output buffering. Without doing this, stdout + # and stderr are heavily buffered resulting in seemingly lost logging + # statements. We never want to close the stdout file descriptor, dup2 will + # close it when necessary and we don't want python's GC to close it. + setter_func(open_log(fileno, closefd=False)) + + return os.path.abspath(f.name) + + +def set_log_file(stdout_name, stderr_name): + """Sets up logging for the current worker, creating the (fd backed) file and + flushing buffers as is necessary. + + Args: + stdout_name (str): The file name that stdout should be written to. + stderr_name(str): The file name that stderr should be written to. + + Returns: + (tuple) The absolute paths of the files that stdout and stderr will be + written to. + + """ + stdout_path = "" + stderr_path = "" + worker_pid = os.getpid() + + # lambda cannot contain assignment + def stdout_setter(x): + sys.stdout = x + + def stderr_setter(x): + sys.stderr = x + + if stdout_name: + _set_log_file(stdout_name, worker_pid, sys.stdout, stdout_setter) + + # The stderr case should be analogous to the stdout case + if stderr_name: + _set_log_file(stderr_name, worker_pid, sys.stderr, stderr_setter) + + return stdout_path, stderr_path + + def print_logs(redis_client, threads_stopped): """Prints log messages from workers on all of the nodes. @@ -1151,8 +1209,8 @@ def connect(node, worker.lock = threading.RLock() driver_name = "" - log_stdout_file_name = "" - log_stderr_file_name = "" + log_stdout_file_path = "" + log_stderr_file_path = "" if mode == SCRIPT_MODE: import __main__ as main driver_name = (main.__file__ @@ -1164,39 +1222,24 @@ def connect(node, redirect_worker_output_val = worker.redis_client.get("RedirectOutput") if (redirect_worker_output_val is not None and int(redirect_worker_output_val) == 1): - log_stdout_file, log_stderr_file = ( - node.new_worker_redirected_log_file(worker.worker_id)) - # Redirect stdout/stderr at the file descriptor level. If we simply - # set sys.stdout and sys.stderr, then logging from C++ can fail to - # be redirected. - if log_stdout_file is not None: - os.dup2(log_stdout_file.fileno(), sys.stdout.fileno()) - if log_stderr_file is not None: - os.dup2(log_stderr_file.fileno(), sys.stderr.fileno()) - # We also manually set sys.stdout and sys.stderr because that seems - # to have an affect on the output buffering. Without doing this, - # stdout and stderr are heavily buffered resulting in seemingly - # lost logging statements. - if log_stdout_file is not None: - sys.stdout = log_stdout_file - if log_stderr_file is not None: - sys.stderr = log_stderr_file - # This should always be the first message to appear in the worker's - # stdout and stderr log files. The string "Ray worker pid:" is - # parsed in the log monitor process. - print("Ray worker pid: {}".format(os.getpid())) - print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr) - sys.stdout.flush() - sys.stderr.flush() - log_stdout_file_name = os.path.abspath( - (log_stdout_file - if log_stdout_file is not None else sys.stdout).name) - log_stderr_file_name = os.path.abspath( - (log_stderr_file - if log_stderr_file is not None else sys.stderr).name) + log_stdout_file_name, log_stderr_file_name = ( + node.get_job_redirected_log_file(worker.worker_id)) + try: + log_stdout_file_path, log_stderr_file_path = \ + set_log_file(log_stdout_file_name, log_stderr_file_name) + except IOError: + raise IOError( + "Workers must be able to redirect their output at" + "the file descriptor level.") elif not LOCAL_MODE: raise ValueError( "Invalid worker mode. Expected DRIVER, WORKER or LOCAL.") + + # TODO (Alex): `current_logging_job` tracks the current job so that we know + # when to switch log files. If all logging functionaility was moved to c++, + # the functionaility in `_raylet.pyx::switch_worker_log_if_necessary` could + # be moved to `CoreWorker::SetCurrentTaskId()`. + worker.current_logging_job_id = None redis_address, redis_port = node.redis_address.split(":") gcs_options = ray._raylet.GcsClientOptions( redis_address, @@ -1216,8 +1259,8 @@ def connect(node, node.raylet_ip_address, (mode == LOCAL_MODE), driver_name, - log_stdout_file_name, - log_stderr_file_name, + log_stdout_file_path, + log_stderr_file_path, ) # Create an object for interfacing with the global state.