diff --git a/python/ray/node.py b/python/ray/node.py index 7f76a5031..ef82a931a 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -566,11 +566,11 @@ class Node: stdout_file, stderr_file = self.new_log_files("plasma_store") process_info = ray.services.start_plasma_store( self.get_resource_spec(), + self._plasma_store_socket_name, stdout_file=stdout_file, stderr_file=stderr_file, plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, - plasma_store_socket_name=self._plasma_store_socket_name, fate_share=self.kernel_fate_share) assert ( ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes) diff --git a/python/ray/services.py b/python/ray/services.py index 05bf4988c..f82d1ab16 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1481,6 +1481,14 @@ def determine_plasma_store_config(object_store_memory, The plasma directory to use. If it is specified by the user, then that value will be preserved. """ + if not isinstance(object_store_memory, int): + object_store_memory = int(object_store_memory) + + if huge_pages and not (sys.platform == "linux" + or sys.platform == "linux2"): + raise ValueError("The huge_pages argument is only supported on " + "Linux.") + system_memory = ray.utils.get_system_memory() # Determine which directory to use. By default, use /tmp on MacOS and @@ -1522,90 +1530,39 @@ def determine_plasma_store_config(object_store_memory, "The file {} does not exist or is not a directory.".format( plasma_directory)) - return plasma_directory - - -def _start_plasma_store(plasma_store_memory, - use_valgrind=False, - use_profiler=False, - stdout_file=None, - stderr_file=None, - plasma_directory=None, - huge_pages=False, - socket_name=None, - fate_share=None): - """Start a plasma store process. - - Args: - plasma_store_memory (int): The amount of memory in bytes to start the - plasma store with. - use_valgrind (bool): True if the plasma store should be started inside - of valgrind. If this is True, use_profiler must be False. - use_profiler (bool): True if the plasma store should be started inside - a profiler. If this is True, use_valgrind must be False. - stdout_file: A file handle opened for writing to redirect stdout to. If - no redirection should happen, then this should be None. - stderr_file: A file handle opened for writing to redirect stderr to. If - no redirection should happen, then this should be None. - plasma_directory: A directory where the Plasma memory mapped files will - be created. - huge_pages: a boolean flag indicating whether to start the - Object Store with hugetlbfs support. Requires plasma_directory. - socket_name (str): If provided, it will specify the socket - name used by the plasma store. - - Return: - A tuple of the name of the plasma store socket and ProcessInfo for the - plasma store process. - """ - if use_valgrind and use_profiler: - raise ValueError("Cannot use valgrind and profiler at the same time.") - - if huge_pages and not (sys.platform == "linux" - or sys.platform == "linux2"): - raise ValueError("The huge_pages argument is only supported on " - "Linux.") - if huge_pages and plasma_directory is None: raise ValueError("If huge_pages is True, then the " "plasma_directory argument must be provided.") - if not isinstance(plasma_store_memory, int): - plasma_store_memory = int(plasma_store_memory) + if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES: + raise ValueError("Attempting to cap object store memory usage at {} " + "bytes, but the minimum allowed is {} bytes.".format( + object_store_memory, + ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES)) - command = [ - PLASMA_STORE_EXECUTABLE, - "-s", - socket_name, - "-m", - str(plasma_store_memory), - ] - if plasma_directory is not None: - command += ["-d", plasma_directory] - if huge_pages: - command += ["-h"] - process_info = start_ray_process( - command, - ray_constants.PROCESS_TYPE_PLASMA_STORE, - use_valgrind=use_valgrind, - use_valgrind_profiler=use_profiler, - stdout_file=stdout_file, - stderr_file=stderr_file, - fate_share=fate_share) - return process_info + # Print the object store memory using two decimal places. + logger.debug( + "Determine to start the Plasma object store with {} GB memory " + "using {}.".format( + round(object_store_memory / 10**9, 2), plasma_directory)) + + return plasma_directory, object_store_memory def start_plasma_store(resource_spec, + plasma_store_socket_name, stdout_file=None, stderr_file=None, plasma_directory=None, huge_pages=False, - plasma_store_socket_name=None, - fate_share=None): + fate_share=None, + use_valgrind=False): """This method starts an object store process. Args: resource_spec (ResourceSpec): Resources for the node. + plasma_store_socket_name (str): The path/name of the plasma + store socket. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr @@ -1618,33 +1575,33 @@ def start_plasma_store(resource_spec, Returns: ProcessInfo for the process that was started. """ - assert resource_spec.resolved() - object_store_memory = resource_spec.object_store_memory - plasma_directory = determine_plasma_store_config( - object_store_memory, plasma_directory, huge_pages) - - if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES: - raise ValueError("Attempting to cap object store memory usage at {} " - "bytes, but the minimum allowed is {} bytes.".format( - object_store_memory, - ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES)) - - # Print the object store memory using two decimal places. - object_store_memory_str = (object_store_memory / 10**7) / 10**2 - logger.debug("Starting the Plasma object store with {} GB memory " - "using {}.".format( - round(object_store_memory_str, 2), plasma_directory)) # Start the Plasma store. - process_info = _start_plasma_store( - object_store_memory, - use_profiler=RUN_PLASMA_STORE_PROFILER, + if use_valgrind and RUN_PLASMA_STORE_PROFILER: + raise ValueError("Cannot use valgrind and profiler at the same time.") + + assert resource_spec.resolved() + plasma_directory, object_store_memory = determine_plasma_store_config( + resource_spec.object_store_memory, plasma_directory, huge_pages) + + command = [ + PLASMA_STORE_EXECUTABLE, + "-s", + plasma_store_socket_name, + "-m", + str(object_store_memory), + ] + if plasma_directory is not None: + command += ["-d", plasma_directory] + if huge_pages: + command += ["-h"] + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_PLASMA_STORE, + use_valgrind=use_valgrind, + use_valgrind_profiler=RUN_PLASMA_STORE_PROFILER, stdout_file=stdout_file, stderr_file=stderr_file, - plasma_directory=plasma_directory, - huge_pages=huge_pages, - socket_name=plasma_store_socket_name, fate_share=fate_share) - return process_info