mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 05:07:50 +08:00
Integrate "tempfile_service" into "ray.node.Node" (#3953)
This commit is contained in:
committed by
Robert Nishihara
parent
dac1969647
commit
21472b890a
+174
-48
@@ -3,21 +3,21 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import atexit
|
||||
import collections
|
||||
import datetime
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
import signal
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray.tempfile_services import (
|
||||
get_logs_dir_path, get_object_store_socket_name, get_raylet_socket_name,
|
||||
new_log_monitor_log_file, new_monitor_log_file,
|
||||
new_raylet_monitor_log_file, new_plasma_store_log_file,
|
||||
new_raylet_log_file, new_webui_log_file, set_temp_root,
|
||||
try_to_create_directory)
|
||||
import ray.services
|
||||
from ray.utils import try_to_create_directory
|
||||
|
||||
# Logger for this module. It should be configured at the entry point
|
||||
# into the program using Ray. Ray configures it by default automatically
|
||||
@@ -36,7 +36,11 @@ class Node(object):
|
||||
server list, which has multiple.
|
||||
"""
|
||||
|
||||
def __init__(self, ray_params, head=False, shutdown_at_exit=True):
|
||||
def __init__(self,
|
||||
ray_params,
|
||||
head=False,
|
||||
shutdown_at_exit=True,
|
||||
connect_only=False):
|
||||
"""Start a node.
|
||||
|
||||
Args:
|
||||
@@ -48,7 +52,11 @@ class Node(object):
|
||||
shutdown_at_exit (bool): If true, a handler will be registered to
|
||||
shutdown the processes started here when the Python interpreter
|
||||
exits.
|
||||
connect_only (bool): If true, connect to the node without starting
|
||||
new processes.
|
||||
"""
|
||||
assert not (shutdown_at_exit and connect_only), (
|
||||
"'shutdown_at_exit' and 'connect_only' cannot be both true.")
|
||||
self.all_processes = {}
|
||||
|
||||
ray_params.update_if_absent(
|
||||
@@ -60,29 +68,60 @@ class Node(object):
|
||||
os.path.dirname(os.path.abspath(__file__)),
|
||||
"workers/default_worker.py"))
|
||||
|
||||
self._ray_params = ray_params
|
||||
|
||||
self._node_ip_address = ray_params.node_ip_address
|
||||
self._redis_address = ray_params.redis_address
|
||||
self._config = (json.loads(ray_params._internal_config)
|
||||
if ray_params._internal_config else None)
|
||||
|
||||
if head:
|
||||
ray_params.update_if_absent(num_redis_shards=1, include_webui=True)
|
||||
self._plasma_store_socket_name = None
|
||||
self._raylet_socket_name = None
|
||||
self._webui_url = None
|
||||
else:
|
||||
redis_client = ray.services.create_redis_client(
|
||||
ray_params.redis_address, ray_params.redis_password)
|
||||
self._plasma_store_socket_name = (
|
||||
ray_params.plasma_store_socket_name)
|
||||
self._raylet_socket_name = ray_params.raylet_socket_name
|
||||
redis_client = self.create_redis_client()
|
||||
# TODO(suquark): Replace _webui_url_helper in worker.py in
|
||||
# another PR.
|
||||
_webui_url = redis_client.hmget("webui", "url")[0]
|
||||
self._webui_url = (ray.utils.decode(_webui_url)
|
||||
if _webui_url is not None else None)
|
||||
ray_params.include_java = (
|
||||
ray.services.include_java_from_redis(redis_client))
|
||||
|
||||
self._ray_params = ray_params
|
||||
self._config = (json.loads(ray_params._internal_config)
|
||||
if ray_params._internal_config else None)
|
||||
self._node_ip_address = ray_params.node_ip_address
|
||||
self._redis_address = ray_params.redis_address
|
||||
self._plasma_store_socket_name = None
|
||||
self._raylet_socket_name = None
|
||||
self._webui_url = None
|
||||
self._init_temp()
|
||||
|
||||
self.start_ray_processes()
|
||||
if not connect_only:
|
||||
self.start_ray_processes()
|
||||
|
||||
if shutdown_at_exit:
|
||||
atexit.register(lambda: self.kill_all_processes(
|
||||
check_alive=False, allow_graceful=True))
|
||||
|
||||
def _init_temp(self):
|
||||
# Create an dictionary to store temp file index.
|
||||
self._incremental_dict = collections.defaultdict(lambda: 0)
|
||||
|
||||
self._temp_dir = self._ray_params.temp_dir
|
||||
if self._temp_dir is None:
|
||||
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
self._temp_dir = self._make_inc_temp(
|
||||
prefix="session_{date_str}_{pid}".format(
|
||||
pid=os.getpid(), date_str=date_str),
|
||||
directory_name="/tmp/ray")
|
||||
|
||||
try_to_create_directory(self._temp_dir)
|
||||
# Create a directory to be used for socket files.
|
||||
self._sockets_dir = os.path.join(self._temp_dir, "sockets")
|
||||
try_to_create_directory(self._sockets_dir)
|
||||
# Create a directory to be used for process log files.
|
||||
self._logs_dir = os.path.join(self._temp_dir, "logs")
|
||||
try_to_create_directory(self._logs_dir)
|
||||
|
||||
@property
|
||||
def node_ip_address(self):
|
||||
"""Get the cluster Redis address."""
|
||||
@@ -108,7 +147,87 @@ class Node(object):
|
||||
"""Get the node's raylet socket name."""
|
||||
return self._raylet_socket_name
|
||||
|
||||
def prepare_socket_file(self, socket_path):
|
||||
def create_redis_client(self):
|
||||
"""Create a redis client."""
|
||||
return ray.services.create_redis_client(
|
||||
self._redis_address, self._ray_params.redis_password)
|
||||
|
||||
def get_temp_dir_path(self):
|
||||
"""Get the path of the temporary directory."""
|
||||
return self._temp_dir
|
||||
|
||||
def get_logs_dir_path(self):
|
||||
"""Get the path of the log files directory."""
|
||||
return self._logs_dir
|
||||
|
||||
def get_sockets_dir_path(self):
|
||||
"""Get the path of the sockets directory."""
|
||||
return self._sockets_dir
|
||||
|
||||
def _make_inc_temp(self, suffix="", prefix="", directory_name="/tmp/ray"):
|
||||
"""Return a incremental temporary file name. The file is not created.
|
||||
|
||||
Args:
|
||||
suffix (str): The suffix of the temp file.
|
||||
prefix (str): The prefix of the temp file.
|
||||
directory_name (str) : The base directory of the temp file.
|
||||
|
||||
Returns:
|
||||
A string of file name. If there existing a file having
|
||||
the same name, the returned name will look like
|
||||
"{directory_name}/{prefix}.{unique_index}{suffix}"
|
||||
"""
|
||||
directory_name = os.path.expanduser(directory_name)
|
||||
index = self._incremental_dict[suffix, prefix, directory_name]
|
||||
# `tempfile.TMP_MAX` could be extremely large,
|
||||
# so using `range` in Python2.x should be avoided.
|
||||
while index < tempfile.TMP_MAX:
|
||||
if index == 0:
|
||||
filename = os.path.join(directory_name, prefix + suffix)
|
||||
else:
|
||||
filename = os.path.join(directory_name,
|
||||
prefix + "." + str(index) + suffix)
|
||||
index += 1
|
||||
if not os.path.exists(filename):
|
||||
# Save the index.
|
||||
self._incremental_dict[suffix, prefix, directory_name] = index
|
||||
return filename
|
||||
|
||||
raise FileExistsError(errno.EEXIST,
|
||||
"No usable temporary filename found")
|
||||
|
||||
def new_log_files(self, name, redirect_output=None):
|
||||
"""Generate partially randomized filenames for log files.
|
||||
|
||||
Args:
|
||||
name (str): descriptive string for this log file.
|
||||
redirect_output (bool): True if files should be generated for
|
||||
logging stdout and stderr and false if stdout and stderr
|
||||
should not be redirected.
|
||||
If it is None, it will use the "redirect_output" Ray parameter.
|
||||
|
||||
Returns:
|
||||
If redirect_output is true, this will return a tuple of two
|
||||
file handles. The first is for redirecting stdout and the
|
||||
second is for redirecting stderr.
|
||||
If redirect_output is false, this will return a tuple
|
||||
of two None objects.
|
||||
"""
|
||||
if redirect_output is None:
|
||||
redirect_output = self._ray_params.redirect_output
|
||||
if not redirect_output:
|
||||
return None, None
|
||||
|
||||
log_stdout = self._make_inc_temp(
|
||||
suffix=".out", prefix=name, directory_name=self._logs_dir)
|
||||
log_stderr = self._make_inc_temp(
|
||||
suffix=".err", prefix=name, directory_name=self._logs_dir)
|
||||
# Line-buffer the output (mode 1)
|
||||
log_stdout_file = open(log_stdout, "a", buffering=1)
|
||||
log_stderr_file = open(log_stderr, "a", buffering=1)
|
||||
return log_stdout_file, log_stderr_file
|
||||
|
||||
def _prepare_socket_file(self, socket_path, default_prefix):
|
||||
"""Prepare the socket file for raylet and plasma.
|
||||
|
||||
This method helps to prepare a socket file.
|
||||
@@ -118,24 +237,30 @@ class Node(object):
|
||||
Args:
|
||||
socket_path (string): the socket file to prepare.
|
||||
"""
|
||||
if not os.path.exists(socket_path):
|
||||
path = os.path.dirname(socket_path)
|
||||
if not os.path.isdir(path):
|
||||
try_to_create_directory(path)
|
||||
else:
|
||||
raise Exception("Socket file {} exists!".format(socket_path))
|
||||
if socket_path is not None:
|
||||
if os.path.exists(socket_path):
|
||||
raise Exception("Socket file {} exists!".format(socket_path))
|
||||
socket_dir = os.path.dirname(socket_path)
|
||||
try_to_create_directory(socket_dir)
|
||||
return socket_path
|
||||
return self._make_inc_temp(
|
||||
prefix=default_prefix, directory_name=self._sockets_dir)
|
||||
|
||||
def start_redis(self):
|
||||
"""Start the Redis servers."""
|
||||
assert self._redis_address is None
|
||||
redis_log_files = [self.new_log_files("redis")]
|
||||
for i in range(self._ray_params.num_redis_shards):
|
||||
redis_log_files.append(self.new_log_files("redis-shard_" + str(i)))
|
||||
|
||||
(self._redis_address, redis_shards,
|
||||
process_infos) = ray.services.start_redis(
|
||||
self._node_ip_address,
|
||||
redis_log_files,
|
||||
port=self._ray_params.redis_port,
|
||||
redis_shard_ports=self._ray_params.redis_shard_ports,
|
||||
num_redis_shards=self._ray_params.num_redis_shards,
|
||||
redis_max_clients=self._ray_params.redis_max_clients,
|
||||
redirect_output=self._ray_params.redirect_output,
|
||||
redirect_worker_output=self._ray_params.redirect_worker_output,
|
||||
password=self._ray_params.redis_password,
|
||||
redis_max_memory=self._ray_params.redis_max_memory)
|
||||
@@ -146,9 +271,10 @@ class Node(object):
|
||||
|
||||
def start_log_monitor(self):
|
||||
"""Start the log monitor."""
|
||||
stdout_file, stderr_file = new_log_monitor_log_file()
|
||||
stdout_file, stderr_file = self.new_log_files("log_monitor", True)
|
||||
process_info = ray.services.start_log_monitor(
|
||||
self.redis_address,
|
||||
self._logs_dir,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file,
|
||||
redis_password=self._ray_params.redis_password)
|
||||
@@ -159,9 +285,12 @@ class Node(object):
|
||||
|
||||
def start_ui(self):
|
||||
"""Start the web UI."""
|
||||
stdout_file, stderr_file = new_webui_log_file()
|
||||
stdout_file, stderr_file = self.new_log_files("webui", True)
|
||||
notebook_name = self._make_inc_temp(
|
||||
suffix=".ipynb", prefix="ray_ui", directory_name=self._temp_dir)
|
||||
self._webui_url, process_info = ray.services.start_ui(
|
||||
self._redis_address,
|
||||
notebook_name,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file)
|
||||
assert ray_constants.PROCESS_TYPE_WEB_UI not in self.all_processes
|
||||
@@ -174,15 +303,11 @@ class Node(object):
|
||||
"""Start the plasma store."""
|
||||
assert self._plasma_store_socket_name is None
|
||||
# If the user specified a socket name, use it.
|
||||
self._plasma_store_socket_name = (
|
||||
self._ray_params.plasma_store_socket_name
|
||||
or get_object_store_socket_name())
|
||||
self.prepare_socket_file(self._plasma_store_socket_name)
|
||||
stdout_file, stderr_file = (new_plasma_store_log_file(
|
||||
self._ray_params.redirect_output))
|
||||
self._plasma_store_socket_name = self._prepare_socket_file(
|
||||
self._ray_params.plasma_store_socket_name,
|
||||
default_prefix="plasma_store")
|
||||
stdout_file, stderr_file = self.new_log_files("plasma_store")
|
||||
process_info = ray.services.start_plasma_store(
|
||||
self._node_ip_address,
|
||||
self._redis_address,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file,
|
||||
object_store_memory=self._ray_params.object_store_memory,
|
||||
@@ -206,17 +331,16 @@ class Node(object):
|
||||
"""
|
||||
assert self._raylet_socket_name is None
|
||||
# If the user specified a socket name, use it.
|
||||
self._raylet_socket_name = (self._ray_params.raylet_socket_name
|
||||
or get_raylet_socket_name())
|
||||
self.prepare_socket_file(self._raylet_socket_name)
|
||||
stdout_file, stderr_file = new_raylet_log_file(
|
||||
redirect_output=self._ray_params.redirect_output)
|
||||
self._raylet_socket_name = self._prepare_socket_file(
|
||||
self._ray_params.raylet_socket_name, default_prefix="raylet")
|
||||
stdout_file, stderr_file = self.new_log_files("raylet")
|
||||
process_info = ray.services.start_raylet(
|
||||
self._redis_address,
|
||||
self._node_ip_address,
|
||||
self._raylet_socket_name,
|
||||
self._plasma_store_socket_name,
|
||||
self._ray_params.worker_path,
|
||||
self._temp_dir,
|
||||
self._ray_params.num_cpus,
|
||||
self._ray_params.num_gpus,
|
||||
self._ray_params.resources,
|
||||
@@ -234,17 +358,21 @@ class Node(object):
|
||||
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
|
||||
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
|
||||
|
||||
def new_worker_redirected_log_file(self, worker_id):
|
||||
"""Create new logging files for workers to redirect its output."""
|
||||
worker_stdout_file, worker_stderr_file = (self.new_log_files(
|
||||
"worker-" + ray.utils.binary_to_hex(worker_id), True))
|
||||
return worker_stdout_file, worker_stderr_file
|
||||
|
||||
def start_worker(self):
|
||||
"""Start a worker process."""
|
||||
raise NotImplementedError
|
||||
|
||||
def start_monitor(self):
|
||||
"""Start the monitor."""
|
||||
stdout_file, stderr_file = new_monitor_log_file(
|
||||
self._ray_params.redirect_output)
|
||||
stdout_file, stderr_file = self.new_log_files("monitor")
|
||||
process_info = ray.services.start_monitor(
|
||||
self._redis_address,
|
||||
self._node_ip_address,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file,
|
||||
autoscaling_config=self._ray_params.autoscaling_config,
|
||||
@@ -254,8 +382,7 @@ class Node(object):
|
||||
|
||||
def start_raylet_monitor(self):
|
||||
"""Start the raylet monitor."""
|
||||
stdout_file, stderr_file = new_raylet_monitor_log_file(
|
||||
self._ray_params.redirect_output)
|
||||
stdout_file, stderr_file = self.new_log_files("raylet_monitor")
|
||||
process_info = ray.services.start_raylet_monitor(
|
||||
self._redis_address,
|
||||
stdout_file=stdout_file,
|
||||
@@ -270,10 +397,9 @@ class Node(object):
|
||||
|
||||
def start_ray_processes(self):
|
||||
"""Start all of the processes on the node."""
|
||||
set_temp_root(self._ray_params.temp_dir)
|
||||
logger.info(
|
||||
"Process STDOUT and STDERR is being redirected to {}.".format(
|
||||
get_logs_dir_path()))
|
||||
self._logs_dir))
|
||||
|
||||
# If this is the head node, start the relevant head node processes.
|
||||
if self._redis_address is None:
|
||||
|
||||
+37
-34
@@ -10,6 +10,7 @@ import multiprocessing
|
||||
import os
|
||||
import random
|
||||
import resource
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -21,14 +22,6 @@ import pyarrow
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
from ray.tempfile_services import (
|
||||
get_gdb_init_path,
|
||||
get_ipython_notebook_path,
|
||||
get_logs_dir_path,
|
||||
get_temp_root,
|
||||
new_redis_log_file,
|
||||
)
|
||||
|
||||
# True if processes are run in the valgrind profiler.
|
||||
RUN_RAYLET_PROFILER = False
|
||||
RUN_PLASMA_STORE_PROFILER = False
|
||||
@@ -291,7 +284,10 @@ def start_ray_process(command,
|
||||
if not use_tmux:
|
||||
raise ValueError(
|
||||
"If 'use_gdb' is true, then 'use_tmux' must be true as well.")
|
||||
gdb_init_path = get_gdb_init_path(process_type)
|
||||
|
||||
# TODO(suquark): Any better temp file creation here?
|
||||
gdb_init_path = "/tmp/ray/gdb_init_{}_{}".format(
|
||||
process_type, time.time())
|
||||
ray_process_path = command[0]
|
||||
ray_process_args = command[1:]
|
||||
run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
|
||||
@@ -458,11 +454,11 @@ def check_version_info(redis_client):
|
||||
|
||||
|
||||
def start_redis(node_ip_address,
|
||||
redirect_files,
|
||||
port=None,
|
||||
redis_shard_ports=None,
|
||||
num_redis_shards=1,
|
||||
redis_max_clients=None,
|
||||
redirect_output=False,
|
||||
redirect_worker_output=False,
|
||||
password=None,
|
||||
use_credis=None,
|
||||
@@ -473,6 +469,7 @@ def start_redis(node_ip_address,
|
||||
Args:
|
||||
node_ip_address: The IP address of the current node. This is only used
|
||||
for recording the log filenames in Redis.
|
||||
redirect_files: The list of (stdout, stderr) file pairs.
|
||||
port (int): If provided, the primary Redis shard will be started on
|
||||
this port.
|
||||
redis_shard_ports: A list of the ports to use for the non-primary Redis
|
||||
@@ -482,8 +479,6 @@ def start_redis(node_ip_address,
|
||||
shard.
|
||||
redis_max_clients: If this is provided, Ray will attempt to configure
|
||||
Redis with this maxclients number.
|
||||
redirect_output (bool): True if output should be redirected to a file
|
||||
and false otherwise.
|
||||
redirect_worker_output (bool): True if worker output should be
|
||||
redirected to a file and false otherwise. Workers will have access
|
||||
to this value when they start up.
|
||||
@@ -505,8 +500,10 @@ def start_redis(node_ip_address,
|
||||
addresses for the remaining shards, and the processes that were
|
||||
started.
|
||||
"""
|
||||
redis_stdout_file, redis_stderr_file = new_redis_log_file(redirect_output)
|
||||
|
||||
assert len(redirect_files) == 1 + num_redis_shards, (
|
||||
"The number of redirect file pairs should be equal to the number of"
|
||||
"redis shards (including the primary shard) we will start.")
|
||||
if redis_shard_ports is None:
|
||||
redis_shard_ports = num_redis_shards * [None]
|
||||
elif len(redis_shard_ports) != num_redis_shards:
|
||||
@@ -541,6 +538,7 @@ def start_redis(node_ip_address,
|
||||
redis_executable = REDIS_EXECUTABLE
|
||||
redis_modules = [REDIS_MODULE]
|
||||
|
||||
redis_stdout_file, redis_stderr_file = redirect_files[0]
|
||||
# Start the primary Redis shard.
|
||||
port, p = _start_redis_instance(
|
||||
redis_executable,
|
||||
@@ -587,9 +585,7 @@ def start_redis(node_ip_address,
|
||||
# prefixed by "redis-<shard number>".
|
||||
redis_shards = []
|
||||
for i in range(num_redis_shards):
|
||||
redis_stdout_file, redis_stderr_file = new_redis_log_file(
|
||||
redirect_output, shard_number=i)
|
||||
|
||||
redis_stdout_file, redis_stderr_file = redirect_files[i + 1]
|
||||
if use_credis:
|
||||
redis_executable = CREDIS_EXECUTABLE
|
||||
# It is important to load the credis module BEFORE the ray module,
|
||||
@@ -795,6 +791,7 @@ def _start_redis_instance(executable,
|
||||
|
||||
|
||||
def start_log_monitor(redis_address,
|
||||
logs_dir,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
redis_password=None):
|
||||
@@ -802,6 +799,7 @@ def start_log_monitor(redis_address,
|
||||
|
||||
Args:
|
||||
redis_address (str): The address of the Redis instance.
|
||||
logs_dir (str): The directory of logging files.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If
|
||||
no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
@@ -815,8 +813,8 @@ def start_log_monitor(redis_address,
|
||||
os.path.dirname(os.path.abspath(__file__)), "log_monitor.py")
|
||||
command = [
|
||||
sys.executable, "-u", log_monitor_filepath,
|
||||
"--redis-address={}".format(redis_address), "--logs-dir={}".format(
|
||||
get_logs_dir_path())
|
||||
"--redis-address={}".format(redis_address),
|
||||
"--logs-dir={}".format(logs_dir)
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis-password", redis_password]
|
||||
@@ -828,11 +826,12 @@ def start_log_monitor(redis_address,
|
||||
return process_info
|
||||
|
||||
|
||||
def start_ui(redis_address, stdout_file=None, stderr_file=None):
|
||||
def start_ui(redis_address, notebook_name, stdout_file=None, stderr_file=None):
|
||||
"""Start a UI process.
|
||||
|
||||
Args:
|
||||
redis_address: The address of the primary Redis shard.
|
||||
notebook_name: The destination of the notebook file.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If
|
||||
no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
@@ -853,7 +852,12 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None):
|
||||
except socket.error:
|
||||
port += 1
|
||||
|
||||
notebook_name = get_ipython_notebook_path()
|
||||
notebook_filepath = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb")
|
||||
# We copy the notebook file so that the original doesn't get modified by
|
||||
# the user.
|
||||
shutil.copy(notebook_filepath, notebook_name)
|
||||
|
||||
new_notebook_directory = os.path.dirname(notebook_name)
|
||||
# We generate the token used for authentication ourselves to avoid
|
||||
# querying the jupyter server.
|
||||
@@ -955,6 +959,7 @@ def start_raylet(redis_address,
|
||||
raylet_name,
|
||||
plasma_store_name,
|
||||
worker_path,
|
||||
temp_dir,
|
||||
num_cpus=None,
|
||||
num_gpus=None,
|
||||
resources=None,
|
||||
@@ -978,6 +983,7 @@ def start_raylet(redis_address,
|
||||
to.
|
||||
worker_path (str): The path of the Python file that new worker
|
||||
processes will execute.
|
||||
temp_dir (str): The path of the temporary directory Ray will use.
|
||||
num_cpus: The CPUs allocated for this raylet.
|
||||
num_gpus: The GPUs allocated for this raylet.
|
||||
resources: The custom resources allocated for this raylet.
|
||||
@@ -1034,6 +1040,7 @@ def start_raylet(redis_address,
|
||||
plasma_store_name,
|
||||
raylet_name,
|
||||
redis_password,
|
||||
os.path.join(temp_dir, "sockets"),
|
||||
)
|
||||
else:
|
||||
java_worker_command = ""
|
||||
@@ -1047,7 +1054,7 @@ def start_raylet(redis_address,
|
||||
"--temp-dir={}".format(
|
||||
sys.executable, worker_path, node_ip_address,
|
||||
plasma_store_name, raylet_name, redis_address,
|
||||
get_temp_root()))
|
||||
temp_dir))
|
||||
if redis_password:
|
||||
start_worker_command += " --redis-password {}".format(redis_password)
|
||||
|
||||
@@ -1076,7 +1083,7 @@ def start_raylet(redis_address,
|
||||
start_worker_command,
|
||||
java_worker_command,
|
||||
redis_password or "",
|
||||
get_temp_root(),
|
||||
temp_dir,
|
||||
]
|
||||
process_info = start_ray_process(
|
||||
command,
|
||||
@@ -1097,6 +1104,7 @@ def build_java_worker_command(
|
||||
plasma_store_name,
|
||||
raylet_name,
|
||||
redis_password,
|
||||
temp_dir,
|
||||
):
|
||||
"""This method assembles the command used to start a Java worker.
|
||||
|
||||
@@ -1107,6 +1115,7 @@ def build_java_worker_command(
|
||||
to.
|
||||
raylet_name (str): The name of the raylet socket to create.
|
||||
redis_password (str): The password of connect to redis.
|
||||
temp_dir (str): The path of the temporary directory Ray will use.
|
||||
Returns:
|
||||
The command string for starting Java worker.
|
||||
"""
|
||||
@@ -1127,7 +1136,8 @@ def build_java_worker_command(
|
||||
command += ("-Dray.redis-password=%s", redis_password)
|
||||
|
||||
command += "-Dray.home={} ".format(RAY_HOME)
|
||||
command += "-Dray.log-dir={} ".format(get_logs_dir_path())
|
||||
# TODO(suquark): We should use temp_dir as the input of a java worker.
|
||||
command += "-Dray.log-dir={} ".format(os.path.join(temp_dir, "sockets"))
|
||||
command += "org.ray.runtime.runner.worker.DefaultWorker"
|
||||
|
||||
return command
|
||||
@@ -1279,9 +1289,7 @@ def _start_plasma_store(plasma_store_memory,
|
||||
return process_info
|
||||
|
||||
|
||||
def start_plasma_store(node_ip_address,
|
||||
redis_address,
|
||||
stdout_file=None,
|
||||
def start_plasma_store(stdout_file=None,
|
||||
stderr_file=None,
|
||||
object_store_memory=None,
|
||||
plasma_directory=None,
|
||||
@@ -1290,9 +1298,6 @@ def start_plasma_store(node_ip_address,
|
||||
"""This method starts an object store process.
|
||||
|
||||
Args:
|
||||
node_ip_address (str): The IP address of the node running the object
|
||||
store.
|
||||
redis_address (str): The address of the Redis instance to connect to.
|
||||
stdout_file: A file handle opened for writing to redirect stdout
|
||||
to. If no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr
|
||||
@@ -1338,6 +1343,7 @@ def start_worker(node_ip_address,
|
||||
raylet_name,
|
||||
redis_address,
|
||||
worker_path,
|
||||
temp_dir,
|
||||
stdout_file=None,
|
||||
stderr_file=None):
|
||||
"""This method starts a worker process.
|
||||
@@ -1350,6 +1356,7 @@ def start_worker(node_ip_address,
|
||||
redis_address (str): The address that the Redis server is listening on.
|
||||
worker_path (str): The path of the source code which the worker process
|
||||
will run.
|
||||
temp_dir (str): The path of the temp dir.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If
|
||||
no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
@@ -1363,8 +1370,7 @@ def start_worker(node_ip_address,
|
||||
"--node-ip-address=" + node_ip_address,
|
||||
"--object-store-name=" + object_store_name,
|
||||
"--raylet-name=" + raylet_name,
|
||||
"--redis-address=" + str(redis_address),
|
||||
"--temp-dir=" + get_temp_root()
|
||||
"--redis-address=" + str(redis_address), "--temp-dir=" + temp_dir
|
||||
]
|
||||
process_info = start_ray_process(
|
||||
command,
|
||||
@@ -1375,7 +1381,6 @@ def start_worker(node_ip_address,
|
||||
|
||||
|
||||
def start_monitor(redis_address,
|
||||
node_ip_address,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
autoscaling_config=None,
|
||||
@@ -1384,8 +1389,6 @@ def start_monitor(redis_address,
|
||||
|
||||
Args:
|
||||
redis_address (str): The address that the Redis server is listening on.
|
||||
node_ip_address: The IP address of the node that this process will run
|
||||
on.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If
|
||||
no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
import collections
|
||||
import datetime
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import ray.utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
_incremental_dict = collections.defaultdict(lambda: 0)
|
||||
_temp_root = None
|
||||
|
||||
|
||||
def make_inc_temp(suffix="", prefix="", directory_name="/tmp/ray"):
|
||||
"""Return a incremental temporary file name. The file is not created.
|
||||
|
||||
Args:
|
||||
suffix (str): The suffix of the temp file.
|
||||
prefix (str): The prefix of the temp file.
|
||||
directory_name (str) : The base directory of the temp file.
|
||||
|
||||
Returns:
|
||||
A string of file name. If there existing a file having the same name,
|
||||
the returned name will look like
|
||||
"{directory_name}/{prefix}.{unique_index}{suffix}"
|
||||
"""
|
||||
directory_name = os.path.expanduser(directory_name)
|
||||
index = _incremental_dict[suffix, prefix, directory_name]
|
||||
# `tempfile.TMP_MAX` could be extremely large,
|
||||
# so using `range` in Python2.x should be avoided.
|
||||
while index < tempfile.TMP_MAX:
|
||||
if index == 0:
|
||||
filename = os.path.join(directory_name, prefix + suffix)
|
||||
else:
|
||||
filename = os.path.join(directory_name,
|
||||
prefix + "." + str(index) + suffix)
|
||||
index += 1
|
||||
if not os.path.exists(filename):
|
||||
_incremental_dict[suffix, prefix,
|
||||
directory_name] = index # Save the index.
|
||||
return filename
|
||||
|
||||
raise FileExistsError(errno.EEXIST, "No usable temporary filename found")
|
||||
|
||||
|
||||
def try_to_create_directory(directory_path):
|
||||
"""Attempt to create a directory that is globally readable/writable.
|
||||
|
||||
Args:
|
||||
directory_path: The path of the directory to create.
|
||||
"""
|
||||
directory_path = os.path.expanduser(directory_path)
|
||||
if not os.path.exists(directory_path):
|
||||
try:
|
||||
os.makedirs(directory_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
raise e
|
||||
logger.warning(
|
||||
"Attempted to create '{}', but the directory already "
|
||||
"exists.".format(directory_path))
|
||||
# Change the log directory permissions so others can use it. This is
|
||||
# important when multiple people are using the same machine.
|
||||
try:
|
||||
os.chmod(directory_path, 0o0777)
|
||||
except OSError as e:
|
||||
# Silently suppress the PermissionError that is thrown by the chmod.
|
||||
# This is done because the user attempting to change the permissions
|
||||
# on a directory may not own it. The chmod is attempted whether the
|
||||
# directory is new or not to avoid race conditions.
|
||||
# ray-project/ray/#3591
|
||||
if e.errno in [errno.EACCES, errno.EPERM]:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def get_temp_root():
|
||||
"""Get the path of the temporary root. If not existing, it will be created.
|
||||
"""
|
||||
global _temp_root
|
||||
|
||||
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
|
||||
# Lazy creation. Avoid creating directories never used.
|
||||
if _temp_root is None:
|
||||
_temp_root = make_inc_temp(
|
||||
prefix="session_{date_str}_{pid}".format(
|
||||
pid=os.getpid(), date_str=date_str),
|
||||
directory_name="/tmp/ray")
|
||||
try_to_create_directory(_temp_root)
|
||||
return _temp_root
|
||||
|
||||
|
||||
def set_temp_root(path):
|
||||
"""Set the path of the temporary root. It will be created lazily."""
|
||||
global _temp_root
|
||||
_temp_root = path
|
||||
|
||||
|
||||
def get_logs_dir_path():
|
||||
"""Get a temp dir for logging."""
|
||||
logs_dir = os.path.join(get_temp_root(), "logs")
|
||||
try_to_create_directory(logs_dir)
|
||||
return logs_dir
|
||||
|
||||
|
||||
def get_sockets_dir_path():
|
||||
"""Get a temp dir for sockets."""
|
||||
sockets_dir = os.path.join(get_temp_root(), "sockets")
|
||||
try_to_create_directory(sockets_dir)
|
||||
return sockets_dir
|
||||
|
||||
|
||||
def get_raylet_socket_name(suffix=""):
|
||||
"""Get a socket name for raylet."""
|
||||
sockets_dir = get_sockets_dir_path()
|
||||
|
||||
raylet_socket_name = make_inc_temp(
|
||||
prefix="raylet", directory_name=sockets_dir, suffix=suffix)
|
||||
return raylet_socket_name
|
||||
|
||||
|
||||
def get_object_store_socket_name():
|
||||
"""Get a socket name for plasma object store."""
|
||||
sockets_dir = get_sockets_dir_path()
|
||||
return make_inc_temp(prefix="plasma_store", directory_name=sockets_dir)
|
||||
|
||||
|
||||
def get_ipython_notebook_path():
|
||||
"""Get a new ipython notebook path"""
|
||||
|
||||
notebook_filepath = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), "WebUI.ipynb")
|
||||
# We copy the notebook file so that the original doesn't get modified by
|
||||
# the user.
|
||||
notebook_name = make_inc_temp(
|
||||
suffix=".ipynb", prefix="ray_ui", directory_name=get_temp_root())
|
||||
shutil.copy(notebook_filepath, notebook_name)
|
||||
return notebook_name
|
||||
|
||||
|
||||
def get_gdb_init_path(process_type):
|
||||
return make_inc_temp(
|
||||
prefix="gdb_init_{}".format(process_type),
|
||||
directory_name=get_temp_root())
|
||||
|
||||
|
||||
def new_log_files(name, redirect_output):
|
||||
"""Generate partially randomized filenames for log files.
|
||||
|
||||
Args:
|
||||
name (str): descriptive string for this log file.
|
||||
redirect_output (bool): True if files should be generated for logging
|
||||
stdout and stderr and false if stdout and stderr should not be
|
||||
redirected.
|
||||
|
||||
Returns:
|
||||
If redirect_output is true, this will return a tuple of two
|
||||
filehandles. The first is for redirecting stdout and the second is
|
||||
for redirecting stderr. If redirect_output is false, this will
|
||||
return a tuple of two None objects.
|
||||
"""
|
||||
if not redirect_output:
|
||||
return None, None
|
||||
|
||||
# Create a directory to be used for process log files.
|
||||
logs_dir = get_logs_dir_path()
|
||||
# Create another directory that will be used by some of the RL algorithms.
|
||||
|
||||
log_stdout = make_inc_temp(
|
||||
suffix=".out", prefix=name, directory_name=logs_dir)
|
||||
log_stderr = make_inc_temp(
|
||||
suffix=".err", prefix=name, directory_name=logs_dir)
|
||||
# Line-buffer the output (mode 1)
|
||||
log_stdout_file = open(log_stdout, "a", buffering=1)
|
||||
log_stderr_file = open(log_stderr, "a", buffering=1)
|
||||
return log_stdout_file, log_stderr_file
|
||||
|
||||
|
||||
def new_redis_log_file(redirect_output, shard_number=None):
|
||||
"""Create new logging files for redis"""
|
||||
if shard_number is None:
|
||||
redis_stdout_file, redis_stderr_file = new_log_files(
|
||||
"redis", redirect_output)
|
||||
else:
|
||||
redis_stdout_file, redis_stderr_file = new_log_files(
|
||||
"redis-shard_{}".format(shard_number), redirect_output)
|
||||
return redis_stdout_file, redis_stderr_file
|
||||
|
||||
|
||||
def new_raylet_log_file(redirect_output):
|
||||
"""Create new logging files for raylet."""
|
||||
raylet_stdout_file, raylet_stderr_file = new_log_files(
|
||||
"raylet", redirect_output=redirect_output)
|
||||
return raylet_stdout_file, raylet_stderr_file
|
||||
|
||||
|
||||
def new_webui_log_file():
|
||||
"""Create new logging files for web ui."""
|
||||
ui_stdout_file, ui_stderr_file = new_log_files(
|
||||
"webui", redirect_output=True)
|
||||
return ui_stdout_file, ui_stderr_file
|
||||
|
||||
|
||||
def new_worker_redirected_log_file(worker_id):
|
||||
"""Create new logging files for workers to redirect its output."""
|
||||
worker_stdout_file, worker_stderr_file = (new_log_files(
|
||||
"worker-" + ray.utils.binary_to_hex(worker_id), True))
|
||||
return worker_stdout_file, worker_stderr_file
|
||||
|
||||
|
||||
def new_log_monitor_log_file():
|
||||
"""Create new logging files for the log monitor."""
|
||||
log_monitor_stdout_file, log_monitor_stderr_file = new_log_files(
|
||||
"log_monitor", redirect_output=True)
|
||||
return log_monitor_stdout_file, log_monitor_stderr_file
|
||||
|
||||
|
||||
def new_plasma_store_log_file(redirect_output):
|
||||
"""Create new logging files for the plasma store."""
|
||||
plasma_store_stdout_file, plasma_store_stderr_file = new_log_files(
|
||||
"plasma_store", redirect_output)
|
||||
return plasma_store_stdout_file, plasma_store_stderr_file
|
||||
|
||||
|
||||
def new_monitor_log_file(redirect_output):
|
||||
"""Create new logging files for the monitor."""
|
||||
monitor_stdout_file, monitor_stderr_file = new_log_files(
|
||||
"monitor", redirect_output)
|
||||
return monitor_stdout_file, monitor_stderr_file
|
||||
|
||||
|
||||
def new_raylet_monitor_log_file(redirect_output):
|
||||
"""Create new logging files for the raylet monitor."""
|
||||
raylet_monitor_stdout_file, raylet_monitor_stderr_file = new_log_files(
|
||||
"raylet_monitor", redirect_output)
|
||||
return raylet_monitor_stdout_file, raylet_monitor_stderr_file
|
||||
@@ -3,6 +3,7 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import binascii
|
||||
import errno
|
||||
import functools
|
||||
import hashlib
|
||||
import inspect
|
||||
@@ -465,3 +466,36 @@ def thread_safe_client(client, lock=None):
|
||||
|
||||
def is_main_thread():
|
||||
return threading.current_thread().getName() == "MainThread"
|
||||
|
||||
|
||||
def try_to_create_directory(directory_path):
|
||||
"""Attempt to create a directory that is globally readable/writable.
|
||||
|
||||
Args:
|
||||
directory_path: The path of the directory to create.
|
||||
"""
|
||||
logger = logging.getLogger("ray")
|
||||
directory_path = os.path.expanduser(directory_path)
|
||||
if not os.path.exists(directory_path):
|
||||
try:
|
||||
os.makedirs(directory_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
raise e
|
||||
logger.warning(
|
||||
"Attempted to create '{}', but the directory already "
|
||||
"exists.".format(directory_path))
|
||||
# Change the log directory permissions so others can use it. This is
|
||||
# important when multiple people are using the same machine.
|
||||
try:
|
||||
os.chmod(directory_path, 0o0777)
|
||||
except OSError as e:
|
||||
# Silently suppress the PermissionError that is thrown by the chmod.
|
||||
# This is done because the user attempting to change the permissions
|
||||
# on a directory may not own it. The chmod is attempted whether the
|
||||
# directory is new or not to avoid race conditions.
|
||||
# ray-project/ray/#3591
|
||||
if e.errno in [errno.EACCES, errno.EPERM]:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
+16
-6
@@ -32,7 +32,6 @@ import ray.remote_function
|
||||
import ray.serialization as serialization
|
||||
import ray.services as services
|
||||
import ray.signature
|
||||
import ray.tempfile_services as tempfile_services
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray import import_thread
|
||||
from ray import ObjectID, DriverID, ActorID, ActorHandleID, ClientID, TaskID
|
||||
@@ -1387,10 +1386,13 @@ def init(redis_address=None,
|
||||
"redis_address": redis_address
|
||||
}
|
||||
|
||||
global _global_node
|
||||
if driver_mode == LOCAL_MODE:
|
||||
# If starting Ray in LOCAL_MODE, don't start any other processes.
|
||||
pass
|
||||
elif redis_address is None:
|
||||
# TODO(suquark): We should remove the code below because they
|
||||
# have been set when initializing the node.
|
||||
if node_ip_address is None:
|
||||
node_ip_address = ray.services.get_node_ip_address()
|
||||
if num_redis_shards is None:
|
||||
@@ -1424,7 +1426,6 @@ def init(redis_address=None,
|
||||
# Start the Ray processes. We set shutdown_at_exit=False because we
|
||||
# shutdown the node in the ray.shutdown call that happens in the atexit
|
||||
# handler.
|
||||
global _global_node
|
||||
_global_node = ray.node.Node(
|
||||
head=True, shutdown_at_exit=False, ray_params=ray_params)
|
||||
address_info["redis_address"] = _global_node.redis_address
|
||||
@@ -1481,6 +1482,18 @@ def init(redis_address=None,
|
||||
# Get the address info of the processes to connect to from Redis.
|
||||
address_info = get_address_info_from_redis(
|
||||
redis_address, node_ip_address, redis_password=redis_password)
|
||||
# TODO(suquark): Use "node" as the input of "connect()".
|
||||
# In this case, we only need to connect the node.
|
||||
ray_params = ray.parameter.RayParams(
|
||||
node_ip_address=node_ip_address,
|
||||
redis_address=redis_address,
|
||||
redis_password=redis_password,
|
||||
plasma_store_socket_name=address_info["object_store_address"],
|
||||
raylet_socket_name=address_info["raylet_socket_name"],
|
||||
object_id_seed=object_id_seed,
|
||||
temp_dir=temp_dir)
|
||||
_global_node = ray.node.Node(
|
||||
ray_params, head=False, shutdown_at_exit=False, connect_only=True)
|
||||
|
||||
if driver_mode == LOCAL_MODE:
|
||||
driver_address_info = {}
|
||||
@@ -1493,8 +1506,6 @@ def init(redis_address=None,
|
||||
"raylet_socket_name": address_info["raylet_socket_name"],
|
||||
}
|
||||
|
||||
# We only pass `temp_dir` to a worker (WORKER_MODE).
|
||||
# It can't be a worker here.
|
||||
connect(
|
||||
driver_address_info,
|
||||
redis_password=redis_password,
|
||||
@@ -1851,8 +1862,7 @@ def connect(info,
|
||||
redirect_worker_output = 0
|
||||
if redirect_worker_output:
|
||||
log_stdout_file, log_stderr_file = (
|
||||
tempfile_services.new_worker_redirected_log_file(
|
||||
worker.worker_id))
|
||||
_global_node.new_worker_redirected_log_file(worker.worker_id))
|
||||
# Redirect stdout/stderr at the file descriptor level. If we simply
|
||||
# set sys.stdout and sys.stderr, then logging from C++ can fail to
|
||||
# be redirected.
|
||||
|
||||
@@ -7,9 +7,10 @@ import traceback
|
||||
|
||||
import ray
|
||||
import ray.actor
|
||||
import ray.node
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.tempfile_services as tempfile_services
|
||||
import ray.utils
|
||||
from ray.parameter import RayParams
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description=("Parse addresses for the worker "
|
||||
@@ -70,9 +71,19 @@ if __name__ == "__main__":
|
||||
|
||||
ray.utils.setup_logger(args.logging_level, args.logging_format)
|
||||
|
||||
# Override the temporary directory.
|
||||
tempfile_services.set_temp_root(args.temp_dir)
|
||||
ray_params = RayParams(
|
||||
node_ip_address=args.node_ip_address,
|
||||
redis_address=args.redis_address,
|
||||
redis_password=args.redis_password,
|
||||
plasma_store_socket_name=args.object_store_name,
|
||||
raylet_socket_name=args.raylet_name,
|
||||
temp_dir=args.temp_dir)
|
||||
|
||||
node = ray.node.Node(
|
||||
ray_params, head=False, shutdown_at_exit=False, connect_only=True)
|
||||
ray.worker._global_node = node
|
||||
|
||||
# TODO(suquark): Use "node" as the input of "connect".
|
||||
ray.worker.connect(
|
||||
info, redis_password=args.redis_password, mode=ray.WORKER_MODE)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user