Use /tmp instead of /dev/shm for object store on Linux if /dev/shm is too small. (#3149)

* Use /tmp instead of /dev/shm for object store on Linux if /dev/shm is too small.

* Add logging statement and address comments.

* Fix
This commit is contained in:
Robert Nishihara
2018-10-28 20:09:06 -07:00
committed by Philipp Moritz
parent 08fc9e5bcd
commit 9868af4c7c
4 changed files with 107 additions and 42 deletions
+80 -37
View File
@@ -977,12 +977,80 @@ def start_raylet(redis_address,
return raylet_name
def determine_plasma_store_config(object_store_memory=None,
plasma_directory=None,
huge_pages=False):
"""Figure out how to configure the plasma object store.
This will determine which directory to use for the plasma store (e.g.,
/tmp or /dev/shm) and how much memory to start the store with. On Linux,
we will try to use /dev/shm unless the shared memory file system is too
small, in which case we will fall back to /tmp. If any of the object store
memory or plasma directory parameters are specified by the user, then those
values will be preserved.
Args:
object_store_memory (int): The user-specified object store memory
parameter.
plasma_directory (str): The user-specified plasma directory parameter.
huge_pages (bool): The user-specified huge pages parameter.
Returns:
A tuple of the object store memory to use and the plasma directory to
use. If either of these values is specified by the user, then that
value will be preserved.
"""
system_memory = ray.utils.get_system_memory()
# Choose a default object store size.
if object_store_memory is None:
object_store_memory = int(system_memory * 0.4)
if plasma_directory is not None:
plasma_directory = os.path.abspath(plasma_directory)
# Determine which directory to use. By default, use /tmp on MacOS and
# /dev/shm on Linux, unless the shared-memory file system is too small,
# in which case we default to /tmp on Linux.
if plasma_directory is None:
if sys.platform == "linux" or sys.platform == "linux2":
shm_avail = ray.utils.get_shared_memory_bytes()
# Compare the requested memory size to the memory available in
# /dev/shm.
if shm_avail > object_store_memory:
plasma_directory = "/dev/shm"
else:
plasma_directory = "/tmp"
logger.warning(
"WARNING: The object store is using /tmp instead of "
"/dev/shm because /dev/shm has only {} bytes available. "
"This may slow down performance! You may be able to free "
"up space by deleting files in /dev/shm or terminating "
"any running plasma_store_server processes. If you are "
"inside a Docker container, you may need to pass an "
"argument with the flag '--shm-size' to 'docker run'."
.format(shm_avail))
else:
plasma_directory = "/tmp"
# Do some sanity checks.
if object_store_memory > system_memory:
raise Exception("The requested object store memory size is greater "
"than the total available memory.")
if not os.path.isdir(plasma_directory):
raise Exception("The file {} does not exist or is not a directory."
.format(plasma_directory))
return object_store_memory, plasma_directory
def start_plasma_store(node_ip_address,
redis_address,
object_manager_port=None,
store_stdout_file=None,
store_stderr_file=None,
objstore_memory=None,
object_store_memory=None,
cleanup=True,
plasma_directory=None,
huge_pages=False,
@@ -1000,8 +1068,8 @@ def start_plasma_store(node_ip_address,
to. If no redirection should happen, then this should be None.
store_stderr_file: A file handle opened for writing to redirect stderr
to. If no redirection should happen, then this should be None.
objstore_memory: The amount of memory (in bytes) to start the object
store with.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by serices.cleanup() when the
Python process that imported services exits.
@@ -1014,41 +1082,16 @@ def start_plasma_store(node_ip_address,
Return:
The Plasma store socket name.
"""
if objstore_memory is None:
# Compute a fraction of the system memory for the Plasma store to use.
system_memory = ray.utils.get_system_memory()
if sys.platform == "linux" or sys.platform == "linux2":
# On linux we use /dev/shm, its size is half the size of the
# physical memory. To not overflow it, we set the plasma memory
# limit to 0.4 times the size of the physical memory.
objstore_memory = int(system_memory * 0.4)
# Compare the requested memory size to the memory available in
# /dev/shm.
shm_fd = os.open("/dev/shm", os.O_RDONLY)
try:
shm_fs_stats = os.fstatvfs(shm_fd)
# The value shm_fs_stats.f_bsize is the block size and the
# value shm_fs_stats.f_bavail is the number of available
# blocks.
shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail
if objstore_memory > shm_avail:
logger.warning(
"Warning: Reducing object store memory because "
"/dev/shm has only {} bytes available. You may be "
"able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, "
"you may need to pass an argument with the flag "
"'--shm-size' to 'docker run'.".format(shm_avail))
objstore_memory = int(shm_avail * 0.8)
finally:
os.close(shm_fd)
else:
objstore_memory = int(system_memory * 0.8)
object_store_memory, plasma_directory = determine_plasma_store_config(
object_store_memory, plasma_directory, huge_pages)
# Print the object store memory using two decimal places.
object_store_memory_str = (object_store_memory / 10**7) / 10**2
logger.info("Starting the Plasma object store with {} GB memory "
"using {}.".format(object_store_memory_str, plasma_directory))
# Start the Plasma store.
logger.info("Starting the Plasma object store with {0:.2f} GB memory."
.format(objstore_memory / 10**9))
plasma_store_name, p1 = ray.plasma.start_plasma_store(
plasma_store_memory=objstore_memory,
plasma_store_memory=object_store_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
stdout_file=store_stdout_file,
stderr_file=store_stderr_file,
@@ -1389,7 +1432,7 @@ def start_ray_processes(address_info=None,
redis_address,
store_stdout_file=plasma_store_stdout_file,
store_stderr_file=plasma_store_stderr_file,
objstore_memory=object_store_memory,
object_store_memory=object_store_memory,
cleanup=cleanup,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
+22
View File
@@ -324,6 +324,28 @@ def get_system_memory():
return sysctl(["sysctl", "hw.memsize"])
def get_shared_memory_bytes():
"""Get the size of the shared memory file system.
Returns:
The size of the shared memory file system in bytes.
"""
# Make sure this is only called on Linux.
assert sys.platform == "linux" or sys.platform == "linux2"
shm_fd = os.open("/dev/shm", os.O_RDONLY)
try:
shm_fs_stats = os.fstatvfs(shm_fd)
# The value shm_fs_stats.f_bsize is the block size and the
# value shm_fs_stats.f_bavail is the number of available
# blocks.
shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail
finally:
os.close(shm_fd)
return shm_avail
def check_oversized_pickle(pickled, name, obj_type, worker):
"""Send a warning message if the pickled object is too large.
+3 -3
View File
@@ -721,7 +721,7 @@ class Worker(object):
arguments.append(argument)
return arguments
def _store_outputs_in_objstore(self, object_ids, outputs):
def _store_outputs_in_object_store(self, object_ids, outputs):
"""Store the outputs of a remote function in the local object store.
This stores the values that were returned by a remote function in the
@@ -819,7 +819,7 @@ class Worker(object):
num_returns = len(return_object_ids)
if num_returns == 1:
outputs = (outputs, )
self._store_outputs_in_objstore(return_object_ids, outputs)
self._store_outputs_in_object_store(return_object_ids, outputs)
except Exception as e:
self._handle_process_task_failure(
function_id, function_name, return_object_ids, e,
@@ -831,7 +831,7 @@ class Worker(object):
failure_objects = [
failure_object for _ in range(len(return_object_ids))
]
self._store_outputs_in_objstore(return_object_ids, failure_objects)
self._store_outputs_in_object_store(return_object_ids, failure_objects)
# Log the error message.
ray.utils.push_error_to_driver(
self,