hugepage + plasma directory support plumbing + documentation (#1030)

* hugepage + plasma directory support plumbing + documentation

* Indentation fix.

* huge_pages_enabled --> huge_pages

* One more change
This commit is contained in:
Alexey Tumanov
2017-09-30 09:56:52 -07:00
committed by Robert Nishihara
parent ce278aa06a
commit 2d0f439b7b
6 changed files with 164 additions and 41 deletions
+47 -26
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import os
import random
import subprocess
import sys
import time
@@ -22,25 +23,40 @@ def random_name():
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=False, use_profiler=False,
stdout_file=None, stderr_file=None):
stdout_file=None, stderr_file=None,
plasma_directory=None, huge_pages=False):
"""Start a plasma store process.
Args:
use_valgrind (bool): True if the plasma store should be started inside of
valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the plasma store should be started inside a
profiler. If this is True, use_valgrind must be False.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
use_valgrind (bool): True if the plasma store should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the plasma store should be started inside
a profiler. If this is True, use_valgrind must be False.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the
Object Store with hugetlbfs support. Requires plasma_directory and
is only supported on Linux.
Return:
A tuple of the name of the plasma store socket and the process ID of the
plasma store process.
A tuple of the name of the plasma store socket and the process ID of
the plasma store process.
"""
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
if huge_pages and not (sys.platform == "linux" or
sys.platform == "linux2"):
raise Exception("The huge_pages argument is only supported on "
"Linux.")
if huge_pages and plasma_directory is None:
raise Exception("If huge_pages is True, then the "
"plasma_directory argument must be provided.")
plasma_store_executable = os.path.join(os.path.abspath(
os.path.dirname(__file__)),
"../core/src/plasma/plasma_store")
@@ -48,6 +64,10 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
command = [plasma_store_executable,
"-s", plasma_store_name,
"-m", str(plasma_store_memory)]
if plasma_directory is not None:
command += ["-d", plasma_directory]
if huge_pages:
command += ["-h"]
if use_valgrind:
pid = subprocess.Popen(["valgrind",
"--track-origins=yes",
@@ -79,24 +99,25 @@ def start_plasma_manager(store_name, redis_address,
"""Start a plasma manager and return the ports it listens on.
Args:
store_name (str): The name of the plasma store socket.
redis_address (str): The address of the Redis server.
node_ip_address (str): The IP address of the node.
plasma_manager_port (int): The port to use for the plasma manager. If
this is not provided, a port will be generated at random.
use_valgrind (bool): True if the Plasma manager should be started inside
of valgrind and False otherwise.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
store_name (str): The name of the plasma store socket.
redis_address (str): The address of the Redis server.
node_ip_address (str): The IP address of the node.
plasma_manager_port (int): The port to use for the plasma manager. If
this is not provided, a port will be generated at random.
use_valgrind (bool): True if the Plasma manager should be started
inside of valgrind and False otherwise.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
Returns:
A tuple of the Plasma manager socket name, the process ID of the Plasma
manager process, and the port that the manager is listening on.
Returns:
A tuple of the Plasma manager socket name, the process ID of the
Plasma manager process, and the port that the manager is
listening on.
Raises:
Exception: An exception is raised if the manager could not be started.
Exception: An exception is raised if the manager could not be started.
"""
plasma_manager_executable = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
+12 -3
View File
@@ -67,9 +67,14 @@ def cli():
help="provide this argument if the UI should not be started")
@click.option("--block", is_flag=True, default=False,
help="provide this argument to block forever in this command")
@click.option("--plasma-directory", required=False, type=str,
help="object store directory for memory mapped files")
@click.option("--huge-pages", is_flag=True, default=False,
help="enable support for huge pages in the object store")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_manager_port, num_workers, num_cpus, num_gpus,
num_custom_resource, head, no_ui, block):
num_custom_resource, head, no_ui, block, plasma_directory,
huge_pages):
# Note that we redirect stdout and stderr to /dev/null because otherwise
# attempts to print may cause exceptions if a process is started inside of
# an SSH connection and the SSH connection dies. TODO(rkn): This is a
@@ -106,7 +111,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards,
include_webui=(not no_ui))
include_webui=(not no_ui),
plasma_directory=plasma_directory,
huge_pages=huge_pages)
print(address_info)
print("\nStarted Ray on this node. You can add additional nodes to "
"the cluster by calling\n\n"
@@ -155,7 +162,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redirect_output=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource)
num_custom_resource=num_custom_resource,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
print(address_info)
print("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
+38 -8
View File
@@ -598,7 +598,8 @@ def start_objstore(node_ip_address, redis_address,
object_manager_port=None, store_stdout_file=None,
store_stderr_file=None, manager_stdout_file=None,
manager_stderr_file=None, objstore_memory=None,
cleanup=True):
cleanup=True, plasma_directory=None,
huge_pages=False):
"""This method starts an object store process.
Args:
@@ -622,6 +623,10 @@ def start_objstore(node_ip_address, redis_address,
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Return:
A tuple of the Plasma store socket name, the Plasma manager socket
@@ -661,7 +666,9 @@ def start_objstore(node_ip_address, redis_address,
plasma_store_memory=objstore_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
stdout_file=store_stdout_file,
stderr_file=store_stderr_file)
stderr_file=store_stderr_file,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
# Start the plasma manager.
if object_manager_port is not None:
(plasma_manager_name, p2,
@@ -777,7 +784,9 @@ def start_ray_processes(address_info=None,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None):
num_custom_resource=None,
plasma_directory=None,
huge_pages=False):
"""Helper method to start Ray processes.
Args:
@@ -824,6 +833,10 @@ def start_ray_processes(address_info=None,
num_custom_resource: A list of length num_local_schedulers containing
the quantity of a user-defined custom resource that each local
scheduler should be configured with.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Returns:
A dictionary of the address information for the processes that were
@@ -939,7 +952,8 @@ def start_ray_processes(address_info=None,
manager_stdout_file=plasma_manager_stdout_file,
manager_stderr_file=plasma_manager_stderr_file,
objstore_memory=object_store_memory,
cleanup=cleanup)
cleanup=cleanup, plasma_directory=plasma_directory,
huge_pages=huge_pages)
object_store_addresses.append(object_store_address)
time.sleep(0.1)
@@ -1028,7 +1042,9 @@ def start_ray_node(node_ip_address,
redirect_output=False,
num_cpus=None,
num_gpus=None,
num_custom_resource=None):
num_custom_resource=None,
plasma_directory=None,
huge_pages=False):
"""Start the Ray processes for a single node.
This assumes that the Ray processes on some master node have already been
@@ -1051,6 +1067,10 @@ def start_ray_node(node_ip_address,
called this method exits.
redirect_output (bool): True if stdout and stderr should be redirected
to a file.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Returns:
A dictionary of the address information for the processes that were
@@ -1068,7 +1088,9 @@ def start_ray_node(node_ip_address,
redirect_output=redirect_output,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource)
num_custom_resource=num_custom_resource,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
def start_ray_head(address_info=None,
@@ -1085,7 +1107,9 @@ def start_ray_head(address_info=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None,
include_webui=True):
include_webui=True,
plasma_directory=None,
huge_pages=False):
"""Start Ray in local mode.
Args:
@@ -1121,6 +1145,10 @@ def start_ray_head(address_info=None,
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
include_webui: True if the UI should be started and false otherwise.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Returns:
A dictionary of the address information for the processes that were
@@ -1144,7 +1172,9 @@ def start_ray_head(address_info=None,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
num_redis_shards=num_redis_shards,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
def try_to_create_directory(directory_path):
+25 -4
View File
@@ -1137,7 +1137,9 @@ def _init(address_info=None,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None):
num_redis_shards=None,
plasma_directory=None,
huge_pages=False):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1181,6 +1183,10 @@ def _init(address_info=None,
with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Returns:
Address information about the started processes.
@@ -1239,7 +1245,9 @@ def _init(address_info=None,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
num_redis_shards=num_redis_shards,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
else:
if redis_address is None:
raise Exception("When connecting to an existing cluster, "
@@ -1261,6 +1269,12 @@ def _init(address_info=None,
if object_store_memory is not None:
raise Exception("When connecting to an existing cluster, "
"object_store_memory must not be provided.")
if plasma_directory is not None:
raise Exception("When connecting to an existing cluster, "
"plasma_directory must not be provided.")
if huge_pages:
raise Exception("When connecting to an existing cluster, "
"huge_pages must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
@@ -1293,7 +1307,8 @@ def _init(address_info=None,
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
num_cpus=None, num_gpus=None, num_custom_resource=None,
num_redis_shards=None):
num_redis_shards=None,
plasma_directory=None, huge_pages=False):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1326,6 +1341,10 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
flag is experimental and is subject to changes in the future.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
Returns:
Address information about the started processes.
@@ -1340,7 +1359,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=num_workers, driver_mode=driver_mode,
redirect_output=redirect_output, num_cpus=num_cpus,
num_gpus=num_gpus, num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
num_redis_shards=num_redis_shards,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
def cleanup(worker=global_worker):