mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 20:57:51 +08:00
Redirect process output to log files (#267)
* redirect process output to log files * formatting fixes * Generate all log files in start_ray_processes. * Fix bug.
This commit is contained in:
committed by
Philipp Moritz
parent
dd7e8d9105
commit
c9bc488ee0
@@ -6,7 +6,9 @@ import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False, redirect_output=False):
|
||||
def start_global_scheduler(redis_address, use_valgrind=False,
|
||||
use_profiler=False, stdout_file=None,
|
||||
stderr_file=None):
|
||||
"""Start a global scheduler process.
|
||||
|
||||
Args:
|
||||
@@ -15,8 +17,10 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False
|
||||
of valgrind. If this is True, use_profiler must be False.
|
||||
use_profiler (bool): True if the global scheduler should be started inside a
|
||||
profiler. If this is True, use_valgrind must be False.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
|
||||
Return:
|
||||
The process ID of the global scheduler process.
|
||||
@@ -25,16 +29,19 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False
|
||||
raise Exception("Cannot use valgrind and profiler at the same time.")
|
||||
global_scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/global_scheduler/global_scheduler")
|
||||
command = [global_scheduler_executable, "-r", redis_address]
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(0.1)
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind",
|
||||
"--track-origins=yes",
|
||||
"--leak-check=full",
|
||||
"--show-leak-kinds=all",
|
||||
"--error-exitcode=1"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(0.1)
|
||||
return pid
|
||||
|
||||
@@ -18,7 +18,8 @@ def start_local_scheduler(plasma_store_name,
|
||||
redis_address=None,
|
||||
use_valgrind=False,
|
||||
use_profiler=False,
|
||||
redirect_output=False,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
static_resource_list=None,
|
||||
num_workers=0):
|
||||
"""Start a local scheduler process.
|
||||
@@ -41,8 +42,10 @@ def start_local_scheduler(plasma_store_name,
|
||||
valgrind. If this is True, use_profiler must be False.
|
||||
use_profiler (bool): True if the local scheduler should be started inside a
|
||||
profiler. If this is True, use_valgrind must be False.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
static_resource_list (list): A list of integers specifying the local
|
||||
scheduler's resource capacities. The resources should appear in an order
|
||||
matching the order defined in task.h.
|
||||
@@ -63,8 +66,7 @@ def start_local_scheduler(plasma_store_name,
|
||||
"-s", local_scheduler_name,
|
||||
"-p", plasma_store_name,
|
||||
"-h", node_ip_address,
|
||||
"-n", str(num_workers),
|
||||
]
|
||||
"-n", str(num_workers)]
|
||||
if plasma_manager_name is not None:
|
||||
command += ["-m", plasma_manager_name]
|
||||
if worker_path is not None:
|
||||
@@ -91,16 +93,19 @@ def start_local_scheduler(plasma_store_name,
|
||||
assert all([isinstance(resource, int) or isinstance(resource, float) for resource in static_resource_list])
|
||||
command += ["-c", ",".join([str(resource) for resource in static_resource_list])]
|
||||
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(0.1)
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind",
|
||||
"--track-origins=yes",
|
||||
"--leak-check=full",
|
||||
"--show-leak-kinds=all",
|
||||
"--error-exitcode=1"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(0.1)
|
||||
return local_scheduler_name, pid
|
||||
|
||||
+40
-27
@@ -291,7 +291,9 @@ DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
|
||||
def random_name():
|
||||
return str(random.randint(0, 99999999))
|
||||
|
||||
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False, redirect_output=False):
|
||||
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
|
||||
use_valgrind=False, use_profiler=False,
|
||||
stdout_file=None, stderr_file=None):
|
||||
"""Start a plasma store process.
|
||||
|
||||
Args:
|
||||
@@ -299,8 +301,10 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg
|
||||
valgrind. If this is True, use_profiler must be False.
|
||||
use_profiler (bool): True if the plasma store should be started inside a
|
||||
profiler. If this is True, use_valgrind must be False.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
|
||||
Return:
|
||||
A tuple of the name of the plasma store socket and the process ID of the
|
||||
@@ -311,18 +315,21 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg
|
||||
plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_store")
|
||||
plasma_store_name = "/tmp/plasma_store{}".format(random_name())
|
||||
command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)]
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
time.sleep(0.1)
|
||||
if use_valgrind:
|
||||
pid = subprocess.Popen(["valgrind",
|
||||
"--track-origins=yes",
|
||||
"--leak-check=full",
|
||||
"--show-leak-kinds=all",
|
||||
"--error-exitcode=1"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
elif use_profiler:
|
||||
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(0.1)
|
||||
return plasma_store_name, pid
|
||||
|
||||
def new_port():
|
||||
@@ -331,7 +338,7 @@ def new_port():
|
||||
def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1",
|
||||
plasma_manager_port=None, num_retries=20,
|
||||
use_valgrind=False, run_profiler=False,
|
||||
redirect_output=False):
|
||||
stdout_file=None, stderr_file=None):
|
||||
"""Start a plasma manager and return the ports it listens on.
|
||||
|
||||
Args:
|
||||
@@ -342,8 +349,10 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1",
|
||||
is not provided, a port will be generated at random.
|
||||
use_valgrind (bool): True if the Plasma manager should be started inside of
|
||||
valgrind and False otherwise.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
|
||||
Returns:
|
||||
A tuple of the Plasma manager socket name, the process ID of the Plasma
|
||||
@@ -370,15 +379,19 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1",
|
||||
"-h", node_ip_address,
|
||||
"-p", str(plasma_manager_port),
|
||||
"-r", redis_address]
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
if use_valgrind:
|
||||
process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr)
|
||||
elif run_profiler:
|
||||
process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr)
|
||||
else:
|
||||
process = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
if use_valgrind:
|
||||
process = subprocess.Popen(["valgrind",
|
||||
"--track-origins=yes",
|
||||
"--leak-check=full",
|
||||
"--show-leak-kinds=all",
|
||||
"--error-exitcode=1"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
elif run_profiler:
|
||||
process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
else:
|
||||
process = subprocess.Popen(command, stdout=stdout_file,
|
||||
stderr=stderr_file)
|
||||
# This sleep is critical. If the plasma_manager fails to start because the
|
||||
# port is already in use, then we need it to fail within 0.1 seconds.
|
||||
time.sleep(0.1)
|
||||
|
||||
+164
-63
@@ -195,17 +195,20 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5):
|
||||
if counter == num_retries:
|
||||
raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.")
|
||||
|
||||
def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False):
|
||||
def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
|
||||
cleanup=True):
|
||||
"""Start a Redis server.
|
||||
|
||||
Args:
|
||||
port (int): If provided, start a Redis server with this port.
|
||||
num_retries (int): The number of times to attempt to start Redis.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
|
||||
Returns:
|
||||
The port used by Redis. If a port is passed in, then the same value is
|
||||
@@ -227,10 +230,11 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False):
|
||||
while counter < num_retries:
|
||||
if counter > 0:
|
||||
print("Redis failed to start, retrying now.")
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module], stdout=stdout, stderr=stderr)
|
||||
p = subprocess.Popen([redis_filepath,
|
||||
"--port", str(port),
|
||||
"--loglevel", "warning",
|
||||
"--loadmodule", redis_module],
|
||||
stdout=stdout_file, stderr=stderr_file)
|
||||
time.sleep(0.1)
|
||||
# Check if Redis successfully started (or at least if the executable did
|
||||
# not exit within 0.1 seconds).
|
||||
@@ -257,31 +261,48 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False):
|
||||
redis_client.set("redis_start_time", time.time())
|
||||
return port
|
||||
|
||||
def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):
|
||||
def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None,
|
||||
cleanup=True):
|
||||
"""Start a global scheduler process.
|
||||
|
||||
Args:
|
||||
redis_address (str): The address of the Redis instance.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
"""
|
||||
p = global_scheduler.start_global_scheduler(redis_address, redirect_output=redirect_output)
|
||||
p = global_scheduler.start_global_scheduler(redis_address,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file)
|
||||
if cleanup:
|
||||
all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p)
|
||||
|
||||
def start_webui(redis_address, cleanup=True, redirect_output=False):
|
||||
def start_webui(redis_address, backend_stdout_file=None,
|
||||
backend_stderr_file=None, polymer_stdout_file=None,
|
||||
polymer_stderr_file=None, cleanup=True):
|
||||
"""Attempt to start the Ray web UI.
|
||||
|
||||
Args:
|
||||
redis_address (str): The address of the Redis server.
|
||||
backend_stdout_file: A file handle opened for writing to redirect the
|
||||
backend stdout to. If no redirection should happen, then this should be
|
||||
None.
|
||||
backend_stderr_file: A file handle opened for writing to redirect the
|
||||
backend stderr to. If no redirection should happen, then this should be
|
||||
None.
|
||||
polymer_stdout_file: A file handle opened for writing to redirect the
|
||||
polymer stdout to. If no redirection should happen, then this should be
|
||||
None.
|
||||
polymer_stderr_file: A file handle opened for writing to redirect the
|
||||
polymer stderr to. If no redirection should happen, then this should be
|
||||
None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is True, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
|
||||
Return:
|
||||
True if the web UI was successfully started, otherwise false.
|
||||
@@ -300,13 +321,11 @@ def start_webui(redis_address, cleanup=True, redirect_output=False):
|
||||
print("Not starting the web UI because the web UI requires Python 3.")
|
||||
return False
|
||||
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
backend_process = subprocess.Popen([python_executable,
|
||||
webui_backend_filepath,
|
||||
"--redis-address", redis_address],
|
||||
stdout=stdout, stderr=stderr)
|
||||
backend_process = subprocess.Popen([python_executable,
|
||||
webui_backend_filepath,
|
||||
"--redis-address", redis_address],
|
||||
stdout=backend_stdout_file,
|
||||
stderr=backend_stderr_file)
|
||||
|
||||
time.sleep(0.1)
|
||||
if backend_process.poll() is not None:
|
||||
@@ -318,12 +337,10 @@ def start_webui(redis_address, cleanup=True, redirect_output=False):
|
||||
# use. It'd be nice to test for this, but doing so by calling "bind" may start
|
||||
# using the port and prevent polymer from using it.
|
||||
try:
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
polymer_process = subprocess.Popen(["polymer", "serve", "--port", "8080"],
|
||||
cwd=webui_directory,
|
||||
stdout=stdout, stderr=stderr)
|
||||
polymer_process = subprocess.Popen(["polymer", "serve", "--port", "8080"],
|
||||
cwd=webui_directory,
|
||||
stdout=polymer_stdout_file,
|
||||
stderr=polymer_stderr_file)
|
||||
except Exception as e:
|
||||
print("Failed to start polymer.")
|
||||
# Kill the backend since it won't work without polymer.
|
||||
@@ -359,8 +376,9 @@ def start_local_scheduler(redis_address,
|
||||
plasma_manager_name,
|
||||
worker_path,
|
||||
plasma_address=None,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
cleanup=True,
|
||||
redirect_output=False,
|
||||
num_cpus=None,
|
||||
num_gpus=None,
|
||||
num_workers=0):
|
||||
@@ -375,11 +393,13 @@ def start_local_scheduler(redis_address,
|
||||
to.
|
||||
worker_path (str): The path of the script to use when the local scheduler
|
||||
starts up new workers.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
num_cpus: The number of CPUs the local scheduler should be configured with.
|
||||
num_gpus: The number of GPUs the local scheduler should be configured with.
|
||||
num_workers (int): The number of workers that the local scheduler should
|
||||
@@ -402,7 +422,8 @@ def start_local_scheduler(redis_address,
|
||||
redis_address=redis_address,
|
||||
plasma_address=plasma_address,
|
||||
use_profiler=RUN_PHOTON_PROFILER,
|
||||
redirect_output=redirect_output,
|
||||
stdout_file=stdout_file,
|
||||
stderr_file=stderr_file,
|
||||
static_resource_list=[num_cpus, num_gpus],
|
||||
num_workers=num_workers)
|
||||
if cleanup:
|
||||
@@ -410,7 +431,9 @@ def start_local_scheduler(redis_address,
|
||||
return local_scheduler_name
|
||||
|
||||
def start_objstore(node_ip_address, redis_address, object_manager_port=None,
|
||||
cleanup=True, redirect_output=False, objstore_memory=None):
|
||||
store_stdout_file=None, store_stderr_file=None,
|
||||
manager_stdout_file=None, manager_stderr_file=None,
|
||||
cleanup=True, objstore_memory=None):
|
||||
"""This method starts an object store process.
|
||||
|
||||
Args:
|
||||
@@ -418,11 +441,19 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None,
|
||||
redis_address (str): The address of the Redis instance to connect to.
|
||||
object_manager_port (int): The port to use for the object manager. If this
|
||||
is not provided, one will be generated randomly.
|
||||
store_stdout_file: A file handle opened for writing to redirect stdout to.
|
||||
If no redirection should happen, then this should be None.
|
||||
store_stderr_file: A file handle opened for writing to redirect stderr to.
|
||||
If no redirection should happen, then this should be None.
|
||||
manager_stdout_file: A file handle opened for writing to redirect stdout to.
|
||||
If no redirection should happen, then this should be None.
|
||||
manager_stderr_file: A file handle opened for writing to redirect stderr to.
|
||||
If no redirection should happen, then this should be None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
objstore_memory: The amount of memory (in bytes) to start the object store
|
||||
with.
|
||||
|
||||
Return:
|
||||
A tuple of the Plasma store socket name, the Plasma manager socket name, and
|
||||
@@ -444,20 +475,39 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None,
|
||||
# shm_fs_stats.f_bavail is the number of available blocks.
|
||||
shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail
|
||||
if objstore_memory > shm_avail:
|
||||
print("Warning: Reducing object store memory because /dev/shm has only {} bytes available. You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you may need to pass an argument with the flag '--shm-size' to 'docker run'.".format(shm_avail))
|
||||
print("Warning: Reducing object store memory because /dev/shm has "
|
||||
"only {} bytes available. You may be able to free up space by "
|
||||
"deleting files in /dev/shm. If you are inside a Docker "
|
||||
"container, you may need to pass an argument with the flag "
|
||||
"'--shm-size' to 'docker run'.".format(shm_avail))
|
||||
objstore_memory = int(shm_avail * 0.8)
|
||||
finally:
|
||||
os.close(shm_fd)
|
||||
else:
|
||||
objstore_memory = int(system_memory * 0.8)
|
||||
# Start the Plasma store.
|
||||
plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, redirect_output=redirect_output)
|
||||
plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory,
|
||||
use_profiler=RUN_PLASMA_STORE_PROFILER,
|
||||
stdout_file=store_stdout_file,
|
||||
stderr_file=store_stderr_file)
|
||||
# Start the plasma manager.
|
||||
if object_manager_port is not None:
|
||||
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, plasma_manager_port=object_manager_port, node_ip_address=node_ip_address, num_retries=1, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output)
|
||||
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name,
|
||||
redis_address,
|
||||
plasma_manager_port=object_manager_port,
|
||||
node_ip_address=node_ip_address,
|
||||
num_retries=1,
|
||||
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
|
||||
stdout_file=manager_stdout_file,
|
||||
stderr_file=manager_stderr_file)
|
||||
assert plasma_manager_port == object_manager_port
|
||||
else:
|
||||
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output)
|
||||
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name,
|
||||
redis_address,
|
||||
node_ip_address=node_ip_address,
|
||||
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
|
||||
stdout_file=manager_stdout_file,
|
||||
stderr_file=manager_stderr_file)
|
||||
if cleanup:
|
||||
all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1)
|
||||
all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2)
|
||||
@@ -465,7 +515,9 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None,
|
||||
return ObjectStoreAddress(plasma_store_name, plasma_manager_name,
|
||||
plasma_manager_port)
|
||||
|
||||
def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_address, worker_path, cleanup=True, redirect_output=False):
|
||||
def start_worker(node_ip_address, object_store_name, object_store_manager_name,
|
||||
local_scheduler_name, redis_address, worker_path,
|
||||
stdout_file=None, stderr_file=None, cleanup=True):
|
||||
"""This method starts a worker process.
|
||||
|
||||
Args:
|
||||
@@ -477,11 +529,13 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name,
|
||||
redis_address (int): The address that the Redis server is listening on.
|
||||
worker_path (str): The path of the source code which the worker process will
|
||||
run.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If no
|
||||
redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If no
|
||||
redirection should happen, then this should be None.
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits. This is True by default.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
"""
|
||||
command = ["python",
|
||||
worker_path,
|
||||
@@ -490,10 +544,7 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name,
|
||||
"--object-store-manager-name=" + object_store_manager_name,
|
||||
"--local-scheduler-name=" + local_scheduler_name,
|
||||
"--redis-address=" + str(redis_address)]
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
stderr = FNULL if redirect_output else None
|
||||
p = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
|
||||
if cleanup:
|
||||
all_processes[PROCESS_TYPE_WORKER].append(p)
|
||||
|
||||
@@ -528,8 +579,8 @@ def start_ray_processes(address_info=None,
|
||||
cleanup (bool): If cleanup is true, then the processes started here will be
|
||||
killed by services.cleanup() when the Python process that called this
|
||||
method exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to a
|
||||
file.
|
||||
include_global_scheduler (bool): If include_global_scheduler is True, then
|
||||
start a global scheduler process.
|
||||
include_redis (bool): If include_redis is True, then start a Redis server
|
||||
@@ -568,9 +619,12 @@ def start_ray_processes(address_info=None,
|
||||
# should address the warnings.
|
||||
redis_address = address_info.get("redis_address")
|
||||
if include_redis:
|
||||
redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output)
|
||||
if redis_address is None:
|
||||
# Start a Redis server. The start_redis method will choose a random port.
|
||||
redis_port = start_redis(cleanup=cleanup, redirect_output=redirect_output)
|
||||
redis_port = start_redis(stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
redis_address = address(node_ip_address, redis_port)
|
||||
address_info["redis_address"] = redis_address
|
||||
time.sleep(0.1)
|
||||
@@ -582,8 +636,9 @@ def start_ray_processes(address_info=None,
|
||||
redis_port = get_port(redis_address)
|
||||
new_redis_port = start_redis(port=int(redis_port),
|
||||
num_retries=1,
|
||||
cleanup=cleanup,
|
||||
redirect_output=redirect_output)
|
||||
stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
assert redis_port == new_redis_port
|
||||
else:
|
||||
if redis_address is None:
|
||||
@@ -591,8 +646,11 @@ def start_ray_processes(address_info=None,
|
||||
|
||||
# Start the global scheduler, if necessary.
|
||||
if include_global_scheduler:
|
||||
start_global_scheduler(redis_address, cleanup=cleanup,
|
||||
redirect_output=redirect_output)
|
||||
global_scheduler_stdout_file, global_scheduler_stderr_file = new_log_files("global_scheduler", redirect_output)
|
||||
start_global_scheduler(redis_address,
|
||||
stdout_file=global_scheduler_stdout_file,
|
||||
stderr_file=global_scheduler_stderr_file,
|
||||
cleanup=cleanup)
|
||||
|
||||
# Initialize with existing services.
|
||||
if "object_store_addresses" not in address_info:
|
||||
@@ -611,10 +669,15 @@ def start_ray_processes(address_info=None,
|
||||
# Start any object stores that do not yet exist.
|
||||
for i in range(num_local_schedulers - len(object_store_addresses)):
|
||||
# Start Plasma.
|
||||
plasma_store_stdout_file, plasma_store_stderr_file = new_log_files("plasma_store_{}".format(i), redirect_output)
|
||||
plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files("plasma_manager_{}".format(i), redirect_output)
|
||||
object_store_address = start_objstore(node_ip_address, redis_address,
|
||||
object_manager_port=object_manager_ports[i],
|
||||
cleanup=cleanup,
|
||||
redirect_output=redirect_output)
|
||||
store_stdout_file=plasma_store_stdout_file,
|
||||
store_stderr_file=plasma_store_stderr_file,
|
||||
manager_stdout_file=plasma_manager_stdout_file,
|
||||
manager_stderr_file=plasma_manager_stderr_file,
|
||||
cleanup=cleanup)
|
||||
object_store_addresses.append(object_store_address)
|
||||
time.sleep(0.1)
|
||||
|
||||
@@ -638,14 +701,16 @@ def start_ray_processes(address_info=None,
|
||||
# not start any workers.
|
||||
num_local_scheduler_workers = 0
|
||||
# Start the local scheduler.
|
||||
local_scheduler_stdout_file, local_scheduler_stderr_file = new_log_files("local_scheduler_{}".format(i), redirect_output)
|
||||
local_scheduler_name = start_local_scheduler(redis_address,
|
||||
node_ip_address,
|
||||
object_store_address.name,
|
||||
object_store_address.manager_name,
|
||||
worker_path,
|
||||
plasma_address=plasma_address,
|
||||
stdout_file=local_scheduler_stdout_file,
|
||||
stderr_file=local_scheduler_stderr_file,
|
||||
cleanup=cleanup,
|
||||
redirect_output=redirect_output,
|
||||
num_cpus=num_cpus[i],
|
||||
num_gpus=num_gpus[i],
|
||||
num_workers=num_local_scheduler_workers)
|
||||
@@ -662,14 +727,16 @@ def start_ray_processes(address_info=None,
|
||||
object_store_address = object_store_addresses[i]
|
||||
local_scheduler_name = local_scheduler_socket_names[i]
|
||||
for j in range(num_local_scheduler_workers):
|
||||
worker_stdout_file, worker_stderr_file = new_log_files("worker_{}_{}".format(i, j), redirect_output)
|
||||
start_worker(node_ip_address,
|
||||
object_store_address.name,
|
||||
object_store_address.manager_name,
|
||||
local_scheduler_name,
|
||||
redis_address,
|
||||
worker_path,
|
||||
cleanup=cleanup,
|
||||
redirect_output=redirect_output)
|
||||
stdout_file=worker_stdout_file,
|
||||
stderr_file=worker_stderr_file,
|
||||
cleanup=cleanup)
|
||||
num_workers_per_local_scheduler[i] -= 1
|
||||
|
||||
# Make sure that we've started all the workers.
|
||||
@@ -677,8 +744,16 @@ def start_ray_processes(address_info=None,
|
||||
|
||||
# Try to start the web UI.
|
||||
if include_webui:
|
||||
successfully_started = start_webui(redis_address, cleanup=cleanup,
|
||||
redirect_output=True)
|
||||
backend_stdout_file, backend_stderr_file = new_log_files("webui_backend",
|
||||
redirect_output=True)
|
||||
polymer_stdout_file, polymer_stderr_file = new_log_files("webui_polymer",
|
||||
redirect_output=True)
|
||||
successfully_started = start_webui(redis_address,
|
||||
backend_stdout_file=backend_stdout_file,
|
||||
backend_stderr_file=backend_stderr_file,
|
||||
polymer_stdout_file=polymer_stdout_file,
|
||||
polymer_stderr_file=polymer_stderr_file,
|
||||
cleanup=cleanup)
|
||||
|
||||
if successfully_started:
|
||||
print("View the web UI at http://localhost:8080.")
|
||||
@@ -715,8 +790,8 @@ def start_ray_node(node_ip_address,
|
||||
cleanup (bool): If cleanup is true, then the processes started here will be
|
||||
killed by services.cleanup() when the Python process that called this
|
||||
method exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to a
|
||||
file.
|
||||
|
||||
Returns:
|
||||
A dictionary of the address information for the processes that were
|
||||
@@ -764,8 +839,8 @@ def start_ray_head(address_info=None,
|
||||
cleanup (bool): If cleanup is true, then the processes started here will be
|
||||
killed by services.cleanup() when the Python process that called this
|
||||
method exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to a
|
||||
file.
|
||||
start_workers_from_local_scheduler (bool): If this flag is True, then start
|
||||
the initial workers from the local scheduler. Else, start them from
|
||||
Python.
|
||||
@@ -789,3 +864,29 @@ def start_ray_head(address_info=None,
|
||||
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
|
||||
num_cpus=num_cpus,
|
||||
num_gpus=num_gpus)
|
||||
|
||||
def new_log_files(name, redirect_output):
    """Open a pair of partially randomized log files for stdout and stderr.

    The files are created under /tmp/raylogs and are named
    "<name>-<id>.out" and "<name>-<id>.err", where <id> is a random
    zero-padded six digit number shared by both files.

    Args:
        name (str): descriptive string for this log file.
        redirect_output (bool): True if files should be generated for logging
            stdout and stderr and false if stdout and stderr should not be
            redirected.

    Returns:
        If redirect_output is true, this will return a tuple of two
        filehandles opened in append mode. The first is for redirecting
        stdout and the second is for redirecting stderr. If redirect_output
        is false, this will return a tuple of two None objects.

    Raises:
        OSError: If the log directory cannot be created.
        IOError: If one of the log files cannot be opened.
    """
    if not redirect_output:
        return None, None

    logs_dir = "/tmp/raylogs"
    # Create the directory unconditionally instead of checking for it first:
    # another process may create it between the check and the call, which
    # would make a check-then-create sequence fail spuriously.
    try:
        os.makedirs(logs_dir)
    except OSError:
        # Only swallow the error when the directory really is there now.
        if not os.path.isdir(logs_dir):
            raise

    log_id = random.randint(0, 100000)
    log_stdout = "{}/{}-{:06d}.out".format(logs_dir, name, log_id)
    log_stderr = "{}/{}-{:06d}.err".format(logs_dir, name, log_id)

    log_stdout_file = open(log_stdout, "a")
    try:
        log_stderr_file = open(log_stderr, "a")
    except Exception:
        # Do not leak the stdout handle if the stderr file cannot be opened.
        log_stdout_file.close()
        raise
    return log_stdout_file, log_stderr_file
|
||||
|
||||
+14
-4
@@ -757,6 +757,7 @@ def _init(address_info=None,
|
||||
num_workers=None,
|
||||
num_local_schedulers=None,
|
||||
driver_mode=SCRIPT_MODE,
|
||||
redirect_output=False,
|
||||
start_workers_from_local_scheduler=True,
|
||||
num_cpus=None,
|
||||
num_gpus=None):
|
||||
@@ -786,6 +787,8 @@ def _init(address_info=None,
|
||||
only provided if start_ray_local is True.
|
||||
driver_mode (bool): The mode in which to start the driver. This should be
|
||||
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
|
||||
redirect_output (bool): True if stdout and stderr for all the processes
|
||||
should be redirected to files and false otherwise.
|
||||
start_workers_from_local_scheduler (bool): If this flag is True, then start
|
||||
the initial workers from the local scheduler. Else, start them from
|
||||
Python. The latter case is for debugging purposes only.
|
||||
@@ -840,6 +843,7 @@ def _init(address_info=None,
|
||||
node_ip_address=node_ip_address,
|
||||
num_workers=num_workers,
|
||||
num_local_schedulers=num_local_schedulers,
|
||||
redirect_output=redirect_output,
|
||||
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
|
||||
num_cpus=num_cpus,
|
||||
num_gpus=num_gpus)
|
||||
@@ -876,7 +880,8 @@ def _init(address_info=None,
|
||||
return address_info
|
||||
|
||||
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
|
||||
num_workers=None, driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None):
|
||||
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
|
||||
num_cpus=None, num_gpus=None):
|
||||
"""Either connect to an existing Ray cluster or start one and connect to it.
|
||||
|
||||
This method handles two cases. Either a Ray cluster already exists and we
|
||||
@@ -897,8 +902,12 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
|
||||
redis_address is not provided.
|
||||
driver_mode (bool): The mode in which to start the driver. This should be
|
||||
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
|
||||
num_cpus (int): Number of cpus the user wishes all local schedulers to be configured with.
|
||||
num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with.
|
||||
redirect_output (bool): True if stdout and stderr for all the processes
|
||||
should be redirected to files and false otherwise.
|
||||
num_cpus (int): Number of cpus the user wishes all local schedulers to be
|
||||
configured with.
|
||||
num_gpus (int): Number of gpus the user wishes all local schedulers to be
|
||||
configured with.
|
||||
|
||||
Returns:
|
||||
Address information about the started processes.
|
||||
@@ -913,7 +922,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
|
||||
}
|
||||
return _init(address_info=info, start_ray_local=(redis_address is None),
|
||||
num_workers=num_workers, driver_mode=driver_mode,
|
||||
num_cpus=num_cpus, num_gpus=num_gpus)
|
||||
redirect_output=redirect_output, num_cpus=num_cpus,
|
||||
num_gpus=num_gpus)
|
||||
|
||||
def cleanup(worker=global_worker):
|
||||
"""Disconnect the driver, and terminate any processes started in init.
|
||||
|
||||
Reference in New Issue
Block a user