mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 11:10:25 +08:00
Add warning/error if object store memory exceeds available memory (#4893)
* exclude * format * add warning * hatch * reduce mem usage * reduce object store mem * set obj mem
This commit is contained in:
@@ -24,21 +24,26 @@ class RayOutOfMemoryError(Exception):
|
||||
proc_stats = []
|
||||
for pid in pids:
|
||||
proc = psutil.Process(pid)
|
||||
proc_stats.append((proc.memory_info().rss, pid, proc.cmdline()))
|
||||
proc_stats.append(
|
||||
(proc.memory_info().rss - proc.memory_info().shared, pid,
|
||||
proc.cmdline()))
|
||||
proc_str = "PID\tMEM\tCOMMAND"
|
||||
for rss, pid, cmdline in sorted(proc_stats, reverse=True)[:5]:
|
||||
for rss, pid, cmdline in sorted(proc_stats, reverse=True)[:10]:
|
||||
proc_str += "\n{}\t{}GB\t{}".format(
|
||||
pid, round(rss / 1e9, 2), " ".join(cmdline)[:100].strip())
|
||||
return ("More than {}% of the memory on ".format(int(
|
||||
100 * threshold)) + "node {} is used ({} / {} GB). ".format(
|
||||
os.uname()[1], round(used_gb, 2), round(total_gb, 2)) +
|
||||
"The top 5 memory consumers are:\n\n{}".format(proc_str) +
|
||||
"\n\nIn addition, ~{} GB of shared memory is ".format(
|
||||
"The top 10 memory consumers are:\n\n{}".format(proc_str) +
|
||||
"\n\nIn addition, up to {} GB of shared memory is ".format(
|
||||
round(psutil.virtual_memory().shared / 1e9, 2)) +
|
||||
"currently being used by the Ray object store. You can set "
|
||||
"the object store size with the `object_store_memory` "
|
||||
"parameter when starting Ray, and the max Redis size with "
|
||||
"`redis_max_memory`.")
|
||||
"`redis_max_memory`. Note that Ray assumes all system "
|
||||
"memory is available for use by workers. If your system "
|
||||
"has other applications running, you should manually set "
|
||||
"these memory limits to a lower value.")
|
||||
|
||||
|
||||
class MemoryMonitor(object):
|
||||
|
||||
@@ -1300,6 +1300,32 @@ def determine_plasma_store_config(object_store_memory=None,
|
||||
object_store_memory = (
|
||||
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES)
|
||||
|
||||
# Other applications may also be using a lot of memory on the same
|
||||
# node. Try to detect when this is happening and log a warning or
|
||||
# error in more severe cases.
|
||||
avail_memory = ray.utils.estimate_available_memory()
|
||||
object_store_fraction = object_store_memory / avail_memory
|
||||
# Escape hatch, undocumented for now.
|
||||
no_check = os.environ.get("RAY_DEBUG_DISABLE_MEM_CHECKS", False)
|
||||
if object_store_fraction > 0.9 and not no_check:
|
||||
raise ValueError(
|
||||
"The default object store size of {} GB "
|
||||
"will use more than 90% of the available memory on this node "
|
||||
"({} GB). Please reduce the object store memory size "
|
||||
"to avoid memory contention with other applications, or "
|
||||
"shut down the applications using this memory.".format(
|
||||
round(object_store_memory / 1e9, 2),
|
||||
round(avail_memory / 1e9, 2)))
|
||||
elif object_store_fraction > 0.5:
|
||||
logger.warning(
|
||||
"WARNING: The default object store size of {} GB "
|
||||
"will use more than 50% of the available memory on this node "
|
||||
"({} GB). Consider setting the object store memory manually "
|
||||
"to a smaller size to avoid memory contention with other "
|
||||
"applications.".format(
|
||||
round(object_store_memory / 1e9, 2),
|
||||
round(avail_memory / 1e9, 2)))
|
||||
|
||||
# Determine which directory to use. By default, use /tmp on MacOS and
|
||||
# /dev/shm on Linux, unless the shared-memory file system is too small,
|
||||
# in which case we default to /tmp on Linux.
|
||||
|
||||
@@ -506,7 +506,11 @@ def test_resource_assignment(shutdown_only):
|
||||
"""Test to make sure that we assign resource to actors at instantiation."""
|
||||
# This test will create 16 actors. Declaring this many CPUs initially will
|
||||
# speed up the test because the workers will be started ahead of time.
|
||||
ray.init(num_cpus=16, num_gpus=1, resources={"Custom": 1})
|
||||
ray.init(
|
||||
num_cpus=16,
|
||||
num_gpus=1,
|
||||
resources={"Custom": 1},
|
||||
object_store_memory=int(10**8))
|
||||
|
||||
class Actor(object):
|
||||
def __init__(self):
|
||||
@@ -1262,7 +1266,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
|
||||
# Create tasks and actors that both use GPUs and make sure that they
|
||||
# are given different GPUs
|
||||
ray.init(num_cpus=10, num_gpus=10)
|
||||
ray.init(num_cpus=10, num_gpus=10, object_store_memory=int(10**8))
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
def f():
|
||||
|
||||
@@ -16,7 +16,7 @@ def get_ray_result(cython_func, *args):
|
||||
|
||||
class CythonTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
ray.init()
|
||||
ray.init(object_store_memory=int(10**8))
|
||||
|
||||
def tearDown(self):
|
||||
ray.shutdown()
|
||||
|
||||
@@ -13,7 +13,7 @@ from ray.tests.cluster_utils import Cluster
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
|
||||
@pytest.fixture(params=[1, 20])
|
||||
@pytest.fixture(params=[1, 4])
|
||||
def ray_start_sharded(request):
|
||||
num_redis_shards = request.param
|
||||
|
||||
@@ -24,7 +24,10 @@ def ray_start_sharded(request):
|
||||
|
||||
# Start the Ray processes.
|
||||
ray.init(
|
||||
num_cpus=10, num_redis_shards=num_redis_shards, redis_max_memory=10**7)
|
||||
object_store_memory=int(0.1 * 10**9),
|
||||
num_cpus=10,
|
||||
num_redis_shards=num_redis_shards,
|
||||
redis_max_memory=10**7)
|
||||
|
||||
yield None
|
||||
|
||||
|
||||
@@ -400,6 +400,31 @@ def get_system_memory():
|
||||
return memory_in_bytes
|
||||
|
||||
|
||||
def estimate_available_memory():
|
||||
"""Return the currently available amount of system memory in bytes.
|
||||
|
||||
Returns:
|
||||
The total amount of available memory in bytes. It may be an
|
||||
overestimate if psutil is not installed.
|
||||
"""
|
||||
|
||||
# Use psutil if it is available.
|
||||
try:
|
||||
import psutil
|
||||
return psutil.virtual_memory().available
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Handle Linux.
|
||||
if sys.platform == "linux" or sys.platform == "linux2":
|
||||
bytes_in_kilobyte = 1024
|
||||
return (
|
||||
vmstat("total memory") - vmstat("used memory")) * bytes_in_kilobyte
|
||||
|
||||
# Give up
|
||||
return get_system_memory()
|
||||
|
||||
|
||||
def get_shared_memory_bytes():
|
||||
"""Get the size of the shared memory file system.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user