diff --git a/doc/source/index.rst b/doc/source/index.rst index d2c6e855b..8635d308d 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -43,6 +43,7 @@ Ray internals-overview.rst serialization.rst fault-tolerance.rst + plasma-object-store.rst .. toctree:: :maxdepth: 1 diff --git a/doc/source/plasma-object-store.rst b/doc/source/plasma-object-store.rst new file mode 100644 index 000000000..5250452d4 --- /dev/null +++ b/doc/source/plasma-object-store.rst @@ -0,0 +1,41 @@ +The Plasma Object Store +======================= + +Plasma is a high-performance shared memory object store originally developed in +Ray and now being developed in `Apache Arrow`_. See the `relevant +documentation`_. + +Using Plasma with Huge Pages +---------------------------- + +On Linux, it is possible to increase the write throughput of the Plasma object +store by using huge pages. You first need to mount a hugetlbfs file system and +reserve huge pages as follows. + +.. code-block:: shell + + sudo mkdir -p /mnt/hugepages + gid=`id -g` + uid=`id -u` + sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages + sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group" + sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages" + +You need root access to mount the file system, but not to run the object +store. + +You can then start Ray with huge pages on a single machine as follows. + +.. code-block:: python + + ray.init(huge_pages=True, plasma_directory="/mnt/hugepages") + +In the cluster case, enable huge pages by passing ``--huge-pages`` and +``--plasma-directory=/mnt/hugepages`` to ``ray start`` on each machine where +huge pages should be enabled. + +See the relevant `Arrow documentation for huge pages`_. + +.. _`Apache Arrow`: https://arrow.apache.org/ +.. _`relevant documentation`: https://arrow.apache.org/docs/python/plasma.html#the-plasma-in-memory-object-store +.. 
_`Arrow documentation for huge pages`: https://arrow.apache.org/docs/python/plasma.html#using-plasma-with-huge-pages diff --git a/python/ray/plasma/plasma.py b/python/ray/plasma/plasma.py index 64394bde5..1d6569b45 100644 --- a/python/ray/plasma/plasma.py +++ b/python/ray/plasma/plasma.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import random import subprocess +import sys import time @@ -22,25 +23,40 @@ def random_name(): def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False, - stdout_file=None, stderr_file=None): + stdout_file=None, stderr_file=None, + plasma_directory=None, huge_pages=False): """Start a plasma store process. Args: - use_valgrind (bool): True if the plasma store should be started inside of - valgrind. If this is True, use_profiler must be False. - use_profiler (bool): True if the plasma store should be started inside a - profiler. If this is True, use_valgrind must be False. - stdout_file: A file handle opened for writing to redirect stdout to. If - no redirection should happen, then this should be None. - stderr_file: A file handle opened for writing to redirect stderr to. If - no redirection should happen, then this should be None. + use_valgrind (bool): True if the plasma store should be started inside + of valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the plasma store should be started inside + a profiler. If this is True, use_valgrind must be False. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages (bool): True if the plasma store should be started with + hugetlbfs support. If this is True, plasma_directory must be provided. 
Return: - A tuple of the name of the plasma store socket and the process ID of the - plasma store process. + A tuple of the name of the plasma store socket and the process ID of + the plasma store process. """ if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") + + if huge_pages and not (sys.platform == "linux" or + sys.platform == "linux2"): + raise Exception("The huge_pages argument is only supported on " + "Linux.") + + if huge_pages and plasma_directory is None: + raise Exception("If huge_pages is True, then the " + "plasma_directory argument must be provided.") + plasma_store_executable = os.path.join(os.path.abspath( os.path.dirname(__file__)), "../core/src/plasma/plasma_store") @@ -48,6 +64,10 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] + if plasma_directory is not None: + command += ["-d", plasma_directory] + if huge_pages: + command += ["-h"] if use_valgrind: pid = subprocess.Popen(["valgrind", "--track-origins=yes", @@ -79,24 +99,25 @@ def start_plasma_manager(store_name, redis_address, """Start a plasma manager and return the ports it listens on. Args: - store_name (str): The name of the plasma store socket. - redis_address (str): The address of the Redis server. - node_ip_address (str): The IP address of the node. - plasma_manager_port (int): The port to use for the plasma manager. If - this is not provided, a port will be generated at random. - use_valgrind (bool): True if the Plasma manager should be started inside - of valgrind and False otherwise. - stdout_file: A file handle opened for writing to redirect stdout to. If - no redirection should happen, then this should be None. - stderr_file: A file handle opened for writing to redirect stderr to. If - no redirection should happen, then this should be None. + store_name (str): The name of the plasma store socket. 
+ redis_address (str): The address of the Redis server. + node_ip_address (str): The IP address of the node. + plasma_manager_port (int): The port to use for the plasma manager. If + this is not provided, a port will be generated at random. + use_valgrind (bool): True if the Plasma manager should be started + inside of valgrind and False otherwise. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. - Returns: - A tuple of the Plasma manager socket name, the process ID of the Plasma - manager process, and the port that the manager is listening on. + Returns: + A tuple of the Plasma manager socket name, the process ID of the + Plasma manager process, and the port that the manager is + listening on. Raises: - Exception: An exception is raised if the manager could not be started. + Exception: An exception is raised if the manager could not be started. 
""" plasma_manager_executable = os.path.join( os.path.abspath(os.path.dirname(__file__)), diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 255ee12dc..d6093b612 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -67,9 +67,14 @@ def cli(): help="provide this argument if the UI should not be started") @click.option("--block", is_flag=True, default=False, help="provide this argument to block forever in this command") +@click.option("--plasma-directory", required=False, type=str, + help="object store directory for memory mapped files") +@click.option("--huge-pages", is_flag=True, default=False, + help="enable support for huge pages in the object store") def start(node_ip_address, redis_address, redis_port, num_redis_shards, object_manager_port, num_workers, num_cpus, num_gpus, - num_custom_resource, head, no_ui, block): + num_custom_resource, head, no_ui, block, plasma_directory, + huge_pages): # Note that we redirect stdout and stderr to /dev/null because otherwise # attempts to print may cause exceptions if a process is started inside of # an SSH connection and the SSH connection dies. TODO(rkn): This is a @@ -106,7 +111,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, num_gpus=num_gpus, num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards, - include_webui=(not no_ui)) + include_webui=(not no_ui), + plasma_directory=plasma_directory, + huge_pages=huge_pages) print(address_info) print("\nStarted Ray on this node. You can add additional nodes to " "the cluster by calling\n\n" @@ -155,7 +162,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redirect_output=True, num_cpus=num_cpus, num_gpus=num_gpus, - num_custom_resource=num_custom_resource) + num_custom_resource=num_custom_resource, + plasma_directory=plasma_directory, + huge_pages=huge_pages) print(address_info) print("\nStarted Ray on this node. 
If you wish to terminate the " "processes that have been started, run\n\n" diff --git a/python/ray/services.py b/python/ray/services.py index acaa6d62b..0bfe9e53c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -598,7 +598,8 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, store_stdout_file=None, store_stderr_file=None, manager_stdout_file=None, manager_stderr_file=None, objstore_memory=None, - cleanup=True): + cleanup=True, plasma_directory=None, + huge_pages=False): """This method starts an object store process. Args: @@ -622,6 +623,10 @@ def start_objstore(node_ip_address, redis_address, cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. Return: A tuple of the Plasma store socket name, the Plasma manager socket @@ -661,7 +666,9 @@ def start_objstore(node_ip_address, redis_address, plasma_store_memory=objstore_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, stdout_file=store_stdout_file, - stderr_file=store_stderr_file) + stderr_file=store_stderr_file, + plasma_directory=plasma_directory, + huge_pages=huge_pages) # Start the plasma manager. if object_manager_port is not None: (plasma_manager_name, p2, @@ -777,7 +784,9 @@ def start_ray_processes(address_info=None, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None, - num_custom_resource=None): + num_custom_resource=None, + plasma_directory=None, + huge_pages=False): """Helper method to start Ray processes. 
Args: @@ -824,6 +833,10 @@ def start_ray_processes(address_info=None, num_custom_resource: A list of length num_local_schedulers containing the quantity of a user-defined custom resource that each local scheduler should be configured with. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. Returns: A dictionary of the address information for the processes that were @@ -939,7 +952,8 @@ def start_ray_processes(address_info=None, manager_stdout_file=plasma_manager_stdout_file, manager_stderr_file=plasma_manager_stderr_file, objstore_memory=object_store_memory, - cleanup=cleanup) + cleanup=cleanup, plasma_directory=plasma_directory, + huge_pages=huge_pages) object_store_addresses.append(object_store_address) time.sleep(0.1) @@ -1028,7 +1042,9 @@ def start_ray_node(node_ip_address, redirect_output=False, num_cpus=None, num_gpus=None, - num_custom_resource=None): + num_custom_resource=None, + plasma_directory=None, + huge_pages=False): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1051,6 +1067,10 @@ def start_ray_node(node_ip_address, called this method exits. redirect_output (bool): True if stdout and stderr should be redirected to a file. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. 
Returns: A dictionary of the address information for the processes that were @@ -1068,7 +1088,9 @@ def start_ray_node(node_ip_address, redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, - num_custom_resource=num_custom_resource) + num_custom_resource=num_custom_resource, + plasma_directory=plasma_directory, + huge_pages=huge_pages) def start_ray_head(address_info=None, @@ -1085,7 +1107,9 @@ def start_ray_head(address_info=None, num_gpus=None, num_custom_resource=None, num_redis_shards=None, - include_webui=True): + include_webui=True, + plasma_directory=None, + huge_pages=False): """Start Ray in local mode. Args: @@ -1121,6 +1145,10 @@ def start_ray_head(address_info=None, num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. include_webui: True if the UI should be started and false otherwise. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. Returns: A dictionary of the address information for the processes that were @@ -1144,7 +1172,9 @@ def start_ray_head(address_info=None, num_cpus=num_cpus, num_gpus=num_gpus, num_custom_resource=num_custom_resource, - num_redis_shards=num_redis_shards) + num_redis_shards=num_redis_shards, + plasma_directory=plasma_directory, + huge_pages=huge_pages) def try_to_create_directory(directory_path): diff --git a/python/ray/worker.py b/python/ray/worker.py index a24f1b9af..61b8f1709 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1137,7 +1137,9 @@ def _init(address_info=None, num_cpus=None, num_gpus=None, num_custom_resource=None, - num_redis_shards=None): + num_redis_shards=None, + plasma_directory=None, + huge_pages=False): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. 
Either a Ray cluster already exists and we @@ -1181,6 +1183,10 @@ def _init(address_info=None, with. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. Returns: Address information about the started processes. @@ -1239,7 +1245,9 @@ def _init(address_info=None, num_cpus=num_cpus, num_gpus=num_gpus, num_custom_resource=num_custom_resource, - num_redis_shards=num_redis_shards) + num_redis_shards=num_redis_shards, + plasma_directory=plasma_directory, + huge_pages=huge_pages) else: if redis_address is None: raise Exception("When connecting to an existing cluster, " @@ -1261,6 +1269,12 @@ def _init(address_info=None, if object_store_memory is not None: raise Exception("When connecting to an existing cluster, " "object_store_memory must not be provided.") + if plasma_directory is not None: + raise Exception("When connecting to an existing cluster, " + "plasma_directory must not be provided.") + if huge_pages: + raise Exception("When connecting to an existing cluster, " + "huge_pages must not be provided.") # Get the node IP address if one is not provided. if node_ip_address is None: node_ip_address = services.get_node_ip_address(redis_address) @@ -1293,7 +1307,8 @@ def _init(address_info=None, def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False, num_cpus=None, num_gpus=None, num_custom_resource=None, - num_redis_shards=None): + num_redis_shards=None, + plasma_directory=None, huge_pages=False): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. 
Either a Ray cluster already exists and we @@ -1326,6 +1341,10 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, flag is experimental and is subject to changes in the future. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: Boolean flag indicating whether to start the Object + Store with hugetlbfs support. Requires plasma_directory. Returns: Address information about the started processes. @@ -1340,7 +1359,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=num_workers, driver_mode=driver_mode, redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, num_custom_resource=num_custom_resource, - num_redis_shards=num_redis_shards) + num_redis_shards=num_redis_shards, + plasma_directory=plasma_directory, + huge_pages=huge_pages) def cleanup(worker=global_worker):