diff --git a/doc/source/tempfile.rst b/doc/source/tempfile.rst index 11b3a772c..71ed0c1b2 100644 --- a/doc/source/tempfile.rst +++ b/doc/source/tempfile.rst @@ -4,19 +4,29 @@ Temporary Files Ray will produce some temporary files during running. They are useful for logging, debugging & sharing object store with other programs. +Ray session +----------- + +First we introduce the concept of a Ray session. + +A Ray session represents all tasks, processes, and resources managed by Ray. A +session is created by executing the ``ray start`` command or by calling +``ray.init()``, and it is terminated by executing ``ray stop`` or calling +``ray.shutdown()``. + +Each Ray session will have a unique name. By default, the name is +``session_{timestamp}_{pid}``. The format of ``timestamp`` is +``%Y-%m-%d_%H-%M-%S_%f`` (See `Python time format `__ for details); +the pid belongs to the startup process (the process calling ``ray.init()`` or +the Ray process executed by a shell in ``ray start``). + Location of Temporary Files --------------------------- -First we introduce the concept of a session of Ray. - -A session contains a set of processes. A session is created by executing -``ray start`` command or call ``ray.init()`` in a Python script and ended by -executing ``ray stop`` or call ``ray.shutdown()``. - -For each session, Ray will create a *root temporary directory* to place all its -temporary files. The path is ``/tmp/ray/session_{datetime}_{pid}`` by default. -The pid belongs to the startup process (the process calling ``ray.init()`` or -the Ray process executed by a shell in ``ray start``). +For each session, Ray will place all its temporary files under the +*session directory*. A *session directory* is a subdirectory of the +*root temporary path* (``/tmp/ray`` by default), +so the default session directory is ``/tmp/ray/{ray_session_name}``. You can sort by their names to find the latest session. You are allowed to change the *root temporary directory* in one of these ways: @@ -25,10 +35,7 @@ You are allowed to change the *root temporary directory* in one of these ways: * Specify ``temp_dir`` when call ``ray.init()`` You can also use ``default_worker.py --temp-dir={your temp path}`` to -start a new worker with given *root temporary directory*. - -The *root temporary directory* you specified will be given as it is, -without pids or datetime attached. +start a new worker with the given *root temporary directory*. Layout of Temporary Files ------------------------- @@ -45,11 +52,11 @@ A typical layout of temporary files could look like this: │   ├── log_monitor.out │   ├── monitor.err │   ├── monitor.out - │   ├── plasma_store_0.err # array of plasma stores' outputs - │   ├── plasma_store_0.out - │   ├── raylet_0.err - │   ├── raylet_0.out - │   ├── redis-shard_0.err # array of redis shards' outputs + │   ├── plasma_store.err # outputs of the plasma store + │   ├── plasma_store.out + │   ├── raylet.err # outputs of the raylet process + │   ├── raylet.out + │   ├── redis-shard_0.err # outputs of redis shards │   ├── redis-shard_0.out │   ├── redis.err # redis │   ├── redis.out @@ -58,7 +65,6 @@ A typical layout of temporary files could look like this: │   ├── worker-{worker_id}.err # redirected output of workers │   ├── worker-{worker_id}.out │   └── {other workers} - ├── ray_ui.ipynb # ipython notebook file └── sockets # for sockets ├── plasma_store └── raylet # this could be deleted by Ray's shutdown cleanup. diff --git a/python/ray/node.py b/python/ray/node.py index 991131d8b..733f21c9d 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -62,6 +62,7 @@ class Node(object): if shutdown_at_exit and connect_only: raise ValueError("'shutdown_at_exit' and 'connect_only' cannot " "be both true.") + self.head = head self.all_processes = {} # Try to get node IP address with the parameters. @@ -78,6 +79,7 @@ class Node(object): include_log_monitor=True, resources={}, include_webui=False, + temp_dir="/tmp/ray", worker_path=os.path.join( os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) @@ -87,7 +89,19 @@ class Node(object): self._config = (json.loads(ray_params._internal_config) if ray_params._internal_config else None) - self._init_temp() + if head: + redis_client = None + # date including microsecond + date_str = datetime.datetime.today().strftime( + "%Y-%m-%d_%H-%M-%S_%f") + self.session_name = "session_{date_str}_{pid}".format( + pid=os.getpid(), date_str=date_str) + else: + redis_client = self.create_redis_client() + self.session_name = ray.utils.decode( + redis_client.get("session_name")) + + self._init_temp(redis_client) if connect_only: # Get socket names from the configuration. @@ -119,7 +133,6 @@ class Node(object): ray_params.update_if_absent(num_redis_shards=1, include_webui=True) self._webui_url = None else: - redis_client = self.create_redis_client() self._webui_url = ( ray.services.get_webui_url_from_redis(redis_client)) ray_params.include_java = ( @@ -128,6 +141,10 @@ class Node(object): # Start processes. if head: self.start_head_processes() + redis_client = self.create_redis_client() + redis_client.set("session_name", self.session_name) + redis_client.set("session_dir", self._session_dir) + redis_client.set("temp_dir", self._temp_dir) if not connect_only: self.start_ray_processes() @@ -136,25 +153,31 @@ class Node(object): atexit.register(lambda: self.kill_all_processes( check_alive=False, allow_graceful=True)) - def _init_temp(self): + def _init_temp(self, redis_client): # Create an dictionary to store temp file index. self._incremental_dict = collections.defaultdict(lambda: 0) - self._temp_dir = self._ray_params.temp_dir - if self._temp_dir is None: - date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - self._temp_dir = self._make_inc_temp( - prefix="session_{date_str}_{pid}".format( - pid=os.getpid(), date_str=date_str), - directory_name="/tmp/ray") + if self.head: + self._temp_dir = self._ray_params.temp_dir + else: + self._temp_dir = ray.utils.decode(redis_client.get("temp_dir")) - try_to_create_directory(self._temp_dir) + try_to_create_directory(self._temp_dir, warn_if_exist=False) + + if self.head: + self._session_dir = os.path.join(self._temp_dir, self.session_name) + else: + self._session_dir = ray.utils.decode( + redis_client.get("session_dir")) + + # Send a warning message if the session exists. + try_to_create_directory(self._session_dir) # Create a directory to be used for socket files. - self._sockets_dir = os.path.join(self._temp_dir, "sockets") - try_to_create_directory(self._sockets_dir) + self._sockets_dir = os.path.join(self._session_dir, "sockets") + try_to_create_directory(self._sockets_dir, warn_if_exist=False) # Create a directory to be used for process log files. - self._logs_dir = os.path.join(self._temp_dir, "logs") - try_to_create_directory(self._logs_dir) + self._logs_dir = os.path.join(self._session_dir, "logs") + try_to_create_directory(self._logs_dir, warn_if_exist=False) @property def node_ip_address(self): @@ -204,6 +227,7 @@ class Node(object): "object_store_address": self._plasma_store_socket_name, "raylet_socket_name": self._raylet_socket_name, "webui_url": self._webui_url, + "session_dir": self._session_dir, } def create_redis_client(self): @@ -215,6 +239,10 @@ class Node(object): """Get the path of the temporary directory.""" return self._temp_dir + def get_session_dir_path(self): + """Get the path of the session directory.""" + return self._session_dir + def get_logs_dir_path(self): """Get the path of the log files directory.""" return self._logs_dir diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 191c9287e..2ba51c167 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -9,12 +9,6 @@ import pytest import ray from ray.tests.cluster_utils import Cluster -# Py2 compatibility -try: - FileNotFoundError -except NameError: - FileNotFoundError = OSError - def test_conn_cluster(): # plasma_store_socket_name @@ -45,13 +39,25 @@ def test_conn_cluster(): def test_tempdir(): + shutil.rmtree("/tmp/ray", ignore_errors=True) ray.init(temp_dir="/tmp/i_am_a_temp_dir") assert os.path.exists( "/tmp/i_am_a_temp_dir"), "Specified temp dir not found." + assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist." ray.shutdown() shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True) +def test_tempdir_commandline(): + shutil.rmtree("/tmp/ray", ignore_errors=True) + os.system("ray start --head --temp-dir=/tmp/i_am_a_temp_dir2") + assert os.path.exists( + "/tmp/i_am_a_temp_dir2"), "Specified temp dir not found." + assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist." + os.system("ray stop") + shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True) + + def test_raylet_socket_name(): ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket") assert os.path.exists( @@ -59,7 +65,7 @@ def test_raylet_socket_name(): ray.shutdown() try: os.remove("/tmp/i_am_a_temp_socket") - except FileNotFoundError: + except OSError: pass # It could have been removed by Ray. cluster = Cluster(True) cluster.add_node(raylet_socket_name="/tmp/i_am_a_temp_socket_2") @@ -68,7 +74,7 @@ def test_raylet_socket_name(): cluster.shutdown() try: os.remove("/tmp/i_am_a_temp_socket_2") - except FileNotFoundError: + except OSError: pass # It could have been removed by Ray. @@ -79,7 +85,7 @@ def test_temp_plasma_store_socket(): ray.shutdown() try: os.remove("/tmp/i_am_a_temp_socket") - except FileNotFoundError: + except OSError: pass # It could have been removed by Ray. cluster = Cluster(True) cluster.add_node(plasma_store_socket_name="/tmp/i_am_a_temp_socket_2") @@ -88,14 +94,14 @@ def test_temp_plasma_store_socket(): cluster.shutdown() try: os.remove("/tmp/i_am_a_temp_socket_2") - except FileNotFoundError: + except OSError: pass # It could have been removed by Ray. def test_raylet_tempfiles(): ray.init(num_cpus=0) node = ray.worker._global_node - top_levels = set(os.listdir(node.get_temp_dir_path())) + top_levels = set(os.listdir(node.get_session_dir_path())) assert top_levels.issuperset({"sockets", "logs"}) log_files = set(os.listdir(node.get_logs_dir_path())) assert log_files.issuperset({ @@ -110,7 +116,7 @@ def test_raylet_tempfiles(): ray.init(num_cpus=2) node = ray.worker._global_node - top_levels = set(os.listdir(node.get_temp_dir_path())) + top_levels = set(os.listdir(node.get_session_dir_path())) assert top_levels.issuperset({"sockets", "logs"}) time.sleep(3) # wait workers to start log_files = set(os.listdir(node.get_logs_dir_path())) @@ -128,3 +134,20 @@ def test_raylet_tempfiles(): socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown() + + +def test_tempdir_privilege(): + os.chmod("/tmp/ray", 0o000) + ray.init(num_cpus=1) + session_dir = ray.worker._global_node.get_session_dir_path() + assert os.path.exists(session_dir), "Specified socket path not found." + ray.shutdown() + + +def test_session_dir_uniqueness(): + session_dirs = set() + for _ in range(3): + ray.init(num_cpus=1) + session_dirs.add(ray.worker._global_node.get_session_dir_path) + ray.shutdown() + assert len(session_dirs) == 3 diff --git a/python/ray/utils.py b/python/ray/utils.py index 76d36447f..0f26aa22d 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -500,25 +500,7 @@ def is_main_thread(): return threading.current_thread().getName() == "MainThread" -def try_to_create_directory(directory_path): - """Attempt to create a directory that is globally readable/writable. - - Args: - directory_path: The path of the directory to create. - """ - logger = logging.getLogger("ray") - directory_path = os.path.expanduser(directory_path) - if not os.path.exists(directory_path): - try: - os.makedirs(directory_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise e - logger.warning( - "Attempted to create '{}', but the directory already " - "exists.".format(directory_path)) - # Change the log directory permissions so others can use it. This is - # important when multiple people are using the same machine. +def try_make_directory_shared(directory_path): try: os.chmod(directory_path, 0o0777) except OSError as e: @@ -531,3 +513,28 @@ def try_to_create_directory(directory_path): pass else: raise + + +def try_to_create_directory(directory_path, warn_if_exist=True): + """Attempt to create a directory that is globally readable/writable. + + Args: + directory_path: The path of the directory to create. + warn_if_exist (bool): Warn if the directory already exists. + """ + logger = logging.getLogger("ray") + directory_path = os.path.expanduser(directory_path) + if not os.path.exists(directory_path): + try: + os.makedirs(directory_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + if warn_if_exist: + logger.warning( + "Attempted to create '{}', but the directory already " + "exists.".format(directory_path)) + + # Change the log directory permissions so others can use it. This is + # important when multiple people are using the same machine. + try_make_directory_shared(directory_path) diff --git a/python/ray/worker.py b/python/ray/worker.py index c3a9f53ce..bbcf1bb22 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1694,8 +1694,7 @@ def connect(node, mode=WORKER_MODE, log_to_driver=False, worker=global_worker, - driver_id=None, - load_code_from_local=False): + driver_id=None): """Connect this worker to the raylet, to Plasma, and to Redis. Args: