mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 13:37:39 +08:00
Remove memory quota enforcement from actors (#11480)
* wip * fix * deprecate
This commit is contained in:
@@ -397,11 +397,6 @@ cdef execute_task(
|
||||
next_title = f"ray::{class_name}"
|
||||
pid = os.getpid()
|
||||
worker_name = f"ray_{class_name}_{pid}"
|
||||
if c_resources.find(b"memory") != c_resources.end():
|
||||
worker.memory_monitor.set_heap_limit(
|
||||
worker_name,
|
||||
ray_constants.from_memory_units(
|
||||
dereference(c_resources.find(b"memory")).second))
|
||||
if c_resources.find(b"object_store_memory") != c_resources.end():
|
||||
worker.core_worker.set_object_store_client_options(
|
||||
worker_name,
|
||||
|
||||
@@ -78,8 +78,6 @@ class MemoryMonitor:
|
||||
# throttle this check at most once a second or so.
|
||||
self.check_interval = check_interval
|
||||
self.last_checked = 0
|
||||
self.heap_limit = None
|
||||
self.worker_name = None
|
||||
try:
|
||||
self.error_threshold = float(
|
||||
os.getenv("RAY_MEMORY_MONITOR_ERROR_THRESHOLD"))
|
||||
@@ -98,10 +96,6 @@ class MemoryMonitor:
|
||||
"`pip install psutil` (or ray[debug]) to enable "
|
||||
"debugging of memory-related crashes.")
|
||||
|
||||
def set_heap_limit(self, worker_name, limit_bytes):
|
||||
self.heap_limit = limit_bytes
|
||||
self.worker_name = worker_name
|
||||
|
||||
def get_memory_usage(self):
|
||||
psutil_mem = psutil.virtual_memory()
|
||||
total_gb = psutil_mem.total / (1024**3)
|
||||
@@ -140,17 +134,3 @@ class MemoryMonitor:
|
||||
self.error_threshold))
|
||||
else:
|
||||
logger.debug(f"Memory usage is {used_gb} / {total_gb}")
|
||||
|
||||
if self.heap_limit:
|
||||
mem_info = psutil.Process(os.getpid()).memory_info()
|
||||
heap_size = get_rss(mem_info)
|
||||
if heap_size > self.heap_limit:
|
||||
raise RayOutOfMemoryError(
|
||||
"Heap memory usage for {} is {} / {} GiB limit".format(
|
||||
self.worker_name, round(heap_size / (1024**3), 4),
|
||||
round(self.heap_limit / (1024**3), 4)))
|
||||
elif heap_size > 0.8 * self.heap_limit:
|
||||
logger.warning(
|
||||
"Heap memory usage for {} is {} / {} GiB limit".format(
|
||||
self.worker_name, round(heap_size / (1024**3), 4),
|
||||
round(self.heap_limit / (1024**3), 4)))
|
||||
|
||||
@@ -68,26 +68,6 @@ class TestMemoryScheduling(unittest.TestCase):
|
||||
finally:
|
||||
ray.shutdown()
|
||||
|
||||
def testTuneDriverHeapLimit(self):
|
||||
try:
|
||||
ray.init(num_cpus=4, _memory=100 * MB)
|
||||
_register_all()
|
||||
result = tune.run(
|
||||
"PG",
|
||||
stop={"timesteps_total": 10000},
|
||||
config={
|
||||
"env": "CartPole-v0",
|
||||
"memory": 100 * 1024 * 1024, # too little
|
||||
"framework": "tf",
|
||||
},
|
||||
raise_on_failed_trial=False)
|
||||
self.assertEqual(result.trials[0].status, "ERROR")
|
||||
self.assertTrue(
|
||||
"RayOutOfMemoryError: Heap memory usage for ray_PG_" in
|
||||
result.trials[0].error_msg)
|
||||
finally:
|
||||
ray.shutdown()
|
||||
|
||||
def testTuneDriverStoreLimit(self):
|
||||
try:
|
||||
ray.init(
|
||||
@@ -111,27 +91,6 @@ class TestMemoryScheduling(unittest.TestCase):
|
||||
finally:
|
||||
ray.shutdown()
|
||||
|
||||
def testTuneWorkerHeapLimit(self):
|
||||
try:
|
||||
ray.init(num_cpus=4, _memory=100 * MB)
|
||||
_register_all()
|
||||
result = tune.run(
|
||||
"PG",
|
||||
stop={"timesteps_total": 10000},
|
||||
config={
|
||||
"env": "CartPole-v0",
|
||||
"num_workers": 1,
|
||||
"memory_per_worker": 100 * 1024 * 1024, # too little
|
||||
"framework": "tf",
|
||||
},
|
||||
raise_on_failed_trial=False)
|
||||
self.assertEqual(result.trials[0].status, "ERROR")
|
||||
self.assertTrue(
|
||||
"RayOutOfMemoryError: Heap memory usage for ray_Rollout" in
|
||||
result.trials[0].error_msg)
|
||||
finally:
|
||||
ray.shutdown()
|
||||
|
||||
def testTuneWorkerStoreLimit(self):
|
||||
try:
|
||||
ray.init(
|
||||
|
||||
Reference in New Issue
Block a user