From cc7e2ecdd5499bfd5ba93205f17c2a0783578533 Mon Sep 17 00:00:00 2001 From: Si-Yuan Date: Wed, 3 Oct 2018 10:03:53 -0700 Subject: [PATCH] Change logfile names and also allow plasma store socket to be passed in. (#2862) --- .travis.yml | 6 + doc/source/index.rst | 1 + doc/source/tempfile.rst | 87 ++++++ .../local_scheduler_services.py | 13 +- python/ray/plasma/plasma.py | 16 +- python/ray/ray_constants.py | 4 +- python/ray/scripts/scripts.py | 25 +- python/ray/services.py | 204 ++++++------ python/ray/tempfile_services.py | 292 ++++++++++++++++++ python/ray/worker.py | 50 ++- python/ray/workers/default_worker.py | 10 + test/stress_tests.py | 9 +- test/tempfile_test.py | 119 +++++++ 13 files changed, 696 insertions(+), 140 deletions(-) create mode 100644 doc/source/tempfile.rst create mode 100644 python/ray/tempfile_services.py create mode 100644 test/tempfile_test.py diff --git a/.travis.yml b/.travis.yml index 47bef360e..35743b764 100644 --- a/.travis.yml +++ b/.travis.yml @@ -165,6 +165,9 @@ matrix: - python -m pytest -v python/ray/rllib/test/test_optimizers.py - python -m pytest -v python/ray/rllib/test/test_evaluators.py + # ray temp file tests + - python -m pytest -v test/tempfile_test.py + install: - ./.travis/install-dependencies.sh @@ -237,6 +240,9 @@ script: - python -m pytest -v python/ray/rllib/test/test_optimizers.py - python -m pytest -v python/ray/rllib/test/test_evaluators.py + # ray temp file tests + - python -m pytest -v test/tempfile_test.py + deploy: - provider: s3 access_key_id: AKIAJ2L7XDUSZVTXI5QA diff --git a/doc/source/index.rst b/doc/source/index.rst index d951066e8..d8870bbaa 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -118,6 +118,7 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin plasma-object-store.rst resources.rst redis-memory-management.rst + tempfile.rst .. toctree:: :maxdepth: 1 diff --git a/doc/source/tempfile.rst b/doc/source/tempfile.rst new file mode 100644 index 000000000..7e489348c --- /dev/null +++ b/doc/source/tempfile.rst @@ -0,0 +1,87 @@ +Temporary Files +=============== + +Ray will produce some temporary files during running. +They are useful for logging, debugging & sharing object store with other programs. + +Location of Temporary Files +--------------------------- + +First we introduce the concept of a session of Ray. + +A session contains a set of processes. A session is created by executing +``ray start`` command or call ``ray.init()`` in a Python script and ended by +executing ``ray stop`` or call ``ray.shutdown()``. + +For each session, Ray will create a *root temporary directory* to place all its +temporary files. The path is ``/tmp/ray/session_{datetime}_{pid}`` by default. +The pid belongs to the startup process (the process calling ``ray.init()`` or +the Ray process executed by a shell in ``ray start``). +You can sort by their names to find the latest session. + +You are allowed to change the *root temporary directory* in one of these ways: + +* Pass ``--temp-dir={your temp path}`` to ``ray start`` +* Specify ``temp_dir`` when call ``ray.init()`` + +You can also use ``default_worker.py --temp-dir={your temp path}`` to +start a new worker with given *root temporary directory*. + +The *root temporary directory* you specified will be given as it is, +without pids or datetime attached. + +Layout of Temporary Files +------------------------- + +A typical layout of temporary files could look like this: + +.. code-block:: text + + /tmp + └── ray + └── session_{datetime}_{pid} + ├── logs + │   ├── log_monitor.err + │   ├── log_monitor.out + │   ├── monitor.err + │   ├── monitor.out + │   ├── plasma_manager_0.err # array of plasma managers' outputs + │   ├── plasma_manager_0.out + │   ├── plasma_store_0.err # array of plasma stores' outputs + │   ├── plasma_store_0.out + │   ├── raylet_0.err # array of raylets' outputs. Control it with `--no-redirect-worker-output` (in Ray's command line) or `redirect_worker_output` (in ray.init()) + │   ├── raylet_0.out + │   ├── redis-shard_0.err # array of redis shards' outputs + │   ├── redis-shard_0.out + │   ├── redis.err # redis + │   ├── redis.out + │   ├── webui.err # ipython notebook web ui + │   ├── webui.out + │   ├── worker-{worker_id}.err # redirected output of workers + │   ├── worker-{worker_id}.out + │   └── {other workers} + ├── ray_ui.ipynb # ipython notebook file + └── sockets # for logging + ├── plasma_store + └── raylet # this could be deleted by Ray's shutdown cleanup. + + +Plasma Object Store Socket +-------------------------- + +Plasma object store sockets can be used to share objects with other programs using Apache Arrow. + +You are allowed to specify the plasma object store socket in one of these ways: + +* Pass ``--plasma-store-socket-name={your socket path}`` to ``ray start`` +* Specify ``plasma_store_socket_name`` when call ``ray.init()`` + +The path you specified will be given as it is without being affected any other paths. + +Notes +----- + +Temporary file policies are defined in ``python/ray/tempfile_services.py``. + +Currently, we keep ``/tmp/ray`` as the default directory for temporary data files of RLlib as before. +It is not very reasonable and could be changed later. diff --git a/python/ray/local_scheduler/local_scheduler_services.py b/python/ray/local_scheduler/local_scheduler_services.py index f7847ce55..c576014e2 100644 --- a/python/ray/local_scheduler/local_scheduler_services.py +++ b/python/ray/local_scheduler/local_scheduler_services.py @@ -4,14 +4,12 @@ from __future__ import print_function import multiprocessing import os -import random import subprocess import sys import time - -def random_name(): - return str(random.randint(0, 99999999)) +from ray.tempfile_services import (get_local_scheduler_socket_name, + get_temp_root) def start_local_scheduler(plasma_store_name, @@ -71,7 +69,7 @@ def start_local_scheduler(plasma_store_name, local_scheduler_executable = os.path.join( os.path.dirname(os.path.abspath(__file__)), "../core/src/local_scheduler/local_scheduler") - local_scheduler_name = "/tmp/scheduler{}".format(random_name()) + local_scheduler_name = get_local_scheduler_socket_name() command = [ local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address, "-n", @@ -88,11 +86,12 @@ def start_local_scheduler(plasma_store_name, "--object-store-name={} " "--object-store-manager-name={} " "--local-scheduler-name={} " - "--redis-address={}".format( + "--redis-address={} " + "--temp-dir={}".format( sys.executable, worker_path, node_ip_address, plasma_store_name, plasma_manager_name, local_scheduler_name, - redis_address)) + redis_address, get_temp_root())) command += ["-w", start_worker_command] if redis_address is not None: command += ["-r", redis_address] diff --git a/python/ray/plasma/plasma.py b/python/ray/plasma/plasma.py index 60870c2b2..262aeebfb 100644 --- a/python/ray/plasma/plasma.py +++ b/python/ray/plasma/plasma.py @@ -8,6 +8,9 @@ import subprocess import sys import time +from ray.tempfile_services import (get_object_store_socket_name, + get_plasma_manager_socket_name) + __all__ = [ "start_plasma_store", "start_plasma_manager", "DEFAULT_PLASMA_STORE_MEMORY" ] @@ -17,17 +20,14 @@ PLASMA_WAIT_TIMEOUT = 2**30 DEFAULT_PLASMA_STORE_MEMORY = 10**9 -def random_name(): - return str(random.randint(0, 99999999)) - - def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False, stdout_file=None, stderr_file=None, plasma_directory=None, - huge_pages=False): + huge_pages=False, + socket_name=None): """Start a plasma store process. Args: @@ -43,6 +43,8 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, be created. huge_pages: a boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. + socket_name (str): If provided, it will specify the socket + name used by the plasma store. Return: A tuple of the name of the plasma store socket and the process ID of @@ -66,7 +68,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, plasma_store_executable = os.path.join( os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_store_server") - plasma_store_name = "/tmp/plasma_store{}".format(random_name()) + plasma_store_name = socket_name or get_object_store_socket_name() command = [ plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory) @@ -136,7 +138,7 @@ def start_plasma_manager(store_name, plasma_manager_executable = os.path.join( os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_manager") - plasma_manager_name = "/tmp/plasma_manager{}".format(random_name()) + plasma_manager_name = get_plasma_manager_socket_name() if plasma_manager_port is not None: if num_retries != 1: raise Exception("num_retries must be 1 if port is specified.") diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a9e4519d4..d62b57b5c 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -5,7 +5,7 @@ from __future__ import print_function import os -import ray +from ray.local_scheduler import ObjectID def env_integer(key, default): @@ -15,7 +15,7 @@ def env_integer(key, default): ID_SIZE = 20 -NIL_JOB_ID = ray.ObjectID(ID_SIZE * b"\xff") +NIL_JOB_ID = ObjectID(ID_SIZE * b"\xff") # If a remote function or actor (or some other export) has serialized size # greater than this quantity, print an warning. diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 0826a1387..d3e9417c1 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -177,11 +177,24 @@ def cli(logging_level, logging_format): is_flag=True, default=False, help="do not redirect non-worker stdout and stderr to files") +@click.option( + "--plasma-store-socket-name", + default=None, + help="manually specify the socket name of the plasma store") +@click.option( + "--raylet-socket-name", + default=None, + help="manually specify the socket path of the raylet process") +@click.option( + "--temp-dir", + default=None, + help="manually specify the root temporary dir of the Ray process") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_shard_ports, object_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, - use_raylet, no_redirect_worker_output, no_redirect_output): + use_raylet, no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -260,7 +273,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, plasma_directory=plasma_directory, huge_pages=huge_pages, autoscaling_config=autoscaling_config, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) logger.info(address_info) logger.info( "\nStarted Ray on this node. You can add additional nodes to " @@ -329,7 +345,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, resources=resources, plasma_directory=plasma_directory, huge_pages=huge_pages, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) logger.info(address_info) logger.info("\nStarted Ray on this node. If you wish to terminate the " "processes that have been started, run\n\n" diff --git a/python/ray/services.py b/python/ray/services.py index 3a421437c..ab632354f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -2,14 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import binascii import json import logging import multiprocessing import os import random import resource -import shutil import signal import socket import subprocess @@ -17,8 +15,6 @@ import sys import threading import time from collections import OrderedDict, namedtuple -from datetime import datetime - import redis import pyarrow @@ -28,6 +24,14 @@ import ray.global_scheduler as global_scheduler import ray.local_scheduler import ray.plasma +from ray.tempfile_services import ( + get_ipython_notebook_path, get_logs_dir_path, get_raylet_socket_name, + get_temp_redis_config_path, get_temp_root, new_global_scheduler_log_file, + new_local_scheduler_log_file, new_log_monitor_log_file, + new_monitor_log_file, new_plasma_manager_log_file, + new_plasma_store_log_file, new_raylet_log_file, new_redis_log_file, + new_webui_log_file, new_worker_log_file, set_temp_root) + PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_WORKER = "worker" @@ -120,10 +124,6 @@ def new_port(): return random.randint(10000, 65535) -def random_name(): - return str(random.randint(0, 99999999)) - - def kill_process(p): """Kill a process. @@ -456,8 +456,7 @@ def start_redis(node_ip_address, A tuple of the address for the primary Redis shard and a list of addresses for the remaining shards. """ - redis_stdout_file, redis_stderr_file = new_log_files( - "redis", redirect_output) + redis_stdout_file, redis_stderr_file = new_redis_log_file(redirect_output) if redis_shard_ports is None: redis_shard_ports = num_redis_shards * [None] @@ -517,8 +516,8 @@ def start_redis(node_ip_address, # prefixed by "redis-". redis_shards = [] for i in range(num_redis_shards): - redis_stdout_file, redis_stderr_file = new_log_files( - "redis-{}".format(i), redirect_output) + redis_stdout_file, redis_stderr_file = new_redis_log_file( + redirect_output, shard_number=i) if not use_credis: redis_shard_port, _ = _start_redis_instance( node_ip_address=node_ip_address, @@ -572,7 +571,7 @@ def _make_temp_redis_config(node_ip_address): node_ip_address: The IP address of this node. This should not be 127.0.0.1. """ - redis_config_name = "/tmp/redis_conf{}".format(random_name()) + redis_config_name = get_temp_redis_config_path() with open(redis_config_name, 'w') as f: # This allows redis clients on the same machine to connect using the # node's IP address as opposed to just 127.0.0.1. This is only relevant @@ -799,15 +798,7 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): then this process will be killed by services.cleanup() when the Python process that imported services exits. """ - new_env = os.environ.copy() - notebook_filepath = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb") - # We copy the notebook file so that the original doesn't get modified by - # the user. - random_ui_id = random.randint(0, 100000) - new_notebook_filepath = "/tmp/raylogs/ray_ui{}.ipynb".format(random_ui_id) - new_notebook_directory = os.path.dirname(new_notebook_filepath) - shutil.copy(notebook_filepath, new_notebook_filepath) + port = 8888 while True: try: @@ -821,7 +812,8 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): new_env["REDIS_ADDRESS"] = redis_address # We generate the token used for authentication ourselves to avoid # querying the jupyter server. - token = ray.utils.decode(binascii.hexlify(os.urandom(24))) + new_notebook_directory, webui_url, token = ( + get_ipython_notebook_path(port)) # The --ip=0.0.0.0 flag is intended to enable connecting to a notebook # running within a docker container (from the outside). command = [ @@ -847,8 +839,6 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): else: if cleanup: all_processes[PROCESS_TYPE_WEB_UI].append(ui_process) - webui_url = ("http://localhost:{}/notebooks/ray_ui{}.ipynb?token={}" - .format(port, random_ui_id, token)) logger.info("\n" + "=" * 70) logger.info("View the web UI at {}".format(webui_url)) logger.info("=" * 70 + "\n") @@ -971,6 +961,7 @@ def start_local_scheduler(redis_address, def start_raylet(redis_address, node_ip_address, + raylet_name, plasma_store_name, worker_path, resources=None, @@ -988,6 +979,7 @@ def start_raylet(redis_address, scheduler is running on. plasma_store_name (str): The name of the plasma store socket to connect to. + raylet_name (str): The name of the raylet socket to create. worker_path (str): The path of the script to use when the local scheduler starts up new workers. use_valgrind (bool): True if the raylet should be started inside @@ -1023,16 +1015,17 @@ def start_raylet(redis_address, ]) gcs_ip_address, gcs_port = redis_address.split(":") - raylet_name = "/tmp/raylet{}".format(random_name()) # Create the command that the Raylet will use to start workers. start_worker_command = ("{} {} " "--node-ip-address={} " "--object-store-name={} " "--raylet-name={} " - "--redis-address={}".format( + "--redis-address={} " + "--temp-dir={}".format( sys.executable, worker_path, node_ip_address, - plasma_store_name, raylet_name, redis_address)) + plasma_store_name, raylet_name, redis_address, + get_temp_root())) command = [ RAYLET_EXECUTABLE, @@ -1084,7 +1077,8 @@ def start_plasma_store(node_ip_address, cleanup=True, plasma_directory=None, huge_pages=False, - use_raylet=False): + use_raylet=False, + plasma_store_socket_name=None): """This method starts an object store process. Args: @@ -1158,7 +1152,8 @@ def start_plasma_store(node_ip_address, stdout_file=store_stdout_file, stderr_file=store_stderr_file, plasma_directory=plasma_directory, - huge_pages=huge_pages) + huge_pages=huge_pages, + socket_name=plasma_store_socket_name) # Start the plasma manager. if not use_raylet: if object_manager_port is not None: @@ -1235,7 +1230,8 @@ def start_worker(node_ip_address, "--object-store-name=" + object_store_name, "--object-store-manager-name=" + object_store_manager_name, "--local-scheduler-name=" + local_scheduler_name, - "--redis-address=" + str(redis_address) + "--redis-address=" + str(redis_address), + "--temp-dir=" + get_temp_root() ] p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: @@ -1327,7 +1323,10 @@ def start_ray_processes(address_info=None, plasma_directory=None, huge_pages=False, autoscaling_config=None, - use_raylet=False): + use_raylet=False, + plasma_store_socket_name=None, + raylet_socket_name=None, + temp_dir=None): """Helper method to start Ray processes. Args: @@ -1385,13 +1384,22 @@ def start_ray_processes(address_info=None, autoscaling_config: path to autoscaling config file. use_raylet: True if the new raylet code path should be used. This is not supported yet. + plasma_store_socket_name (str): If provided, it will specify the socket + name used by the plasma store. + raylet_socket_name (str): If provided, it will specify the socket path + used by the raylet process. + temp_dir (str): If provided, it will specify the root temporary + directory for the Ray process. Returns: A dictionary of the address information for the processes that were started. """ - logger.info( - "Process STDOUT and STDERR is being redirected to /tmp/raylogs/.") + + set_temp_root(temp_dir) + + logger.info("Process STDOUT and STDERR is being redirected to {}.".format( + get_logs_dir_path())) if resources is None: resources = {} @@ -1438,8 +1446,8 @@ def start_ray_processes(address_info=None, time.sleep(0.1) # Start monitoring the processes. - monitor_stdout_file, monitor_stderr_file = new_log_files( - "monitor", redirect_output) + monitor_stdout_file, monitor_stderr_file = new_monitor_log_file( + redirect_output) start_monitor( redis_address, node_ip_address, @@ -1464,8 +1472,8 @@ def start_ray_processes(address_info=None, # Start the log monitor, if necessary. if include_log_monitor: - log_monitor_stdout_file, log_monitor_stderr_file = new_log_files( - "log_monitor", redirect_output=True) + log_monitor_stdout_file, log_monitor_stderr_file = ( + new_log_monitor_log_file()) start_log_monitor( redis_address, node_ip_address, @@ -1476,7 +1484,7 @@ def start_ray_processes(address_info=None, # Start the global scheduler, if necessary. if include_global_scheduler and not use_raylet: global_scheduler_stdout_file, global_scheduler_stderr_file = ( - new_log_files("global_scheduler", redirect_output)) + new_global_scheduler_log_file(redirect_output)) start_global_scheduler( redis_address, node_ip_address, @@ -1505,10 +1513,14 @@ def start_ray_processes(address_info=None, # Start any object stores that do not yet exist. for i in range(num_local_schedulers - len(object_store_addresses)): # Start Plasma. - plasma_store_stdout_file, plasma_store_stderr_file = new_log_files( - "plasma_store_{}".format(i), redirect_output) - plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files( - "plasma_manager_{}".format(i), redirect_output) + plasma_store_stdout_file, plasma_store_stderr_file = ( + new_plasma_store_log_file(i, redirect_output)) + + # If we use raylet, plasma manager won't be started and we don't need + # to create temp files for them. + plasma_manager_stdout_file, plasma_manager_stderr_file = ( + new_plasma_manager_log_file(i, redirect_output and not use_raylet)) + object_store_address = start_plasma_store( node_ip_address, redis_address, @@ -1521,7 +1533,8 @@ def start_ray_processes(address_info=None, cleanup=cleanup, plasma_directory=plasma_directory, huge_pages=huge_pages, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name) object_store_addresses.append(object_store_address) time.sleep(0.1) @@ -1546,9 +1559,8 @@ def start_ray_processes(address_info=None, # redirect the worker output, then we cannot redirect the local # scheduler output. local_scheduler_stdout_file, local_scheduler_stderr_file = ( - new_log_files( - "local_scheduler_{}".format(i), - redirect_output=redirect_worker_output)) + new_local_scheduler_log_file( + i, redirect_output=redirect_worker_output)) local_scheduler_name = start_local_scheduler( redis_address, node_ip_address, @@ -1571,12 +1583,13 @@ def start_ray_processes(address_info=None, else: # Start any raylets that do not exist yet. for i in range(len(raylet_socket_names), num_local_schedulers): - raylet_stdout_file, raylet_stderr_file = new_log_files( - "raylet_{}".format(i), redirect_output=redirect_worker_output) + raylet_stdout_file, raylet_stderr_file = new_raylet_log_file( + i, redirect_output=redirect_worker_output) address_info["raylet_socket_names"].append( start_raylet( redis_address, node_ip_address, + raylet_socket_name or get_raylet_socket_name(), object_store_addresses[i].name, worker_path, resources=resources[i], @@ -1592,8 +1605,8 @@ def start_ray_processes(address_info=None, object_store_address = object_store_addresses[i] local_scheduler_name = local_scheduler_socket_names[i] for j in range(num_local_scheduler_workers): - worker_stdout_file, worker_stderr_file = new_log_files( - "worker_{}_{}".format(i, j), redirect_output) + worker_stdout_file, worker_stderr_file = new_worker_log_file( + i, j, redirect_output) start_worker( node_ip_address, object_store_address.name, @@ -1611,8 +1624,7 @@ def start_ray_processes(address_info=None, # Try to start the web UI. if include_webui: - ui_stdout_file, ui_stderr_file = new_log_files( - "webui", redirect_output=True) + ui_stdout_file, ui_stderr_file = new_webui_log_file() address_info["webui_url"] = start_ui( redis_address, stdout_file=ui_stdout_file, @@ -1637,7 +1649,10 @@ def start_ray_node(node_ip_address, resources=None, plasma_directory=None, huge_pages=False, - use_raylet=False): + use_raylet=False, + plasma_store_socket_name=None, + raylet_socket_name=None, + temp_dir=None): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1672,6 +1687,12 @@ def start_ray_node(node_ip_address, Store with hugetlbfs support. Requires plasma_directory. use_raylet: True if the new raylet code path should be used. This is not supported yet. + plasma_store_socket_name (str): If provided, it will specify the socket + name used by the plasma store. + raylet_socket_name (str): If provided, it will specify the socket path + used by the raylet process. + temp_dir (str): If provided, it will specify the root temporary + directory for the Ray process. Returns: A dictionary of the address information for the processes that were @@ -1695,7 +1716,10 @@ def start_ray_node(node_ip_address, resources=resources, plasma_directory=plasma_directory, huge_pages=huge_pages, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) def start_ray_head(address_info=None, @@ -1718,7 +1742,10 @@ def start_ray_head(address_info=None, plasma_directory=None, huge_pages=False, autoscaling_config=None, - use_raylet=False): + use_raylet=False, + plasma_store_socket_name=None, + raylet_socket_name=None, + temp_dir=None): """Start Ray in local mode. Args: @@ -1770,6 +1797,12 @@ def start_ray_head(address_info=None, autoscaling_config: path to autoscaling config file. use_raylet: True if the new raylet code path should be used. This is not supported yet. + plasma_store_socket_name (str): If provided, it will specify the socket + name used by the plasma store. + raylet_socket_name (str): If provided, it will specify the socket path + used by the raylet process. + temp_dir (str): If provided, it will specify the root temporary + directory for the Ray process. Returns: A dictionary of the address information for the processes that were @@ -1799,58 +1832,7 @@ def start_ray_head(address_info=None, plasma_directory=plasma_directory, huge_pages=huge_pages, autoscaling_config=autoscaling_config, - use_raylet=use_raylet) - - -def try_to_create_directory(directory_path): - """Attempt to create a directory that is globally readable/writable. - - Args: - directory_path: The path of the directory to create. - """ - if not os.path.exists(directory_path): - try: - os.makedirs(directory_path) - except OSError as e: - if e.errno != os.errno.EEXIST: - raise e - logger.warning( - "Attempted to create '{}', but the directory already " - "exists.".format(directory_path)) - # Change the log directory permissions so others can use it. This is - # important when multiple people are using the same machine. - os.chmod(directory_path, 0o0777) - - -def new_log_files(name, redirect_output): - """Generate partially randomized filenames for log files. - - Args: - name (str): descriptive string for this log file. - redirect_output (bool): True if files should be generated for logging - stdout and stderr and false if stdout and stderr should not be - redirected. - - Returns: - If redirect_output is true, this will return a tuple of two - filehandles. The first is for redirecting stdout and the second is - for redirecting stderr. If redirect_output is false, this will - return a tuple of two None objects. - """ - if not redirect_output: - return None, None - - # Create a directory to be used for process log files. - logs_dir = "/tmp/raylogs" - try_to_create_directory(logs_dir) - # Create another directory that will be used by some of the RL algorithms. - try_to_create_directory("/tmp/ray") - - log_id = random.randint(0, 10000) - date_str = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - log_stdout = "{}/{}-{}-{:05d}.out".format(logs_dir, name, date_str, log_id) - log_stderr = "{}/{}-{}-{:05d}.err".format(logs_dir, name, date_str, log_id) - # Line-buffer the output (mode 1) - log_stdout_file = open(log_stdout, "a", buffering=1) - log_stderr_file = open(log_stderr, "a", buffering=1) - return log_stdout_file, log_stderr_file + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) diff --git a/python/ray/tempfile_services.py b/python/ray/tempfile_services.py new file mode 100644 index 000000000..3b5adfa80 --- /dev/null +++ b/python/ray/tempfile_services.py @@ -0,0 +1,292 @@ +import binascii +import collections +import datetime +import errno +import logging +import os +import shutil +import tempfile + +import ray.utils + +logger = logging.getLogger(__name__) +_incremental_dict = collections.defaultdict(lambda: 0) +_temp_root = None + + +def make_inc_temp(suffix="", prefix="", directory_name="/tmp/ray"): + """Return a incremental temporary file name. The file is not created. + + Args: + suffix (str): The suffix of the temp file. + prefix (str): The prefix of the temp file. + directory_name (str) : The base directory of the temp file. + + Returns: + A string of file name. If there existing a file having the same name, + the returned name will look like + "{directory_name}/{prefix}.{unique_index}{suffix}" + """ + index = _incremental_dict[suffix, prefix, directory_name] + # `tempfile.TMP_MAX` could be extremely large, + # so using `range` in Python2.x should be avoided. + while index < tempfile.TMP_MAX: + if index == 0: + filename = os.path.join(directory_name, prefix + suffix) + else: + filename = os.path.join(directory_name, + prefix + "." + str(index) + suffix) + index += 1 + if not os.path.exists(filename): + _incremental_dict[suffix, prefix, + directory_name] = index # Save the index. + return filename + + raise FileExistsError(errno.EEXIST, "No usable temporary filename found") + + +def try_to_create_directory(directory_path): + """Attempt to create a directory that is globally readable/writable. + + Args: + directory_path: The path of the directory to create. + """ + if not os.path.exists(directory_path): + try: + os.makedirs(directory_path) + except OSError as e: + if e.errno != os.errno.EEXIST: + raise e + logger.warning( + "Attempted to create '{}', but the directory already " + "exists.".format(directory_path)) + # Change the log directory permissions so others can use it. This is + # important when multiple people are using the same machine. + os.chmod(directory_path, 0o0777) + + +def get_temp_root(): + """Get the path of the temporary root. If not existing, it will be created. + """ + global _temp_root + + date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S") + + # Lazy creation. Avoid creating directories never used. + if _temp_root is None: + _temp_root = make_inc_temp( + prefix="session_{date_str}_{pid}".format( + pid=os.getpid(), date_str=date_str), + directory_name="/tmp/ray") + try_to_create_directory(_temp_root) + return _temp_root + + +def set_temp_root(path): + """Set the path of the temporary root. It will be created lazily.""" + global _temp_root + _temp_root = path + + +def get_logs_dir_path(): + """Get a temp dir for logging.""" + logs_dir = os.path.join(get_temp_root(), "logs") + try_to_create_directory(logs_dir) + return logs_dir + + +def get_sockets_dir_path(): + """Get a temp dir for sockets.""" + sockets_dir = os.path.join(get_temp_root(), "sockets") + try_to_create_directory(sockets_dir) + return sockets_dir + + +def get_raylet_socket_name(suffix=""): + """Get a socket name for raylet.""" + sockets_dir = get_sockets_dir_path() + + raylet_socket_name = make_inc_temp( + prefix="raylet", directory_name=sockets_dir, suffix=suffix) + return raylet_socket_name + + +def get_object_store_socket_name(): + """Get a socket name for plasma object store.""" + sockets_dir = get_sockets_dir_path() + return make_inc_temp(prefix="plasma_store", directory_name=sockets_dir) + + +def get_plasma_manager_socket_name(): + """Get a socket name for plasma manager.""" + sockets_dir = get_sockets_dir_path() + return make_inc_temp(prefix="plasma_manager", directory_name=sockets_dir) + + +def get_local_scheduler_socket_name(suffix=""): + """Get a socket name for local scheduler. + + This function could be unsafe. The socket name may + refer to a file that did not exist at some point, but by the time + you get around to creating it, someone else may have beaten you to + the punch. + """ + sockets_dir = get_sockets_dir_path() + raylet_socket_name = make_inc_temp( + prefix="scheduler", directory_name=sockets_dir, suffix=suffix) + + return raylet_socket_name + + +def get_ipython_notebook_path(port): + """Get a new ipython notebook path""" + + notebook_filepath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb") + # We copy the notebook file so that the original doesn't get modified by + # the user. + notebook_name = make_inc_temp( + suffix=".ipynb", prefix="ray_ui", directory_name=get_temp_root()) + new_notebook_filepath = os.path.join(get_logs_dir_path(), notebook_name) + shutil.copy(notebook_filepath, new_notebook_filepath) + new_notebook_directory = os.path.dirname(new_notebook_filepath) + token = ray.utils.decode(binascii.hexlify(os.urandom(24))) + webui_url = ("http://localhost:{}/notebooks/{}?token={}".format( + port, os.path.basename(notebook_name), token)) + return new_notebook_directory, webui_url, token + + +def get_temp_redis_config_path(): + """Get a temp name of the redis config file.""" + redis_config_name = make_inc_temp( + prefix="redis_conf", directory_name=get_temp_root()) + return redis_config_name + + +def new_log_files(name, redirect_output): + """Generate partially randomized filenames for log files. + + Args: + name (str): descriptive string for this log file. + redirect_output (bool): True if files should be generated for logging + stdout and stderr and false if stdout and stderr should not be + redirected. + + Returns: + If redirect_output is true, this will return a tuple of two + filehandles. The first is for redirecting stdout and the second is + for redirecting stderr. If redirect_output is false, this will + return a tuple of two None objects. + """ + if not redirect_output: + return None, None + + # Create a directory to be used for process log files. + logs_dir = get_logs_dir_path() + # Create another directory that will be used by some of the RL algorithms. + + # TODO(suquark): This is done by the old code. + # We should be able to control its path later. + try_to_create_directory("/tmp/ray") + + log_stdout = make_inc_temp( + suffix=".out", prefix=name, directory_name=logs_dir) + log_stderr = make_inc_temp( + suffix=".err", prefix=name, directory_name=logs_dir) + # Line-buffer the output (mode 1) + log_stdout_file = open(log_stdout, "a", buffering=1) + log_stderr_file = open(log_stderr, "a", buffering=1) + return log_stdout_file, log_stderr_file + + +def new_redis_log_file(redirect_output, shard_number=None): + """Create new logging files for redis""" + if shard_number is None: + redis_stdout_file, redis_stderr_file = new_log_files( + "redis", redirect_output) + else: + redis_stdout_file, redis_stderr_file = new_log_files( + "redis-shard_{}".format(shard_number), redirect_output) + return redis_stdout_file, redis_stderr_file + + +def new_raylet_log_file(local_scheduler_index, redirect_output): + """Create new logging files for raylet.""" + raylet_stdout_file, raylet_stderr_file = new_log_files( + "raylet_{}".format(local_scheduler_index), + redirect_output=redirect_output) + return raylet_stdout_file, raylet_stderr_file + + +def new_local_scheduler_log_file(local_scheduler_index, redirect_output): + """Create new logging files for local scheduler. + + It is only used in non-raylet versions. + """ + local_scheduler_stdout_file, local_scheduler_stderr_file = (new_log_files( + "local_scheduler_{}".format(local_scheduler_index), + redirect_output=redirect_output)) + return local_scheduler_stdout_file, local_scheduler_stderr_file + + +def new_webui_log_file(): + """Create new logging files for web ui.""" + ui_stdout_file, ui_stderr_file = new_log_files( + "webui", redirect_output=True) + return ui_stdout_file, ui_stderr_file + + +def new_worker_log_file(local_scheduler_index, worker_index, redirect_output): + """Create new logging files for workers with local scheduler index. + + It is only used in non-raylet versions. + """ + worker_stdout_file, worker_stderr_file = new_log_files( + "worker_{}_{}".format(local_scheduler_index, worker_index), + redirect_output) + return worker_stdout_file, worker_stderr_file + + +def new_worker_redirected_log_file(worker_id): + """Create new logging files for workers to redirect its output.""" + worker_stdout_file, worker_stderr_file = (new_log_files( + "worker-" + ray.utils.binary_to_hex(worker_id), True)) + return worker_stdout_file, worker_stderr_file + + +def new_log_monitor_log_file(): + """Create new logging files for the log monitor.""" + log_monitor_stdout_file, log_monitor_stderr_file = new_log_files( + "log_monitor", redirect_output=True) + return log_monitor_stdout_file, log_monitor_stderr_file + + +def new_global_scheduler_log_file(redirect_output): + """Create new logging files for the new global scheduler. + + It is only used in non-raylet versions. + """ + global_scheduler_stdout_file, global_scheduler_stderr_file = ( + new_log_files("global_scheduler", redirect_output)) + return global_scheduler_stdout_file, global_scheduler_stderr_file + + +def new_plasma_store_log_file(local_scheduler_index, redirect_output): + """Create new logging files for the plasma store.""" + plasma_store_stdout_file, plasma_store_stderr_file = new_log_files( + "plasma_store_{}".format(local_scheduler_index), redirect_output) + return plasma_store_stdout_file, plasma_store_stderr_file + + +def new_plasma_manager_log_file(local_scheduler_index, redirect_output): + """Create new logging files for the plasma manager.""" + plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files( + "plasma_manager_{}".format(local_scheduler_index), redirect_output) + return plasma_manager_stdout_file, plasma_manager_stderr_file + + +def new_monitor_log_file(redirect_output): + """Create new logging files for the monitor.""" + monitor_stdout_file, monitor_stderr_file = new_log_files( + "monitor", redirect_output) + return monitor_stdout_file, monitor_stderr_file diff --git a/python/ray/worker.py b/python/ray/worker.py index 2d1d45f65..c0714b1fc 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -27,6 +27,7 @@ import ray.remote_function import ray.serialization as serialization import ray.services as services import ray.signature +import ray.tempfile_services as tempfile_services import ray.local_scheduler import ray.plasma import ray.ray_constants as ray_constants @@ -1528,7 +1529,10 @@ def _init(address_info=None, plasma_directory=None, huge_pages=False, include_webui=True, - use_raylet=None): + use_raylet=None, + plasma_store_socket_name=None, + raylet_socket_name=None, + temp_dir=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -1584,6 +1588,12 @@ def _init(address_info=None, include_webui: Boolean flag indicating whether to start the web UI, which is a Jupyter notebook. use_raylet: True if the new raylet code path should be used. + plasma_store_socket_name (str): If provided, it will specify the socket + name used by the plasma store. + raylet_socket_name (str): If provided, it will specify the socket path + used by the raylet process. + temp_dir (str): If provided, it will specify the root temporary + directory for the Ray process. Returns: Address information about the started processes. @@ -1658,7 +1668,10 @@ def _init(address_info=None, plasma_directory=plasma_directory, huge_pages=huge_pages, include_webui=include_webui, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) else: if redis_address is None: raise Exception("When connecting to an existing cluster, " @@ -1690,6 +1703,15 @@ def _init(address_info=None, if huge_pages: raise Exception("When connecting to an existing cluster, " "huge_pages must not be provided.") + if temp_dir is not None: + raise Exception("When connecting to an existing cluster, " + "temp_dir must not be provided.") + if plasma_store_socket_name is not None: + raise Exception("When connecting to an existing cluster, " + "plasma_store_socket_name must not be provided.") + if raylet_socket_name is not None: + raise Exception("When connecting to an existing cluster, " + "raylet_socket_name must not be provided.") # Get the node IP address if one is not provided. if node_ip_address is None: node_ip_address = services.get_node_ip_address(redis_address) @@ -1719,6 +1741,9 @@ def _init(address_info=None, else: driver_address_info["raylet_socket_name"] = ( address_info["raylet_socket_names"][0]) + + # We only pass `temp_dir` to a worker (WORKER_MODE). + # It can't be a worker here. connect( driver_address_info, object_id_seed=object_id_seed, @@ -1750,7 +1775,10 @@ def init(redis_address=None, use_raylet=None, configure_logging=True, logging_level=logging.INFO, - logging_format=ray_constants.LOGGER_FORMAT): + logging_format=ray_constants.LOGGER_FORMAT, + plasma_store_socket_name=None, + raylet_socket_name=None, + temp_dir=None): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1815,6 +1843,12 @@ def init(redis_address=None, logging_level: Logging level, default will be loging.INFO. logging_format: Logging format, default will be "%(message)s" which means only contains the message. + plasma_store_socket_name (str): If provided, it will specify the socket + name used by the plasma store. + raylet_socket_name (str): If provided, it will specify the socket path + used by the raylet process. + temp_dir (str): If provided, it will specify the root temporary + directory for the Ray process. Returns: Address information about the started processes. @@ -1863,7 +1897,10 @@ def init(redis_address=None, huge_pages=huge_pages, include_webui=include_webui, object_store_memory=object_store_memory, - use_raylet=use_raylet) + use_raylet=use_raylet, + plasma_store_socket_name=plasma_store_socket_name, + raylet_socket_name=raylet_socket_name, + temp_dir=temp_dir) for hook in _post_init_hooks: hook() return ret @@ -2135,8 +2172,9 @@ def connect(info, else: redirect_worker_output = 0 if redirect_worker_output: - log_stdout_file, log_stderr_file = services.new_log_files( - "worker", True) + log_stdout_file, log_stderr_file = ( + tempfile_services.new_worker_redirected_log_file( + worker.worker_id)) sys.stdout = log_stdout_file sys.stderr = log_stderr_file services.record_log_files_in_redis( diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index cd7b3f4a4..72679722f 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -9,6 +9,7 @@ import traceback import ray import ray.actor import ray.ray_constants as ray_constants +import ray.tempfile_services as tempfile_services parser = argparse.ArgumentParser( description=("Parse addresses for the worker " @@ -53,6 +54,12 @@ parser.add_argument( type=str, default=ray_constants.LOGGER_FORMAT, help=ray_constants.LOGGER_FORMAT_HELP) +parser.add_argument( + "--temp-dir", + required=False, + type=str, + default=None, + help="Specify the path of the temporary directory use by Ray process.") if __name__ == "__main__": args = parser.parse_args() @@ -70,6 +77,9 @@ if __name__ == "__main__": level=logging.getLevelName(args.logging_level.upper()), format=args.logging_format) + # Override the temporary directory. + tempfile_services.set_temp_root(args.temp_dir) + ray.worker.connect( info, mode=ray.WORKER_MODE, use_raylet=(args.raylet_name is not None)) diff --git a/test/stress_tests.py b/test/stress_tests.py index 6cea02d82..6fc7cc487 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -8,6 +8,7 @@ import pytest import time import ray +import ray.tempfile_services import ray.ray_constants as ray_constants @@ -173,10 +174,10 @@ def ray_start_reconstruction(request): plasma_addresses = [] objstore_memory = plasma_store_memory // num_local_schedulers for i in range(num_local_schedulers): - store_stdout_file, store_stderr_file = ray.services.new_log_files( - "plasma_store_{}".format(i), True) - manager_stdout_file, manager_stderr_file = (ray.services.new_log_files( - "plasma_manager_{}".format(i), True)) + store_stdout_file, store_stderr_file = ( + ray.tempfile_services.new_plasma_store_log_file(i, True)) + manager_stdout_file, manager_stderr_file = ( + ray.tempfile_services.new_plasma_manager_log_file(i, True)) plasma_addresses.append( ray.services.start_plasma_store( node_ip_address, diff --git a/test/tempfile_test.py b/test/tempfile_test.py new file mode 100644 index 000000000..a174b621b --- /dev/null +++ b/test/tempfile_test.py @@ -0,0 +1,119 @@ +import os +import shutil +import time +import pytest +import ray +import ray.tempfile_services as tempfile_services + + +def test_conn_cluster(): + # plasma_store_socket_name + with pytest.raises(Exception) as exc_info: + ray.init( + use_raylet=True, + redis_address="127.0.0.1:6379", + plasma_store_socket_name="/tmp/this_should_fail") + assert exc_info.value.args[0] == ( + "When connecting to an existing cluster, " + "plasma_store_socket_name must not be provided.") + + # raylet_socket_name + with pytest.raises(Exception) as exc_info: + ray.init( + use_raylet=True, + redis_address="127.0.0.1:6379", + raylet_socket_name="/tmp/this_should_fail") + assert exc_info.value.args[0] == ( + "When connecting to an existing cluster, " + "raylet_socket_name must not be provided.") + + # temp_dir + with pytest.raises(Exception) as exc_info: + ray.init( + use_raylet=True, + redis_address="127.0.0.1:6379", + temp_dir="/tmp/this_should_fail") + assert exc_info.value.args[0] == ( + "When connecting to an existing cluster, " + "temp_dir must not be provided.") + + +def test_tempdir(): + ray.init(use_raylet=True, temp_dir="/tmp/i_am_a_temp_dir") + assert os.path.exists( + "/tmp/i_am_a_temp_dir"), "Specified temp dir not found." + ray.shutdown() + shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True) + + +def test_raylet_socket_name(): + ray.init(use_raylet=True, raylet_socket_name="/tmp/i_am_a_temp_socket") + assert os.path.exists( + "/tmp/i_am_a_temp_socket"), "Specified socket path not found." + ray.shutdown() + try: + os.remove("/tmp/i_am_a_temp_socket") + except Exception: + pass + + +def test_temp_plasma_store_socket(): + ray.init( + use_raylet=True, plasma_store_socket_name="/tmp/i_am_a_temp_socket") + assert os.path.exists( + "/tmp/i_am_a_temp_socket"), "Specified socket path not found." + ray.shutdown() + try: + os.remove("/tmp/i_am_a_temp_socket") + except Exception: + pass + + +def test_raylet_tempfiles(): + ray.init(use_raylet=True, redirect_worker_output=False) + top_levels = set(os.listdir(tempfile_services.get_temp_root())) + assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} + log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + assert log_files == { + "log_monitor.out", "log_monitor.err", "plasma_store_0.out", + "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", + "redis.err" + } # without raylet logs + socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + assert socket_files == {"plasma_store", "raylet"} + ray.shutdown() + + ray.init(use_raylet=True, redirect_worker_output=True, num_workers=0) + top_levels = set(os.listdir(tempfile_services.get_temp_root())) + assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} + log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + assert log_files == { + "log_monitor.out", "log_monitor.err", "plasma_store_0.out", + "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", + "redis.err", "raylet_0.out", "raylet_0.err" + } # with raylet logs + socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + assert socket_files == {"plasma_store", "raylet"} + ray.shutdown() + + ray.init(use_raylet=True, redirect_worker_output=True, num_workers=2) + top_levels = set(os.listdir(tempfile_services.get_temp_root())) + assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} + time.sleep(3) # wait workers to start + log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) + assert log_files.issuperset({ + "log_monitor.out", "log_monitor.err", "plasma_store_0.out", + "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", + "redis.err", "raylet_0.out", "raylet_0.err" + }) # with raylet logs + + # Check numbers of worker log file. + assert sum( + 1 for filename in log_files if filename.startswith("worker")) == 4 + + socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) + assert socket_files == {"plasma_store", "raylet"} + ray.shutdown()