Change logfile names and also allow plasma store socket to be passed in. (#2862)

This commit is contained in:
Si-Yuan
2018-10-03 10:03:53 -07:00
committed by Robert Nishihara
parent 9c606ea06c
commit cc7e2ecdd5
13 changed files with 696 additions and 140 deletions
@@ -4,14 +4,12 @@ from __future__ import print_function
import multiprocessing
import os
import random
import subprocess
import sys
import time
def random_name():
    """Return a random integer in [0, 99999999] rendered as a string."""
    number = random.randint(0, 99999999)
    return str(number)
from ray.tempfile_services import (get_local_scheduler_socket_name,
get_temp_root)
def start_local_scheduler(plasma_store_name,
@@ -71,7 +69,7 @@ def start_local_scheduler(plasma_store_name,
local_scheduler_executable = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../core/src/local_scheduler/local_scheduler")
local_scheduler_name = "/tmp/scheduler{}".format(random_name())
local_scheduler_name = get_local_scheduler_socket_name()
command = [
local_scheduler_executable, "-s", local_scheduler_name, "-p",
plasma_store_name, "-h", node_ip_address, "-n",
@@ -88,11 +86,12 @@ def start_local_scheduler(plasma_store_name,
"--object-store-name={} "
"--object-store-manager-name={} "
"--local-scheduler-name={} "
"--redis-address={}".format(
"--redis-address={} "
"--temp-dir={}".format(
sys.executable, worker_path,
node_ip_address, plasma_store_name,
plasma_manager_name, local_scheduler_name,
redis_address))
redis_address, get_temp_root()))
command += ["-w", start_worker_command]
if redis_address is not None:
command += ["-r", redis_address]
+9 -7
View File
@@ -8,6 +8,9 @@ import subprocess
import sys
import time
from ray.tempfile_services import (get_object_store_socket_name,
get_plasma_manager_socket_name)
__all__ = [
"start_plasma_store", "start_plasma_manager", "DEFAULT_PLASMA_STORE_MEMORY"
]
@@ -17,17 +20,14 @@ PLASMA_WAIT_TIMEOUT = 2**30
DEFAULT_PLASMA_STORE_MEMORY = 10**9
def random_name():
    """Return a random integer in [0, 99999999] rendered as a string."""
    number = random.randint(0, 99999999)
    return str(number)
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=False,
use_profiler=False,
stdout_file=None,
stderr_file=None,
plasma_directory=None,
huge_pages=False):
huge_pages=False,
socket_name=None):
"""Start a plasma store process.
Args:
@@ -43,6 +43,8 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
be created.
huge_pages: a boolean flag indicating whether to start the
Object Store with hugetlbfs support. Requires plasma_directory.
socket_name (str): If provided, it will specify the socket
name used by the plasma store.
Return:
A tuple of the name of the plasma store socket and the process ID of
@@ -66,7 +68,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
plasma_store_executable = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"../core/src/plasma/plasma_store_server")
plasma_store_name = "/tmp/plasma_store{}".format(random_name())
plasma_store_name = socket_name or get_object_store_socket_name()
command = [
plasma_store_executable, "-s", plasma_store_name, "-m",
str(plasma_store_memory)
@@ -136,7 +138,7 @@ def start_plasma_manager(store_name,
plasma_manager_executable = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"../core/src/plasma/plasma_manager")
plasma_manager_name = "/tmp/plasma_manager{}".format(random_name())
plasma_manager_name = get_plasma_manager_socket_name()
if plasma_manager_port is not None:
if num_retries != 1:
raise Exception("num_retries must be 1 if port is specified.")
+2 -2
View File
@@ -5,7 +5,7 @@ from __future__ import print_function
import os
import ray
from ray.local_scheduler import ObjectID
def env_integer(key, default):
@@ -15,7 +15,7 @@ def env_integer(key, default):
ID_SIZE = 20
NIL_JOB_ID = ray.ObjectID(ID_SIZE * b"\xff")
NIL_JOB_ID = ObjectID(ID_SIZE * b"\xff")
# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print a warning.
+22 -3
View File
@@ -177,11 +177,24 @@ def cli(logging_level, logging_format):
is_flag=True,
default=False,
help="do not redirect non-worker stdout and stderr to files")
@click.option(
"--plasma-store-socket-name",
default=None,
help="manually specify the socket name of the plasma store")
@click.option(
"--raylet-socket-name",
default=None,
help="manually specify the socket path of the raylet process")
@click.option(
"--temp-dir",
default=None,
help="manually specify the root temporary dir of the Ray process")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_shard_ports, object_manager_port,
object_store_memory, num_workers, num_cpus, num_gpus, resources,
head, no_ui, block, plasma_directory, huge_pages, autoscaling_config,
use_raylet, no_redirect_worker_output, no_redirect_output):
use_raylet, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -260,7 +273,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
autoscaling_config=autoscaling_config,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
logger.info(address_info)
logger.info(
"\nStarted Ray on this node. You can add additional nodes to "
@@ -329,7 +345,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
logger.info(address_info)
logger.info("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
+93 -111
View File
@@ -2,14 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import binascii
import json
import logging
import multiprocessing
import os
import random
import resource
import shutil
import signal
import socket
import subprocess
@@ -17,8 +15,6 @@ import sys
import threading
import time
from collections import OrderedDict, namedtuple
from datetime import datetime
import redis
import pyarrow
@@ -28,6 +24,14 @@ import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
from ray.tempfile_services import (
get_ipython_notebook_path, get_logs_dir_path, get_raylet_socket_name,
get_temp_redis_config_path, get_temp_root, new_global_scheduler_log_file,
new_local_scheduler_log_file, new_log_monitor_log_file,
new_monitor_log_file, new_plasma_manager_log_file,
new_plasma_store_log_file, new_raylet_log_file, new_redis_log_file,
new_webui_log_file, new_worker_log_file, set_temp_root)
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
@@ -120,10 +124,6 @@ def new_port():
return random.randint(10000, 65535)
def random_name():
    """Return a random integer in [0, 99999999] rendered as a string."""
    number = random.randint(0, 99999999)
    return str(number)
def kill_process(p):
"""Kill a process.
@@ -456,8 +456,7 @@ def start_redis(node_ip_address,
A tuple of the address for the primary Redis shard and a list of
addresses for the remaining shards.
"""
redis_stdout_file, redis_stderr_file = new_log_files(
"redis", redirect_output)
redis_stdout_file, redis_stderr_file = new_redis_log_file(redirect_output)
if redis_shard_ports is None:
redis_shard_ports = num_redis_shards * [None]
@@ -517,8 +516,8 @@ def start_redis(node_ip_address,
# prefixed by "redis-<shard number>".
redis_shards = []
for i in range(num_redis_shards):
redis_stdout_file, redis_stderr_file = new_log_files(
"redis-{}".format(i), redirect_output)
redis_stdout_file, redis_stderr_file = new_redis_log_file(
redirect_output, shard_number=i)
if not use_credis:
redis_shard_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
@@ -572,7 +571,7 @@ def _make_temp_redis_config(node_ip_address):
node_ip_address: The IP address of this node. This should not be
127.0.0.1.
"""
redis_config_name = "/tmp/redis_conf{}".format(random_name())
redis_config_name = get_temp_redis_config_path()
with open(redis_config_name, 'w') as f:
# This allows redis clients on the same machine to connect using the
# node's IP address as opposed to just 127.0.0.1. This is only relevant
@@ -799,15 +798,7 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
then this process will be killed by services.cleanup() when the
Python process that imported services exits.
"""
new_env = os.environ.copy()
notebook_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb")
# We copy the notebook file so that the original doesn't get modified by
# the user.
random_ui_id = random.randint(0, 100000)
new_notebook_filepath = "/tmp/raylogs/ray_ui{}.ipynb".format(random_ui_id)
new_notebook_directory = os.path.dirname(new_notebook_filepath)
shutil.copy(notebook_filepath, new_notebook_filepath)
port = 8888
while True:
try:
@@ -821,7 +812,8 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
new_env["REDIS_ADDRESS"] = redis_address
# We generate the token used for authentication ourselves to avoid
# querying the jupyter server.
token = ray.utils.decode(binascii.hexlify(os.urandom(24)))
new_notebook_directory, webui_url, token = (
get_ipython_notebook_path(port))
# The --ip=0.0.0.0 flag is intended to enable connecting to a notebook
# running within a docker container (from the outside).
command = [
@@ -847,8 +839,6 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
else:
if cleanup:
all_processes[PROCESS_TYPE_WEB_UI].append(ui_process)
webui_url = ("http://localhost:{}/notebooks/ray_ui{}.ipynb?token={}"
.format(port, random_ui_id, token))
logger.info("\n" + "=" * 70)
logger.info("View the web UI at {}".format(webui_url))
logger.info("=" * 70 + "\n")
@@ -971,6 +961,7 @@ def start_local_scheduler(redis_address,
def start_raylet(redis_address,
node_ip_address,
raylet_name,
plasma_store_name,
worker_path,
resources=None,
@@ -988,6 +979,7 @@ def start_raylet(redis_address,
scheduler is running on.
plasma_store_name (str): The name of the plasma store socket to connect
to.
raylet_name (str): The name of the raylet socket to create.
worker_path (str): The path of the script to use when the local
scheduler starts up new workers.
use_valgrind (bool): True if the raylet should be started inside
@@ -1023,16 +1015,17 @@ def start_raylet(redis_address,
])
gcs_ip_address, gcs_port = redis_address.split(":")
raylet_name = "/tmp/raylet{}".format(random_name())
# Create the command that the Raylet will use to start workers.
start_worker_command = ("{} {} "
"--node-ip-address={} "
"--object-store-name={} "
"--raylet-name={} "
"--redis-address={}".format(
"--redis-address={} "
"--temp-dir={}".format(
sys.executable, worker_path, node_ip_address,
plasma_store_name, raylet_name, redis_address))
plasma_store_name, raylet_name, redis_address,
get_temp_root()))
command = [
RAYLET_EXECUTABLE,
@@ -1084,7 +1077,8 @@ def start_plasma_store(node_ip_address,
cleanup=True,
plasma_directory=None,
huge_pages=False,
use_raylet=False):
use_raylet=False,
plasma_store_socket_name=None):
"""This method starts an object store process.
Args:
@@ -1158,7 +1152,8 @@ def start_plasma_store(node_ip_address,
stdout_file=store_stdout_file,
stderr_file=store_stderr_file,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
huge_pages=huge_pages,
socket_name=plasma_store_socket_name)
# Start the plasma manager.
if not use_raylet:
if object_manager_port is not None:
@@ -1235,7 +1230,8 @@ def start_worker(node_ip_address,
"--object-store-name=" + object_store_name,
"--object-store-manager-name=" + object_store_manager_name,
"--local-scheduler-name=" + local_scheduler_name,
"--redis-address=" + str(redis_address)
"--redis-address=" + str(redis_address),
"--temp-dir=" + get_temp_root()
]
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
@@ -1327,7 +1323,10 @@ def start_ray_processes(address_info=None,
plasma_directory=None,
huge_pages=False,
autoscaling_config=None,
use_raylet=False):
use_raylet=False,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
"""Helper method to start Ray processes.
Args:
@@ -1385,13 +1384,22 @@ def start_ray_processes(address_info=None,
autoscaling_config: path to autoscaling config file.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
Returns:
A dictionary of the address information for the processes that were
started.
"""
logger.info(
"Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")
set_temp_root(temp_dir)
logger.info("Process STDOUT and STDERR is being redirected to {}.".format(
get_logs_dir_path()))
if resources is None:
resources = {}
@@ -1438,8 +1446,8 @@ def start_ray_processes(address_info=None,
time.sleep(0.1)
# Start monitoring the processes.
monitor_stdout_file, monitor_stderr_file = new_log_files(
"monitor", redirect_output)
monitor_stdout_file, monitor_stderr_file = new_monitor_log_file(
redirect_output)
start_monitor(
redis_address,
node_ip_address,
@@ -1464,8 +1472,8 @@ def start_ray_processes(address_info=None,
# Start the log monitor, if necessary.
if include_log_monitor:
log_monitor_stdout_file, log_monitor_stderr_file = new_log_files(
"log_monitor", redirect_output=True)
log_monitor_stdout_file, log_monitor_stderr_file = (
new_log_monitor_log_file())
start_log_monitor(
redis_address,
node_ip_address,
@@ -1476,7 +1484,7 @@ def start_ray_processes(address_info=None,
# Start the global scheduler, if necessary.
if include_global_scheduler and not use_raylet:
global_scheduler_stdout_file, global_scheduler_stderr_file = (
new_log_files("global_scheduler", redirect_output))
new_global_scheduler_log_file(redirect_output))
start_global_scheduler(
redis_address,
node_ip_address,
@@ -1505,10 +1513,14 @@ def start_ray_processes(address_info=None,
# Start any object stores that do not yet exist.
for i in range(num_local_schedulers - len(object_store_addresses)):
# Start Plasma.
plasma_store_stdout_file, plasma_store_stderr_file = new_log_files(
"plasma_store_{}".format(i), redirect_output)
plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files(
"plasma_manager_{}".format(i), redirect_output)
plasma_store_stdout_file, plasma_store_stderr_file = (
new_plasma_store_log_file(i, redirect_output))
# If we use raylet, plasma manager won't be started and we don't need
# to create temp files for them.
plasma_manager_stdout_file, plasma_manager_stderr_file = (
new_plasma_manager_log_file(i, redirect_output and not use_raylet))
object_store_address = start_plasma_store(
node_ip_address,
redis_address,
@@ -1521,7 +1533,8 @@ def start_ray_processes(address_info=None,
cleanup=cleanup,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name)
object_store_addresses.append(object_store_address)
time.sleep(0.1)
@@ -1546,9 +1559,8 @@ def start_ray_processes(address_info=None,
# redirect the worker output, then we cannot redirect the local
# scheduler output.
local_scheduler_stdout_file, local_scheduler_stderr_file = (
new_log_files(
"local_scheduler_{}".format(i),
redirect_output=redirect_worker_output))
new_local_scheduler_log_file(
i, redirect_output=redirect_worker_output))
local_scheduler_name = start_local_scheduler(
redis_address,
node_ip_address,
@@ -1571,12 +1583,13 @@ def start_ray_processes(address_info=None,
else:
# Start any raylets that do not exist yet.
for i in range(len(raylet_socket_names), num_local_schedulers):
raylet_stdout_file, raylet_stderr_file = new_log_files(
"raylet_{}".format(i), redirect_output=redirect_worker_output)
raylet_stdout_file, raylet_stderr_file = new_raylet_log_file(
i, redirect_output=redirect_worker_output)
address_info["raylet_socket_names"].append(
start_raylet(
redis_address,
node_ip_address,
raylet_socket_name or get_raylet_socket_name(),
object_store_addresses[i].name,
worker_path,
resources=resources[i],
@@ -1592,8 +1605,8 @@ def start_ray_processes(address_info=None,
object_store_address = object_store_addresses[i]
local_scheduler_name = local_scheduler_socket_names[i]
for j in range(num_local_scheduler_workers):
worker_stdout_file, worker_stderr_file = new_log_files(
"worker_{}_{}".format(i, j), redirect_output)
worker_stdout_file, worker_stderr_file = new_worker_log_file(
i, j, redirect_output)
start_worker(
node_ip_address,
object_store_address.name,
@@ -1611,8 +1624,7 @@ def start_ray_processes(address_info=None,
# Try to start the web UI.
if include_webui:
ui_stdout_file, ui_stderr_file = new_log_files(
"webui", redirect_output=True)
ui_stdout_file, ui_stderr_file = new_webui_log_file()
address_info["webui_url"] = start_ui(
redis_address,
stdout_file=ui_stdout_file,
@@ -1637,7 +1649,10 @@ def start_ray_node(node_ip_address,
resources=None,
plasma_directory=None,
huge_pages=False,
use_raylet=False):
use_raylet=False,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
"""Start the Ray processes for a single node.
This assumes that the Ray processes on some master node have already been
@@ -1672,6 +1687,12 @@ def start_ray_node(node_ip_address,
Store with hugetlbfs support. Requires plasma_directory.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
Returns:
A dictionary of the address information for the processes that were
@@ -1695,7 +1716,10 @@ def start_ray_node(node_ip_address,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
def start_ray_head(address_info=None,
@@ -1718,7 +1742,10 @@ def start_ray_head(address_info=None,
plasma_directory=None,
huge_pages=False,
autoscaling_config=None,
use_raylet=False):
use_raylet=False,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
"""Start Ray in local mode.
Args:
@@ -1770,6 +1797,12 @@ def start_ray_head(address_info=None,
autoscaling_config: path to autoscaling config file.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
Returns:
A dictionary of the address information for the processes that were
@@ -1799,58 +1832,7 @@ def start_ray_head(address_info=None,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
autoscaling_config=autoscaling_config,
use_raylet=use_raylet)
def try_to_create_directory(directory_path):
    """Attempt to create a directory that is globally readable/writable.

    Nothing is done if the path already exists.

    Args:
        directory_path: The path of the directory to create.

    Raises:
        OSError: If the directory cannot be created for any reason other
            than it already existing.
    """
    # Imported locally so this function does not depend on a module-level
    # `errno` import. `os.errno` must not be used: it was an undocumented
    # alias that was removed in Python 3.7.
    import errno

    if os.path.exists(directory_path):
        return
    try:
        os.makedirs(directory_path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        # Somebody else created the directory between the existence check
        # and the call to makedirs.
        logger.warning(
            "Attempted to create '{}', but the directory already "
            "exists.".format(directory_path))
    # Change the log directory permissions so others can use it. This is
    # important when multiple people are using the same machine.
    os.chmod(directory_path, 0o0777)
def new_log_files(name, redirect_output):
    """Open partially randomized stdout/stderr log files for a process.

    Args:
        name (str): descriptive string for this log file.
        redirect_output (bool): True if files should be generated for logging
            stdout and stderr and false if stdout and stderr should not be
            redirected.

    Returns:
        A tuple of two line-buffered file handles (stdout first, stderr
        second) when redirect_output is true, otherwise (None, None).
    """
    if not redirect_output:
        return None, None

    # Directory holding the process log files.
    logs_dir = "/tmp/raylogs"
    try_to_create_directory(logs_dir)
    # A second directory that is used by some of the RL algorithms.
    try_to_create_directory("/tmp/ray")

    log_id = random.randint(0, 10000)
    date_str = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")

    handles = []
    for template in ("{}/{}-{}-{:05d}.out", "{}/{}-{}-{:05d}.err"):
        path = template.format(logs_dir, name, date_str, log_id)
        # Line-buffer the output (mode 1)
        handles.append(open(path, "a", buffering=1))
    return tuple(handles)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
+292
View File
@@ -0,0 +1,292 @@
import binascii
import collections
import datetime
import errno
import logging
import os
import shutil
import tempfile
import ray.utils
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Next index to try for each (suffix, prefix, directory_name) key; read and
# updated by make_inc_temp. Missing keys start at 0.
_incremental_dict = collections.defaultdict(lambda: 0)
# Root temporary directory. None until set_temp_root() or get_temp_root()
# assigns it.
_temp_root = None
def make_inc_temp(suffix="", prefix="", directory_name="/tmp/ray"):
    """Return an incremental temporary file name. The file is not created.

    Args:
        suffix (str): The suffix of the temp file.
        prefix (str): The prefix of the temp file.
        directory_name (str): The base directory of the temp file.

    Returns:
        A string file name. The first candidate is
        "{directory_name}/{prefix}{suffix}". Once a name has been handed
        out for the same arguments, or if a file with the candidate name
        already exists, the returned name looks like
        "{directory_name}/{prefix}.{unique_index}{suffix}".

    Raises:
        OSError: With errno EEXIST if no unused name is found before the
            index reaches `tempfile.TMP_MAX`. On Python 3 the raised object
            is a FileExistsError.
    """
    key = (suffix, prefix, directory_name)
    index = _incremental_dict[key]
    # `tempfile.TMP_MAX` could be extremely large,
    # so using `range` in Python2.x should be avoided.
    while index < tempfile.TMP_MAX:
        if index == 0:
            filename = os.path.join(directory_name, prefix + suffix)
        else:
            filename = os.path.join(directory_name,
                                    prefix + "." + str(index) + suffix)
        index += 1
        if not os.path.exists(filename):
            # Save the index so the next call for the same key starts after
            # the name we are about to hand out.
            _incremental_dict[key] = index
            return filename

    # `FileExistsError` is not defined on Python 2. Constructing OSError with
    # EEXIST works everywhere and yields a FileExistsError on Python 3.
    raise OSError(errno.EEXIST, "No usable temporary filename found")
def try_to_create_directory(directory_path):
    """Attempt to create a directory that is globally readable/writable.

    Nothing is done if the path already exists.

    Args:
        directory_path: The path of the directory to create.

    Raises:
        OSError: If the directory cannot be created for any reason other
            than it already existing.
    """
    if os.path.exists(directory_path):
        return
    try:
        os.makedirs(directory_path)
    except OSError as e:
        # Use the `errno` module directly: `os.errno` was an undocumented
        # alias that was removed in Python 3.7.
        if e.errno != errno.EEXIST:
            raise
        # Somebody else created the directory between the existence check
        # and the call to makedirs.
        logger.warning(
            "Attempted to create '{}', but the directory already "
            "exists.".format(directory_path))
    # Change the log directory permissions so others can use it. This is
    # important when multiple people are using the same machine.
    os.chmod(directory_path, 0o0777)
def get_temp_root():
    """Return the temporary root path, creating the directory if needed.

    If no root has been set, a new "session_{date}_{pid}" name under
    "/tmp/ray" is chosen and remembered for later calls.
    """
    global _temp_root
    # Lazy creation. Avoid creating directories never used.
    if _temp_root is None:
        timestamp = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
        session_prefix = "session_{date_str}_{pid}".format(
            pid=os.getpid(), date_str=timestamp)
        _temp_root = make_inc_temp(
            prefix=session_prefix, directory_name="/tmp/ray")
    try_to_create_directory(_temp_root)
    return _temp_root
def set_temp_root(path):
    """Record `path` as the temporary root.

    Only the module-level value is updated here; no directory is created
    by this call.
    """
    global _temp_root
    _temp_root = path
def get_logs_dir_path():
    """Return the "logs" directory under the temp root, creating it if
    it does not exist yet."""
    path = os.path.join(get_temp_root(), "logs")
    try_to_create_directory(path)
    return path
def get_sockets_dir_path():
    """Return the "sockets" directory under the temp root, creating it if
    it does not exist yet."""
    path = os.path.join(get_temp_root(), "sockets")
    try_to_create_directory(path)
    return path
def get_raylet_socket_name(suffix=""):
    """Return an unused "raylet" socket name inside the sockets directory.

    Args:
        suffix (str): Optional suffix appended to the socket name.
    """
    return make_inc_temp(
        suffix=suffix, prefix="raylet", directory_name=get_sockets_dir_path())
def get_object_store_socket_name():
    """Return an unused "plasma_store" socket name inside the sockets
    directory."""
    return make_inc_temp(
        prefix="plasma_store", directory_name=get_sockets_dir_path())
def get_plasma_manager_socket_name():
    """Return an unused "plasma_manager" socket name inside the sockets
    directory."""
    return make_inc_temp(
        prefix="plasma_manager", directory_name=get_sockets_dir_path())
def get_local_scheduler_socket_name(suffix=""):
    """Return an unused "scheduler" socket name inside the sockets directory.

    The name is only checked, not reserved: it may refer to a file that did
    not exist when it was chosen, yet someone else may create that file
    before the caller does.

    Args:
        suffix (str): Optional suffix appended to the socket name.
    """
    scheduler_socket_name = make_inc_temp(
        suffix=suffix,
        prefix="scheduler",
        directory_name=get_sockets_dir_path())
    return scheduler_socket_name
def get_ipython_notebook_path(port):
    """Copy the web UI notebook into the temp root and build its URL.

    Args:
        port: The port number inserted into the returned URL.

    Returns:
        A tuple of (directory containing the copied notebook, URL of the
        notebook on localhost, freshly generated authentication token).
    """
    notebook_filepath = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb")
    # We copy the notebook file so that the original doesn't get modified by
    # the user.
    # `make_inc_temp` already returns a path inside the temp root, so it is
    # used as the destination directly. Joining it onto the logs directory
    # was a no-op for an absolute temp root and produced a non-existent
    # nested path for a relative one.
    new_notebook_filepath = make_inc_temp(
        suffix=".ipynb", prefix="ray_ui", directory_name=get_temp_root())
    shutil.copy(notebook_filepath, new_notebook_filepath)
    new_notebook_directory = os.path.dirname(new_notebook_filepath)
    # Random token used to authenticate against the notebook server.
    token = ray.utils.decode(binascii.hexlify(os.urandom(24)))
    webui_url = ("http://localhost:{}/notebooks/{}?token={}".format(
        port, os.path.basename(new_notebook_filepath), token))
    return new_notebook_directory, webui_url, token
def get_temp_redis_config_path():
    """Return an unused "redis_conf" file name inside the temp root."""
    return make_inc_temp(prefix="redis_conf", directory_name=get_temp_root())
def new_log_files(name, redirect_output):
    """Open incrementally named stdout/stderr log files in the logs dir.

    Args:
        name (str): descriptive string for this log file.
        redirect_output (bool): True if files should be generated for logging
            stdout and stderr and false if stdout and stderr should not be
            redirected.

    Returns:
        If redirect_output is true, this will return a tuple of two
            filehandles. The first is for redirecting stdout and the second is
            for redirecting stderr. If redirect_output is false, this will
            return a tuple of two None objects.
    """
    if not redirect_output:
        return None, None

    # Create a directory to be used for process log files.
    logs_dir = get_logs_dir_path()
    # Create another directory that will be used by some of the RL algorithms.
    # TODO(suquark): This is done by the old code.
    # We should be able to control its path later.
    try_to_create_directory("/tmp/ray")

    log_stdout = make_inc_temp(
        suffix=".out", prefix=name, directory_name=logs_dir)
    log_stderr = make_inc_temp(
        suffix=".err", prefix=name, directory_name=logs_dir)
    # Line-buffer the output (mode 1)
    log_stdout_file = open(log_stdout, "a", buffering=1)
    try:
        log_stderr_file = open(log_stderr, "a", buffering=1)
    except Exception:
        # Do not leak the stdout handle when the stderr file cannot be opened.
        log_stdout_file.close()
        raise
    return log_stdout_file, log_stderr_file
def new_redis_log_file(redirect_output, shard_number=None):
    """Create new logging files for redis.

    Args:
        redirect_output (bool): Forwarded to new_log_files.
        shard_number: If None, the files are named "redis"; otherwise
            "redis-shard_{shard_number}".

    Returns:
        The (stdout, stderr) pair returned by new_log_files.
    """
    if shard_number is None:
        log_name = "redis"
    else:
        log_name = "redis-shard_{}".format(shard_number)
    return new_log_files(log_name, redirect_output)
def new_raylet_log_file(local_scheduler_index, redirect_output):
    """Create new logging files for raylet.

    The files are named "raylet_{local_scheduler_index}".
    """
    log_name = "raylet_{}".format(local_scheduler_index)
    return new_log_files(log_name, redirect_output=redirect_output)
def new_local_scheduler_log_file(local_scheduler_index, redirect_output):
    """Create new logging files for local scheduler.

    The files are named "local_scheduler_{local_scheduler_index}".
    It is only used in non-raylet versions.
    """
    log_name = "local_scheduler_{}".format(local_scheduler_index)
    return new_log_files(log_name, redirect_output=redirect_output)
def new_webui_log_file():
    """Create new logging files for web ui; output is always redirected."""
    return new_log_files("webui", redirect_output=True)
def new_worker_log_file(local_scheduler_index, worker_index, redirect_output):
    """Create new logging files for workers with local scheduler index.

    The files are named "worker_{local_scheduler_index}_{worker_index}".
    It is only used in non-raylet versions.
    """
    log_name = "worker_{}_{}".format(local_scheduler_index, worker_index)
    return new_log_files(log_name, redirect_output)
def new_worker_redirected_log_file(worker_id):
    """Create new logging files for a worker to redirect its output.

    The files are named "worker-" followed by the hex form of worker_id,
    and output is always redirected.
    """
    log_name = "worker-" + ray.utils.binary_to_hex(worker_id)
    return new_log_files(log_name, True)
def new_log_monitor_log_file():
    """Create new logging files for the log monitor; output is always
    redirected."""
    return new_log_files("log_monitor", redirect_output=True)
def new_global_scheduler_log_file(redirect_output):
    """Create new logging files for the new global scheduler.

    It is only used in non-raylet versions.
    """
    return new_log_files("global_scheduler", redirect_output)
def new_plasma_store_log_file(local_scheduler_index, redirect_output):
    """Create new logging files for the plasma store.

    The files are named "plasma_store_{local_scheduler_index}".
    """
    log_name = "plasma_store_{}".format(local_scheduler_index)
    return new_log_files(log_name, redirect_output)
def new_plasma_manager_log_file(local_scheduler_index, redirect_output):
    """Create new logging files for the plasma manager.

    The files are named "plasma_manager_{local_scheduler_index}".
    """
    log_name = "plasma_manager_{}".format(local_scheduler_index)
    return new_log_files(log_name, redirect_output)
def new_monitor_log_file(redirect_output):
    """Create new logging files for the monitor, named "monitor"."""
    return new_log_files("monitor", redirect_output)
+44 -6
View File
@@ -27,6 +27,7 @@ import ray.remote_function
import ray.serialization as serialization
import ray.services as services
import ray.signature
import ray.tempfile_services as tempfile_services
import ray.local_scheduler
import ray.plasma
import ray.ray_constants as ray_constants
@@ -1528,7 +1529,10 @@ def _init(address_info=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
use_raylet=None):
use_raylet=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1584,6 +1588,12 @@ def _init(address_info=None,
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
Returns:
Address information about the started processes.
@@ -1658,7 +1668,10 @@ def _init(address_info=None,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
include_webui=include_webui,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
else:
if redis_address is None:
raise Exception("When connecting to an existing cluster, "
@@ -1690,6 +1703,15 @@ def _init(address_info=None,
if huge_pages:
raise Exception("When connecting to an existing cluster, "
"huge_pages must not be provided.")
if temp_dir is not None:
raise Exception("When connecting to an existing cluster, "
"temp_dir must not be provided.")
if plasma_store_socket_name is not None:
raise Exception("When connecting to an existing cluster, "
"plasma_store_socket_name must not be provided.")
if raylet_socket_name is not None:
raise Exception("When connecting to an existing cluster, "
"raylet_socket_name must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
@@ -1719,6 +1741,9 @@ def _init(address_info=None,
else:
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_names"][0])
# We only pass `temp_dir` to a worker (WORKER_MODE).
# It can't be a worker here.
connect(
driver_address_info,
object_id_seed=object_id_seed,
@@ -1750,7 +1775,10 @@ def init(redis_address=None,
use_raylet=None,
configure_logging=True,
logging_level=logging.INFO,
logging_format=ray_constants.LOGGER_FORMAT):
logging_format=ray_constants.LOGGER_FORMAT,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1815,6 +1843,12 @@ def init(redis_address=None,
logging_level: Logging level, default will be logging.INFO.
logging_format: Logging format, default will be "%(message)s",
which means the output only contains the message.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
Returns:
Address information about the started processes.
@@ -1863,7 +1897,10 @@ def init(redis_address=None,
huge_pages=huge_pages,
include_webui=include_webui,
object_store_memory=object_store_memory,
use_raylet=use_raylet)
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
for hook in _post_init_hooks:
hook()
return ret
@@ -2135,8 +2172,9 @@ def connect(info,
else:
redirect_worker_output = 0
if redirect_worker_output:
log_stdout_file, log_stderr_file = services.new_log_files(
"worker", True)
log_stdout_file, log_stderr_file = (
tempfile_services.new_worker_redirected_log_file(
worker.worker_id))
sys.stdout = log_stdout_file
sys.stderr = log_stderr_file
services.record_log_files_in_redis(
+10
View File
@@ -9,6 +9,7 @@ import traceback
import ray
import ray.actor
import ray.ray_constants as ray_constants
import ray.tempfile_services as tempfile_services
parser = argparse.ArgumentParser(
description=("Parse addresses for the worker "
@@ -53,6 +54,12 @@ parser.add_argument(
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
# Optional override for the temporary directory; the parsed value (None when
# the flag is omitted) is handed to tempfile_services.set_temp_root() in the
# __main__ block before the worker connects.
parser.add_argument(
"--temp-dir",
required=False,
type=str,
default=None,
help="Specify the path of the temporary directory use by Ray process.")
if __name__ == "__main__":
args = parser.parse_args()
@@ -70,6 +77,9 @@ if __name__ == "__main__":
level=logging.getLevelName(args.logging_level.upper()),
format=args.logging_format)
# Override the temporary directory.
tempfile_services.set_temp_root(args.temp_dir)
ray.worker.connect(
info, mode=ray.WORKER_MODE, use_raylet=(args.raylet_name is not None))