From 9868af4c7cd87ba46af3e30b3a25ecc85fbbef63 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 28 Oct 2018 20:09:06 -0700 Subject: [PATCH] Use /tmp instead of /dev/shm for object store on Linux if /dev/shm is too small. (#3149) * Use /tmp instead of /dev/shm for object store on Linux if /dev/shm is too small. * Add logging statement and address comments. * Fix --- python/ray/services.py | 117 ++++++++++++++++++++++++++++------------- python/ray/utils.py | 22 ++++++++ python/ray/worker.py | 6 +-- test/stress_tests.py | 4 +- 4 files changed, 107 insertions(+), 42 deletions(-) diff --git a/python/ray/services.py b/python/ray/services.py index 651cbb053..5a94f1479 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -977,12 +977,80 @@ def start_raylet(redis_address, return raylet_name +def determine_plasma_store_config(object_store_memory=None, + plasma_directory=None, + huge_pages=False): + """Figure out how to configure the plasma object store. + + This will determine which directory to use for the plasma store (e.g., + /tmp or /dev/shm) and how much memory to start the store with. On Linux, + we will try to use /dev/shm unless the shared memory file system is too + small, in which case we will fall back to /tmp. If any of the object store + memory or plasma directory parameters are specified by the user, then those + values will be preserved. + + Args: + object_store_memory (int): The user-specified object store memory + parameter. + plasma_directory (str): The user-specified plasma directory parameter. + huge_pages (bool): The user-specified huge pages parameter. + + Returns: + A tuple of the object store memory to use and the plasma directory to + use. If either of these values is specified by the user, then that + value will be preserved. + """ + system_memory = ray.utils.get_system_memory() + + # Choose a default object store size. + if object_store_memory is None: + object_store_memory = int(system_memory * 0.4) + + if plasma_directory is not None: + plasma_directory = os.path.abspath(plasma_directory) + + # Determine which directory to use. By default, use /tmp on MacOS and + # /dev/shm on Linux, unless the shared-memory file system is too small, + # in which case we default to /tmp on Linux. + if plasma_directory is None: + if sys.platform == "linux" or sys.platform == "linux2": + shm_avail = ray.utils.get_shared_memory_bytes() + # Compare the requested memory size to the memory available in + # /dev/shm. + if shm_avail > object_store_memory: + plasma_directory = "/dev/shm" + else: + plasma_directory = "/tmp" + logger.warning( + "WARNING: The object store is using /tmp instead of " + "/dev/shm because /dev/shm has only {} bytes available. " + "This may slow down performance! You may be able to free " + "up space by deleting files in /dev/shm or terminating " + "any running plasma_store_server processes. If you are " + "inside a Docker container, you may need to pass an " + "argument with the flag '--shm-size' to 'docker run'." + .format(shm_avail)) + else: + plasma_directory = "/tmp" + + # Do some sanity checks. + if object_store_memory > system_memory: + raise Exception("The requested object store memory size is greater " + "than the total available memory.") + + if not os.path.isdir(plasma_directory): + raise Exception("The file {} does not exist or is not a directory." + .format(plasma_directory)) + + return object_store_memory, plasma_directory + + def start_plasma_store(node_ip_address, redis_address, object_manager_port=None, store_stdout_file=None, store_stderr_file=None, - objstore_memory=None, + object_store_memory=None, cleanup=True, plasma_directory=None, huge_pages=False, @@ -1000,8 +1068,8 @@ def start_plasma_store(node_ip_address, to. If no redirection should happen, then this should be None. store_stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. - objstore_memory: The amount of memory (in bytes) to start the object - store with. + object_store_memory: The amount of memory (in bytes) to start the + object store with. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. @@ -1014,41 +1082,16 @@ def start_plasma_store(node_ip_address, Return: The Plasma store socket name. """ - if objstore_memory is None: - # Compute a fraction of the system memory for the Plasma store to use. - system_memory = ray.utils.get_system_memory() - if sys.platform == "linux" or sys.platform == "linux2": - # On linux we use /dev/shm, its size is half the size of the - # physical memory. To not overflow it, we set the plasma memory - # limit to 0.4 times the size of the physical memory. - objstore_memory = int(system_memory * 0.4) - # Compare the requested memory size to the memory available in - # /dev/shm. - shm_fd = os.open("/dev/shm", os.O_RDONLY) - try: - shm_fs_stats = os.fstatvfs(shm_fd) - # The value shm_fs_stats.f_bsize is the block size and the - # value shm_fs_stats.f_bavail is the number of available - # blocks. - shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail - if objstore_memory > shm_avail: - logger.warning( - "Warning: Reducing object store memory because " - "/dev/shm has only {} bytes available. You may be " - "able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, " - "you may need to pass an argument with the flag " - "'--shm-size' to 'docker run'.".format(shm_avail)) - objstore_memory = int(shm_avail * 0.8) - finally: - os.close(shm_fd) - else: - objstore_memory = int(system_memory * 0.8) + object_store_memory, plasma_directory = determine_plasma_store_config( + object_store_memory, plasma_directory, huge_pages) + + # Print the object store memory using two decimal places. + object_store_memory_str = (object_store_memory / 10**7) / 10**2 + logger.info("Starting the Plasma object store with {} GB memory " + "using {}.".format(object_store_memory_str, plasma_directory)) # Start the Plasma store. - logger.info("Starting the Plasma object store with {0:.2f} GB memory." - .format(objstore_memory / 10**9)) plasma_store_name, p1 = ray.plasma.start_plasma_store( - plasma_store_memory=objstore_memory, + plasma_store_memory=object_store_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, stdout_file=store_stdout_file, stderr_file=store_stderr_file, @@ -1389,7 +1432,7 @@ def start_ray_processes(address_info=None, redis_address, store_stdout_file=plasma_store_stdout_file, store_stderr_file=plasma_store_stderr_file, - objstore_memory=object_store_memory, + object_store_memory=object_store_memory, cleanup=cleanup, plasma_directory=plasma_directory, huge_pages=huge_pages, diff --git a/python/ray/utils.py b/python/ray/utils.py index 83e2ae2f7..a568cb9a2 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -324,6 +324,28 @@ def get_system_memory(): return sysctl(["sysctl", "hw.memsize"]) +def get_shared_memory_bytes(): + """Get the size of the shared memory file system. + + Returns: + The size of the shared memory file system in bytes. + """ + # Make sure this is only called on Linux. + assert sys.platform == "linux" or sys.platform == "linux2" + + shm_fd = os.open("/dev/shm", os.O_RDONLY) + try: + shm_fs_stats = os.fstatvfs(shm_fd) + # The value shm_fs_stats.f_bsize is the block size and the + # value shm_fs_stats.f_bavail is the number of available + # blocks. + shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail + finally: + os.close(shm_fd) + + return shm_avail + + def check_oversized_pickle(pickled, name, obj_type, worker): """Send a warning message if the pickled object is too large. diff --git a/python/ray/worker.py b/python/ray/worker.py index 0b042dc11..e4376fa07 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -721,7 +721,7 @@ class Worker(object): arguments.append(argument) return arguments - def _store_outputs_in_objstore(self, object_ids, outputs): + def _store_outputs_in_object_store(self, object_ids, outputs): """Store the outputs of a remote function in the local object store. This stores the values that were returned by a remote function in the @@ -819,7 +819,7 @@ class Worker(object): num_returns = len(return_object_ids) if num_returns == 1: outputs = (outputs, ) - self._store_outputs_in_objstore(return_object_ids, outputs) + self._store_outputs_in_object_store(return_object_ids, outputs) except Exception as e: self._handle_process_task_failure( function_id, function_name, return_object_ids, e, @@ -831,7 +831,7 @@ class Worker(object): failure_objects = [ failure_object for _ in range(len(return_object_ids)) ] - self._store_outputs_in_objstore(return_object_ids, failure_objects) + self._store_outputs_in_object_store(return_object_ids, failure_objects) # Log the error message. ray.utils.push_error_to_driver( self, diff --git a/test/stress_tests.py b/test/stress_tests.py index 6dd79d015..8f6edf082 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -170,7 +170,7 @@ def ray_start_reconstruction(request): # Start the Plasma store instances with a total of 1GB memory. plasma_store_memory = 10**9 plasma_addresses = [] - objstore_memory = plasma_store_memory // num_local_schedulers + object_store_memory = plasma_store_memory // num_local_schedulers for i in range(num_local_schedulers): store_stdout_file, store_stderr_file = ( ray.tempfile_services.new_plasma_store_log_file(i, True)) @@ -178,7 +178,7 @@ def ray_start_reconstruction(request): ray.services.start_plasma_store( node_ip_address, redis_address, - objstore_memory=objstore_memory, + object_store_memory=object_store_memory, store_stdout_file=store_stdout_file, store_stderr_file=store_stderr_file))