From 21472b890aa1fc03e67b853bf57c89ca1f80ec90 Mon Sep 17 00:00:00 2001 From: Si-Yuan Date: Wed, 13 Feb 2019 09:34:04 +0800 Subject: [PATCH] Integrate "tempfile_service" into "ray.node.Node" (#3953) --- doc/source/tempfile.rst | 5 +- python/ray/node.py | 222 +++++++++++++++++++------ python/ray/services.py | 71 ++++---- python/ray/tempfile_services.py | 240 --------------------------- python/ray/utils.py | 34 ++++ python/ray/worker.py | 22 ++- python/ray/workers/default_worker.py | 17 +- test/stress_tests.py | 1 - test/tempfile_test.py | 22 +-- 9 files changed, 288 insertions(+), 346 deletions(-) delete mode 100644 python/ray/tempfile_services.py diff --git a/doc/source/tempfile.rst b/doc/source/tempfile.rst index d68e835e0..0666729b4 100644 --- a/doc/source/tempfile.rst +++ b/doc/source/tempfile.rst @@ -80,7 +80,4 @@ The path you specified will be given as it is without being affected any other p Notes ----- -Temporary file policies are defined in ``python/ray/tempfile_services.py``. - -Currently, we keep ``/tmp/ray`` as the default directory for temporary data files of RLlib as before. -It is not very reasonable and could be changed later. +Temporary file policies are defined in ``python/ray/node.py``. \ No newline at end of file diff --git a/python/ray/node.py b/python/ray/node.py index e3581ec2b..878fc0f98 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -3,21 +3,21 @@ from __future__ import division from __future__ import print_function import atexit +import collections +import datetime +import errno import json import os import logging import signal +import tempfile import threading import time import ray import ray.ray_constants as ray_constants -from ray.tempfile_services import ( - get_logs_dir_path, get_object_store_socket_name, get_raylet_socket_name, - new_log_monitor_log_file, new_monitor_log_file, - new_raylet_monitor_log_file, new_plasma_store_log_file, - new_raylet_log_file, new_webui_log_file, set_temp_root, - try_to_create_directory) +import ray.services +from ray.utils import try_to_create_directory # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray configures it by default automatically @@ -36,7 +36,11 @@ class Node(object): server list, which has multiple. """ - def __init__(self, ray_params, head=False, shutdown_at_exit=True): + def __init__(self, + ray_params, + head=False, + shutdown_at_exit=True, + connect_only=False): """Start a node. Args: @@ -48,7 +52,11 @@ class Node(object): shutdown_at_exit (bool): If true, a handler will be registered to shutdown the processes started here when the Python interpreter exits. + connect_only (bool): If true, connect to the node without starting + new processes. """ + assert not (shutdown_at_exit and connect_only), ( + "'shutdown_at_exit' and 'connect_only' cannot be both true.") self.all_processes = {} ray_params.update_if_absent( @@ -60,29 +68,60 @@ class Node(object): os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) + self._ray_params = ray_params + + self._node_ip_address = ray_params.node_ip_address + self._redis_address = ray_params.redis_address + self._config = (json.loads(ray_params._internal_config) + if ray_params._internal_config else None) + if head: ray_params.update_if_absent(num_redis_shards=1, include_webui=True) + self._plasma_store_socket_name = None + self._raylet_socket_name = None + self._webui_url = None else: - redis_client = ray.services.create_redis_client( - ray_params.redis_address, ray_params.redis_password) + self._plasma_store_socket_name = ( + ray_params.plasma_store_socket_name) + self._raylet_socket_name = ray_params.raylet_socket_name + redis_client = self.create_redis_client() + # TODO(suquark): Replace _webui_url_helper in worker.py in + # another PR. + _webui_url = redis_client.hmget("webui", "url")[0] + self._webui_url = (ray.utils.decode(_webui_url) + if _webui_url is not None else None) ray_params.include_java = ( ray.services.include_java_from_redis(redis_client)) - self._ray_params = ray_params - self._config = (json.loads(ray_params._internal_config) - if ray_params._internal_config else None) - self._node_ip_address = ray_params.node_ip_address - self._redis_address = ray_params.redis_address - self._plasma_store_socket_name = None - self._raylet_socket_name = None - self._webui_url = None + self._init_temp() - self.start_ray_processes() + if not connect_only: + self.start_ray_processes() if shutdown_at_exit: atexit.register(lambda: self.kill_all_processes( check_alive=False, allow_graceful=True)) + def _init_temp(self): + # Create an dictionary to store temp file index. + self._incremental_dict = collections.defaultdict(lambda: 0) + + self._temp_dir = self._ray_params.temp_dir + if self._temp_dir is None: + date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S") + self._temp_dir = self._make_inc_temp( + prefix="session_{date_str}_{pid}".format( + pid=os.getpid(), date_str=date_str), + directory_name="/tmp/ray") + + try_to_create_directory(self._temp_dir) + # Create a directory to be used for socket files. + self._sockets_dir = os.path.join(self._temp_dir, "sockets") + try_to_create_directory(self._sockets_dir) + # Create a directory to be used for process log files. + self._logs_dir = os.path.join(self._temp_dir, "logs") + try_to_create_directory(self._logs_dir) + @property def node_ip_address(self): """Get the cluster Redis address.""" @@ -108,7 +147,87 @@ class Node(object): """Get the node's raylet socket name.""" return self._raylet_socket_name - def prepare_socket_file(self, socket_path): + def create_redis_client(self): + """Create a redis client.""" + return ray.services.create_redis_client( + self._redis_address, self._ray_params.redis_password) + + def get_temp_dir_path(self): + """Get the path of the temporary directory.""" + return self._temp_dir + + def get_logs_dir_path(self): + """Get the path of the log files directory.""" + return self._logs_dir + + def get_sockets_dir_path(self): + """Get the path of the sockets directory.""" + return self._sockets_dir + + def _make_inc_temp(self, suffix="", prefix="", directory_name="/tmp/ray"): + """Return a incremental temporary file name. The file is not created. + + Args: + suffix (str): The suffix of the temp file. + prefix (str): The prefix of the temp file. + directory_name (str) : The base directory of the temp file. + + Returns: + A string of file name. If there existing a file having + the same name, the returned name will look like + "{directory_name}/{prefix}.{unique_index}{suffix}" + """ + directory_name = os.path.expanduser(directory_name) + index = self._incremental_dict[suffix, prefix, directory_name] + # `tempfile.TMP_MAX` could be extremely large, + # so using `range` in Python2.x should be avoided. + while index < tempfile.TMP_MAX: + if index == 0: + filename = os.path.join(directory_name, prefix + suffix) + else: + filename = os.path.join(directory_name, + prefix + "." + str(index) + suffix) + index += 1 + if not os.path.exists(filename): + # Save the index. + self._incremental_dict[suffix, prefix, directory_name] = index + return filename + + raise FileExistsError(errno.EEXIST, + "No usable temporary filename found") + + def new_log_files(self, name, redirect_output=None): + """Generate partially randomized filenames for log files. + + Args: + name (str): descriptive string for this log file. + redirect_output (bool): True if files should be generated for + logging stdout and stderr and false if stdout and stderr + should not be redirected. + If it is None, it will use the "redirect_output" Ray parameter. + + Returns: + If redirect_output is true, this will return a tuple of two + file handles. The first is for redirecting stdout and the + second is for redirecting stderr. + If redirect_output is false, this will return a tuple + of two None objects. + """ + if redirect_output is None: + redirect_output = self._ray_params.redirect_output + if not redirect_output: + return None, None + + log_stdout = self._make_inc_temp( + suffix=".out", prefix=name, directory_name=self._logs_dir) + log_stderr = self._make_inc_temp( + suffix=".err", prefix=name, directory_name=self._logs_dir) + # Line-buffer the output (mode 1) + log_stdout_file = open(log_stdout, "a", buffering=1) + log_stderr_file = open(log_stderr, "a", buffering=1) + return log_stdout_file, log_stderr_file + + def _prepare_socket_file(self, socket_path, default_prefix): """Prepare the socket file for raylet and plasma. This method helps to prepare a socket file. @@ -118,24 +237,30 @@ class Node(object): Args: socket_path (string): the socket file to prepare. """ - if not os.path.exists(socket_path): - path = os.path.dirname(socket_path) - if not os.path.isdir(path): - try_to_create_directory(path) - else: - raise Exception("Socket file {} exists!".format(socket_path)) + if socket_path is not None: + if os.path.exists(socket_path): + raise Exception("Socket file {} exists!".format(socket_path)) + socket_dir = os.path.dirname(socket_path) + try_to_create_directory(socket_dir) + return socket_path + return self._make_inc_temp( + prefix=default_prefix, directory_name=self._sockets_dir) def start_redis(self): """Start the Redis servers.""" assert self._redis_address is None + redis_log_files = [self.new_log_files("redis")] + for i in range(self._ray_params.num_redis_shards): + redis_log_files.append(self.new_log_files("redis-shard_" + str(i))) + (self._redis_address, redis_shards, process_infos) = ray.services.start_redis( self._node_ip_address, + redis_log_files, port=self._ray_params.redis_port, redis_shard_ports=self._ray_params.redis_shard_ports, num_redis_shards=self._ray_params.num_redis_shards, redis_max_clients=self._ray_params.redis_max_clients, - redirect_output=self._ray_params.redirect_output, redirect_worker_output=self._ray_params.redirect_worker_output, password=self._ray_params.redis_password, redis_max_memory=self._ray_params.redis_max_memory) @@ -146,9 +271,10 @@ class Node(object): def start_log_monitor(self): """Start the log monitor.""" - stdout_file, stderr_file = new_log_monitor_log_file() + stdout_file, stderr_file = self.new_log_files("log_monitor", True) process_info = ray.services.start_log_monitor( self.redis_address, + self._logs_dir, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password) @@ -159,9 +285,12 @@ class Node(object): def start_ui(self): """Start the web UI.""" - stdout_file, stderr_file = new_webui_log_file() + stdout_file, stderr_file = self.new_log_files("webui", True) + notebook_name = self._make_inc_temp( + suffix=".ipynb", prefix="ray_ui", directory_name=self._temp_dir) self._webui_url, process_info = ray.services.start_ui( self._redis_address, + notebook_name, stdout_file=stdout_file, stderr_file=stderr_file) assert ray_constants.PROCESS_TYPE_WEB_UI not in self.all_processes @@ -174,15 +303,11 @@ class Node(object): """Start the plasma store.""" assert self._plasma_store_socket_name is None # If the user specified a socket name, use it. - self._plasma_store_socket_name = ( - self._ray_params.plasma_store_socket_name - or get_object_store_socket_name()) - self.prepare_socket_file(self._plasma_store_socket_name) - stdout_file, stderr_file = (new_plasma_store_log_file( - self._ray_params.redirect_output)) + self._plasma_store_socket_name = self._prepare_socket_file( + self._ray_params.plasma_store_socket_name, + default_prefix="plasma_store") + stdout_file, stderr_file = self.new_log_files("plasma_store") process_info = ray.services.start_plasma_store( - self._node_ip_address, - self._redis_address, stdout_file=stdout_file, stderr_file=stderr_file, object_store_memory=self._ray_params.object_store_memory, @@ -206,17 +331,16 @@ class Node(object): """ assert self._raylet_socket_name is None # If the user specified a socket name, use it. - self._raylet_socket_name = (self._ray_params.raylet_socket_name - or get_raylet_socket_name()) - self.prepare_socket_file(self._raylet_socket_name) - stdout_file, stderr_file = new_raylet_log_file( - redirect_output=self._ray_params.redirect_output) + self._raylet_socket_name = self._prepare_socket_file( + self._ray_params.raylet_socket_name, default_prefix="raylet") + stdout_file, stderr_file = self.new_log_files("raylet") process_info = ray.services.start_raylet( self._redis_address, self._node_ip_address, self._raylet_socket_name, self._plasma_store_socket_name, self._ray_params.worker_path, + self._temp_dir, self._ray_params.num_cpus, self._ray_params.num_gpus, self._ray_params.resources, @@ -234,17 +358,21 @@ class Node(object): assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] + def new_worker_redirected_log_file(self, worker_id): + """Create new logging files for workers to redirect its output.""" + worker_stdout_file, worker_stderr_file = (self.new_log_files( + "worker-" + ray.utils.binary_to_hex(worker_id), True)) + return worker_stdout_file, worker_stderr_file + def start_worker(self): """Start a worker process.""" raise NotImplementedError def start_monitor(self): """Start the monitor.""" - stdout_file, stderr_file = new_monitor_log_file( - self._ray_params.redirect_output) + stdout_file, stderr_file = self.new_log_files("monitor") process_info = ray.services.start_monitor( self._redis_address, - self._node_ip_address, stdout_file=stdout_file, stderr_file=stderr_file, autoscaling_config=self._ray_params.autoscaling_config, @@ -254,8 +382,7 @@ class Node(object): def start_raylet_monitor(self): """Start the raylet monitor.""" - stdout_file, stderr_file = new_raylet_monitor_log_file( - self._ray_params.redirect_output) + stdout_file, stderr_file = self.new_log_files("raylet_monitor") process_info = ray.services.start_raylet_monitor( self._redis_address, stdout_file=stdout_file, @@ -270,10 +397,9 @@ class Node(object): def start_ray_processes(self): """Start all of the processes on the node.""" - set_temp_root(self._ray_params.temp_dir) logger.info( "Process STDOUT and STDERR is being redirected to {}.".format( - get_logs_dir_path())) + self._logs_dir)) # If this is the head node, start the relevant head node processes. if self._redis_address is None: diff --git a/python/ray/services.py b/python/ray/services.py index 7487bd5b6..ea88daa86 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -10,6 +10,7 @@ import multiprocessing import os import random import resource +import shutil import socket import subprocess import sys @@ -21,14 +22,6 @@ import pyarrow import ray import ray.ray_constants as ray_constants -from ray.tempfile_services import ( - get_gdb_init_path, - get_ipython_notebook_path, - get_logs_dir_path, - get_temp_root, - new_redis_log_file, -) - # True if processes are run in the valgrind profiler. RUN_RAYLET_PROFILER = False RUN_PLASMA_STORE_PROFILER = False @@ -291,7 +284,10 @@ def start_ray_process(command, if not use_tmux: raise ValueError( "If 'use_gdb' is true, then 'use_tmux' must be true as well.") - gdb_init_path = get_gdb_init_path(process_type) + + # TODO(suquark): Any better temp file creation here? + gdb_init_path = "/tmp/ray/gdb_init_{}_{}".format( + process_type, time.time()) ray_process_path = command[0] ray_process_args = command[1:] run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args]) @@ -458,11 +454,11 @@ def check_version_info(redis_client): def start_redis(node_ip_address, + redirect_files, port=None, redis_shard_ports=None, num_redis_shards=1, redis_max_clients=None, - redirect_output=False, redirect_worker_output=False, password=None, use_credis=None, @@ -473,6 +469,7 @@ def start_redis(node_ip_address, Args: node_ip_address: The IP address of the current node. This is only used for recording the log filenames in Redis. + redirect_files: The list of (stdout, stderr) file pairs. port (int): If provided, the primary Redis shard will be started on this port. redis_shard_ports: A list of the ports to use for the non-primary Redis @@ -482,8 +479,6 @@ def start_redis(node_ip_address, shard. redis_max_clients: If this is provided, Ray will attempt to configure Redis with this maxclients number. - redirect_output (bool): True if output should be redirected to a file - and false otherwise. redirect_worker_output (bool): True if worker output should be redirected to a file and false otherwise. Workers will have access to this value when they start up. @@ -505,8 +500,10 @@ def start_redis(node_ip_address, addresses for the remaining shards, and the processes that were started. """ - redis_stdout_file, redis_stderr_file = new_redis_log_file(redirect_output) + assert len(redirect_files) == 1 + num_redis_shards, ( + "The number of redirect file pairs should be equal to the number of" + "redis shards (including the primary shard) we will start.") if redis_shard_ports is None: redis_shard_ports = num_redis_shards * [None] elif len(redis_shard_ports) != num_redis_shards: @@ -541,6 +538,7 @@ def start_redis(node_ip_address, redis_executable = REDIS_EXECUTABLE redis_modules = [REDIS_MODULE] + redis_stdout_file, redis_stderr_file = redirect_files[0] # Start the primary Redis shard. port, p = _start_redis_instance( redis_executable, @@ -587,9 +585,7 @@ def start_redis(node_ip_address, # prefixed by "redis-". redis_shards = [] for i in range(num_redis_shards): - redis_stdout_file, redis_stderr_file = new_redis_log_file( - redirect_output, shard_number=i) - + redis_stdout_file, redis_stderr_file = redirect_files[i + 1] if use_credis: redis_executable = CREDIS_EXECUTABLE # It is important to load the credis module BEFORE the ray module, @@ -795,6 +791,7 @@ def _start_redis_instance(executable, def start_log_monitor(redis_address, + logs_dir, stdout_file=None, stderr_file=None, redis_password=None): @@ -802,6 +799,7 @@ def start_log_monitor(redis_address, Args: redis_address (str): The address of the Redis instance. + logs_dir (str): The directory of logging files. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -815,8 +813,8 @@ def start_log_monitor(redis_address, os.path.dirname(os.path.abspath(__file__)), "log_monitor.py") command = [ sys.executable, "-u", log_monitor_filepath, - "--redis-address={}".format(redis_address), "--logs-dir={}".format( - get_logs_dir_path()) + "--redis-address={}".format(redis_address), + "--logs-dir={}".format(logs_dir) ] if redis_password: command += ["--redis-password", redis_password] @@ -828,11 +826,12 @@ def start_log_monitor(redis_address, return process_info -def start_ui(redis_address, stdout_file=None, stderr_file=None): +def start_ui(redis_address, notebook_name, stdout_file=None, stderr_file=None): """Start a UI process. Args: redis_address: The address of the primary Redis shard. + notebook_name: The destination of the notebook file. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -853,7 +852,12 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None): except socket.error: port += 1 - notebook_name = get_ipython_notebook_path() + notebook_filepath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb") + # We copy the notebook file so that the original doesn't get modified by + # the user. + shutil.copy(notebook_filepath, notebook_name) + new_notebook_directory = os.path.dirname(notebook_name) # We generate the token used for authentication ourselves to avoid # querying the jupyter server. @@ -955,6 +959,7 @@ def start_raylet(redis_address, raylet_name, plasma_store_name, worker_path, + temp_dir, num_cpus=None, num_gpus=None, resources=None, @@ -978,6 +983,7 @@ def start_raylet(redis_address, to. worker_path (str): The path of the Python file that new worker processes will execute. + temp_dir (str): The path of the temporary directory Ray will use. num_cpus: The CPUs allocated for this raylet. num_gpus: The GPUs allocated for this raylet. resources: The custom resources allocated for this raylet. @@ -1034,6 +1040,7 @@ def start_raylet(redis_address, plasma_store_name, raylet_name, redis_password, + os.path.join(temp_dir, "sockets"), ) else: java_worker_command = "" @@ -1047,7 +1054,7 @@ def start_raylet(redis_address, "--temp-dir={}".format( sys.executable, worker_path, node_ip_address, plasma_store_name, raylet_name, redis_address, - get_temp_root())) + temp_dir)) if redis_password: start_worker_command += " --redis-password {}".format(redis_password) @@ -1076,7 +1083,7 @@ def start_raylet(redis_address, start_worker_command, java_worker_command, redis_password or "", - get_temp_root(), + temp_dir, ] process_info = start_ray_process( command, @@ -1097,6 +1104,7 @@ def build_java_worker_command( plasma_store_name, raylet_name, redis_password, + temp_dir, ): """This method assembles the command used to start a Java worker. @@ -1107,6 +1115,7 @@ def build_java_worker_command( to. raylet_name (str): The name of the raylet socket to create. redis_password (str): The password of connect to redis. + temp_dir (str): The path of the temporary directory Ray will use. Returns: The command string for starting Java worker. """ @@ -1127,7 +1136,8 @@ def build_java_worker_command( command += ("-Dray.redis-password=%s", redis_password) command += "-Dray.home={} ".format(RAY_HOME) - command += "-Dray.log-dir={} ".format(get_logs_dir_path()) + # TODO(suquark): We should use temp_dir as the input of a java worker. + command += "-Dray.log-dir={} ".format(os.path.join(temp_dir, "sockets")) command += "org.ray.runtime.runner.worker.DefaultWorker" return command @@ -1279,9 +1289,7 @@ def _start_plasma_store(plasma_store_memory, return process_info -def start_plasma_store(node_ip_address, - redis_address, - stdout_file=None, +def start_plasma_store(stdout_file=None, stderr_file=None, object_store_memory=None, plasma_directory=None, @@ -1290,9 +1298,6 @@ def start_plasma_store(node_ip_address, """This method starts an object store process. Args: - node_ip_address (str): The IP address of the node running the object - store. - redis_address (str): The address of the Redis instance to connect to. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr @@ -1338,6 +1343,7 @@ def start_worker(node_ip_address, raylet_name, redis_address, worker_path, + temp_dir, stdout_file=None, stderr_file=None): """This method starts a worker process. @@ -1350,6 +1356,7 @@ def start_worker(node_ip_address, redis_address (str): The address that the Redis server is listening on. worker_path (str): The path of the source code which the worker process will run. + temp_dir (str): The path of the temp dir. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -1363,8 +1370,7 @@ def start_worker(node_ip_address, "--node-ip-address=" + node_ip_address, "--object-store-name=" + object_store_name, "--raylet-name=" + raylet_name, - "--redis-address=" + str(redis_address), - "--temp-dir=" + get_temp_root() + "--redis-address=" + str(redis_address), "--temp-dir=" + temp_dir ] process_info = start_ray_process( command, @@ -1375,7 +1381,6 @@ def start_worker(node_ip_address, def start_monitor(redis_address, - node_ip_address, stdout_file=None, stderr_file=None, autoscaling_config=None, @@ -1384,8 +1389,6 @@ def start_monitor(redis_address, Args: redis_address (str): The address that the Redis server is listening on. - node_ip_address: The IP address of the node that this process will run - on. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If diff --git a/python/ray/tempfile_services.py b/python/ray/tempfile_services.py deleted file mode 100644 index 66016a4f1..000000000 --- a/python/ray/tempfile_services.py +++ /dev/null @@ -1,240 +0,0 @@ -import collections -import datetime -import errno -import logging -import os -import shutil -import tempfile - -import ray.utils - -logger = logging.getLogger(__name__) -_incremental_dict = collections.defaultdict(lambda: 0) -_temp_root = None - - -def make_inc_temp(suffix="", prefix="", directory_name="/tmp/ray"): - """Return a incremental temporary file name. The file is not created. - - Args: - suffix (str): The suffix of the temp file. - prefix (str): The prefix of the temp file. - directory_name (str) : The base directory of the temp file. - - Returns: - A string of file name. If there existing a file having the same name, - the returned name will look like - "{directory_name}/{prefix}.{unique_index}{suffix}" - """ - directory_name = os.path.expanduser(directory_name) - index = _incremental_dict[suffix, prefix, directory_name] - # `tempfile.TMP_MAX` could be extremely large, - # so using `range` in Python2.x should be avoided. - while index < tempfile.TMP_MAX: - if index == 0: - filename = os.path.join(directory_name, prefix + suffix) - else: - filename = os.path.join(directory_name, - prefix + "." + str(index) + suffix) - index += 1 - if not os.path.exists(filename): - _incremental_dict[suffix, prefix, - directory_name] = index # Save the index. - return filename - - raise FileExistsError(errno.EEXIST, "No usable temporary filename found") - - -def try_to_create_directory(directory_path): - """Attempt to create a directory that is globally readable/writable. - - Args: - directory_path: The path of the directory to create. - """ - directory_path = os.path.expanduser(directory_path) - if not os.path.exists(directory_path): - try: - os.makedirs(directory_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise e - logger.warning( - "Attempted to create '{}', but the directory already " - "exists.".format(directory_path)) - # Change the log directory permissions so others can use it. This is - # important when multiple people are using the same machine. - try: - os.chmod(directory_path, 0o0777) - except OSError as e: - # Silently suppress the PermissionError that is thrown by the chmod. - # This is done because the user attempting to change the permissions - # on a directory may not own it. The chmod is attempted whether the - # directory is new or not to avoid race conditions. - # ray-project/ray/#3591 - if e.errno in [errno.EACCES, errno.EPERM]: - pass - else: - raise - - -def get_temp_root(): - """Get the path of the temporary root. If not existing, it will be created. - """ - global _temp_root - - date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - - # Lazy creation. Avoid creating directories never used. - if _temp_root is None: - _temp_root = make_inc_temp( - prefix="session_{date_str}_{pid}".format( - pid=os.getpid(), date_str=date_str), - directory_name="/tmp/ray") - try_to_create_directory(_temp_root) - return _temp_root - - -def set_temp_root(path): - """Set the path of the temporary root. It will be created lazily.""" - global _temp_root - _temp_root = path - - -def get_logs_dir_path(): - """Get a temp dir for logging.""" - logs_dir = os.path.join(get_temp_root(), "logs") - try_to_create_directory(logs_dir) - return logs_dir - - -def get_sockets_dir_path(): - """Get a temp dir for sockets.""" - sockets_dir = os.path.join(get_temp_root(), "sockets") - try_to_create_directory(sockets_dir) - return sockets_dir - - -def get_raylet_socket_name(suffix=""): - """Get a socket name for raylet.""" - sockets_dir = get_sockets_dir_path() - - raylet_socket_name = make_inc_temp( - prefix="raylet", directory_name=sockets_dir, suffix=suffix) - return raylet_socket_name - - -def get_object_store_socket_name(): - """Get a socket name for plasma object store.""" - sockets_dir = get_sockets_dir_path() - return make_inc_temp(prefix="plasma_store", directory_name=sockets_dir) - - -def get_ipython_notebook_path(): - """Get a new ipython notebook path""" - - notebook_filepath = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb") - # We copy the notebook file so that the original doesn't get modified by - # the user. - notebook_name = make_inc_temp( - suffix=".ipynb", prefix="ray_ui", directory_name=get_temp_root()) - shutil.copy(notebook_filepath, notebook_name) - return notebook_name - - -def get_gdb_init_path(process_type): - return make_inc_temp( - prefix="gdb_init_{}".format(process_type), - directory_name=get_temp_root()) - - -def new_log_files(name, redirect_output): - """Generate partially randomized filenames for log files. - - Args: - name (str): descriptive string for this log file. - redirect_output (bool): True if files should be generated for logging - stdout and stderr and false if stdout and stderr should not be - redirected. - - Returns: - If redirect_output is true, this will return a tuple of two - filehandles. The first is for redirecting stdout and the second is - for redirecting stderr. If redirect_output is false, this will - return a tuple of two None objects. - """ - if not redirect_output: - return None, None - - # Create a directory to be used for process log files. - logs_dir = get_logs_dir_path() - # Create another directory that will be used by some of the RL algorithms. - - log_stdout = make_inc_temp( - suffix=".out", prefix=name, directory_name=logs_dir) - log_stderr = make_inc_temp( - suffix=".err", prefix=name, directory_name=logs_dir) - # Line-buffer the output (mode 1) - log_stdout_file = open(log_stdout, "a", buffering=1) - log_stderr_file = open(log_stderr, "a", buffering=1) - return log_stdout_file, log_stderr_file - - -def new_redis_log_file(redirect_output, shard_number=None): - """Create new logging files for redis""" - if shard_number is None: - redis_stdout_file, redis_stderr_file = new_log_files( - "redis", redirect_output) - else: - redis_stdout_file, redis_stderr_file = new_log_files( - "redis-shard_{}".format(shard_number), redirect_output) - return redis_stdout_file, redis_stderr_file - - -def new_raylet_log_file(redirect_output): - """Create new logging files for raylet.""" - raylet_stdout_file, raylet_stderr_file = new_log_files( - "raylet", redirect_output=redirect_output) - return raylet_stdout_file, raylet_stderr_file - - -def new_webui_log_file(): - """Create new logging files for web ui.""" - ui_stdout_file, ui_stderr_file = new_log_files( - "webui", redirect_output=True) - return ui_stdout_file, ui_stderr_file - - -def new_worker_redirected_log_file(worker_id): - """Create new logging files for workers to redirect its output.""" - worker_stdout_file, worker_stderr_file = (new_log_files( - "worker-" + ray.utils.binary_to_hex(worker_id), True)) - return worker_stdout_file, worker_stderr_file - - -def new_log_monitor_log_file(): - """Create new logging files for the log monitor.""" - log_monitor_stdout_file, log_monitor_stderr_file = new_log_files( - "log_monitor", redirect_output=True) - return log_monitor_stdout_file, log_monitor_stderr_file - - -def new_plasma_store_log_file(redirect_output): - """Create new logging files for the plasma store.""" - plasma_store_stdout_file, plasma_store_stderr_file = new_log_files( - "plasma_store", redirect_output) - return plasma_store_stdout_file, plasma_store_stderr_file - - -def new_monitor_log_file(redirect_output): - """Create new logging files for the monitor.""" - monitor_stdout_file, monitor_stderr_file = new_log_files( - "monitor", redirect_output) - return monitor_stdout_file, monitor_stderr_file - - -def new_raylet_monitor_log_file(redirect_output): - """Create new logging files for the raylet monitor.""" - raylet_monitor_stdout_file, raylet_monitor_stderr_file = new_log_files( - "raylet_monitor", redirect_output) - return raylet_monitor_stdout_file, raylet_monitor_stderr_file diff --git a/python/ray/utils.py b/python/ray/utils.py index a7da864e3..d80e10f79 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import binascii +import errno import functools import hashlib import inspect @@ -465,3 +466,36 @@ def thread_safe_client(client, lock=None): def is_main_thread(): return threading.current_thread().getName() == "MainThread" + + +def try_to_create_directory(directory_path): + """Attempt to create a directory that is globally readable/writable. + + Args: + directory_path: The path of the directory to create. + """ + logger = logging.getLogger("ray") + directory_path = os.path.expanduser(directory_path) + if not os.path.exists(directory_path): + try: + os.makedirs(directory_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + logger.warning( + "Attempted to create '{}', but the directory already " + "exists.".format(directory_path)) + # Change the log directory permissions so others can use it. This is + # important when multiple people are using the same machine. + try: + os.chmod(directory_path, 0o0777) + except OSError as e: + # Silently suppress the PermissionError that is thrown by the chmod. + # This is done because the user attempting to change the permissions + # on a directory may not own it. The chmod is attempted whether the + # directory is new or not to avoid race conditions. + # ray-project/ray/#3591 + if e.errno in [errno.EACCES, errno.EPERM]: + pass + else: + raise diff --git a/python/ray/worker.py b/python/ray/worker.py index aef0b7ae9..5c936bfa2 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -32,7 +32,6 @@ import ray.remote_function import ray.serialization as serialization import ray.services as services import ray.signature -import ray.tempfile_services as tempfile_services import ray.ray_constants as ray_constants from ray import import_thread from ray import ObjectID, DriverID, ActorID, ActorHandleID, ClientID, TaskID @@ -1387,10 +1386,13 @@ def init(redis_address=None, "redis_address": redis_address } + global _global_node if driver_mode == LOCAL_MODE: # If starting Ray in LOCAL_MODE, don't start any other processes. pass elif redis_address is None: + # TODO(suquark): We should remove the code below because they + # have been set when initializing the node. if node_ip_address is None: node_ip_address = ray.services.get_node_ip_address() if num_redis_shards is None: @@ -1424,7 +1426,6 @@ def init(redis_address=None, # Start the Ray processes. We set shutdown_at_exit=False because we # shutdown the node in the ray.shutdown call that happens in the atexit # handler. - global _global_node _global_node = ray.node.Node( head=True, shutdown_at_exit=False, ray_params=ray_params) address_info["redis_address"] = _global_node.redis_address @@ -1481,6 +1482,18 @@ def init(redis_address=None, # Get the address info of the processes to connect to from Redis. address_info = get_address_info_from_redis( redis_address, node_ip_address, redis_password=redis_password) + # TODO(suquark): Use "node" as the input of "connect()". + # In this case, we only need to connect the node. + ray_params = ray.parameter.RayParams( + node_ip_address=node_ip_address, + redis_address=redis_address, + redis_password=redis_password, + plasma_store_socket_name=address_info["object_store_address"], + raylet_socket_name=address_info["raylet_socket_name"], + object_id_seed=object_id_seed, + temp_dir=temp_dir) + _global_node = ray.node.Node( + ray_params, head=False, shutdown_at_exit=False, connect_only=True) if driver_mode == LOCAL_MODE: driver_address_info = {} @@ -1493,8 +1506,6 @@ def init(redis_address=None, "raylet_socket_name": address_info["raylet_socket_name"], } - # We only pass `temp_dir` to a worker (WORKER_MODE). - # It can't be a worker here. connect( driver_address_info, redis_password=redis_password, @@ -1851,8 +1862,7 @@ def connect(info, redirect_worker_output = 0 if redirect_worker_output: log_stdout_file, log_stderr_file = ( - tempfile_services.new_worker_redirected_log_file( - worker.worker_id)) + _global_node.new_worker_redirected_log_file(worker.worker_id)) # Redirect stdout/stderr at the file descriptor level. If we simply # set sys.stdout and sys.stderr, then logging from C++ can fail to # be redirected. diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index ffb1d29f0..c73167630 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -7,9 +7,10 @@ import traceback import ray import ray.actor +import ray.node import ray.ray_constants as ray_constants -import ray.tempfile_services as tempfile_services import ray.utils +from ray.parameter import RayParams parser = argparse.ArgumentParser( description=("Parse addresses for the worker " @@ -70,9 +71,19 @@ if __name__ == "__main__": ray.utils.setup_logger(args.logging_level, args.logging_format) - # Override the temporary directory. - tempfile_services.set_temp_root(args.temp_dir) + ray_params = RayParams( + node_ip_address=args.node_ip_address, + redis_address=args.redis_address, + redis_password=args.redis_password, + plasma_store_socket_name=args.object_store_name, + raylet_socket_name=args.raylet_name, + temp_dir=args.temp_dir) + node = ray.node.Node( + ray_params, head=False, shutdown_at_exit=False, connect_only=True) + ray.worker._global_node = node + + # TODO(suquark): Use "node" as the input of "connect". ray.worker.connect( info, redis_password=args.redis_password, mode=ray.WORKER_MODE) diff --git a/test/stress_tests.py b/test/stress_tests.py index 311fbd48e..117002a2f 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -9,7 +9,6 @@ import pytest import time import ray -import ray.tempfile_services from ray.test.cluster_utils import Cluster import ray.ray_constants as ray_constants diff --git a/test/tempfile_test.py b/test/tempfile_test.py index 465738c5d..7bd167006 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -3,7 +3,6 @@ import shutil import time import pytest import ray -import ray.tempfile_services as tempfile_services def test_conn_cluster(): @@ -66,9 +65,10 @@ def test_temp_plasma_store_socket(): def test_raylet_tempfiles(): ray.init(redirect_worker_output=False) - top_levels = set(os.listdir(tempfile_services.get_temp_root())) + node = ray.worker._global_node + top_levels = set(os.listdir(node.get_temp_dir_path())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} - log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + log_files = set(os.listdir(node.get_logs_dir_path())) assert log_files == { "log_monitor.out", "log_monitor.err", "plasma_store.out", "plasma_store.err", "webui.out", "webui.err", "monitor.out", @@ -76,14 +76,15 @@ def test_raylet_tempfiles(): "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", "raylet.out", "raylet.err" } # with raylet logs - socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown() ray.init(redirect_worker_output=True, num_cpus=0) - top_levels = set(os.listdir(tempfile_services.get_temp_root())) + node = ray.worker._global_node + top_levels = set(os.listdir(node.get_temp_dir_path())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} - log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + log_files = set(os.listdir(node.get_logs_dir_path())) assert log_files == { "log_monitor.out", "log_monitor.err", "plasma_store.out", "plasma_store.err", "webui.out", "webui.err", "monitor.out", @@ -91,15 +92,16 @@ def test_raylet_tempfiles(): "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", "raylet.out", "raylet.err" } # with raylet logs - socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown() ray.init(redirect_worker_output=True, num_cpus=2) - top_levels = set(os.listdir(tempfile_services.get_temp_root())) + node = ray.worker._global_node + top_levels = set(os.listdir(node.get_temp_dir_path())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} time.sleep(3) # wait workers to start - log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + log_files = set(os.listdir(node.get_logs_dir_path())) assert log_files.issuperset({ "log_monitor.out", "log_monitor.err", "plasma_store.out", "plasma_store.err", "webui.out", "webui.err", "monitor.out", @@ -112,6 +114,6 @@ def test_raylet_tempfiles(): assert sum( 1 for filename in log_files if filename.startswith("worker")) == 4 - socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown()