mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 07:23:55 +08:00
Refresh ObjectIDs in raylet for stopgap GC (#6109)
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
# coding: utf-8
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
import logging
|
||||
|
||||
import ray
|
||||
import ray.tests.cluster_utils
|
||||
import ray.tests.utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_basic_gc(shutdown_only):
|
||||
ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True)
|
||||
|
||||
@ray.remote
|
||||
def shuffle(input):
|
||||
return np.random.shuffle(input)
|
||||
|
||||
@ray.remote
|
||||
class Actor(object):
|
||||
def __init__(self):
|
||||
# Hold a long-lived reference to a ray.put object. This should not
|
||||
# be garbage collected while the actor is alive.
|
||||
self.large_object = ray.put(
|
||||
np.zeros(25 * 1024 * 1024, dtype=np.uint8), weakref=True)
|
||||
|
||||
def get_large_object(self):
|
||||
return ray.get(self.large_object)
|
||||
|
||||
actor = Actor.remote()
|
||||
|
||||
# Fill up the object store with short-lived objects. These should be
|
||||
# evicted before the long-lived object whose reference is held by
|
||||
# the actor.
|
||||
for batch in range(10):
|
||||
intermediate_result = shuffle.remote(
|
||||
np.zeros(10 * 1024 * 1024, dtype=np.uint8))
|
||||
ray.get(intermediate_result)
|
||||
|
||||
# The ray.get below would fail with only LRU eviction, as the object
|
||||
# that was ray.put by the actor would have been evicted.
|
||||
ray.get(actor.get_large_object.remote())
|
||||
|
||||
|
||||
def test_pending_task_dependency(shutdown_only):
|
||||
ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True)
|
||||
|
||||
@ray.remote
|
||||
def pending(input1, input2):
|
||||
return
|
||||
|
||||
@ray.remote
|
||||
def slow():
|
||||
time.sleep(5)
|
||||
|
||||
# The object that is ray.put here will go out of scope immediately, so if
|
||||
# pending task dependencies aren't considered, it will be evicted before
|
||||
# the ray.get below due to the subsequent ray.puts that fill up the object
|
||||
# store.
|
||||
np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
oid = pending.remote(ray.put(np_array), slow.remote())
|
||||
|
||||
for _ in range(2):
|
||||
ray.put(np_array)
|
||||
|
||||
ray.get(oid)
|
||||
@@ -66,10 +66,14 @@ class TestMemoryLimits(unittest.TestCase):
|
||||
z = ray.put("hi", weakref=True)
|
||||
a = LightActor._remote(object_store_memory=a_quota)
|
||||
b = GreedyActor._remote(object_store_memory=b_quota)
|
||||
oids = [z]
|
||||
for _ in range(5):
|
||||
r_a = a.sample.remote()
|
||||
for _ in range(20):
|
||||
ray.get(b.sample.remote())
|
||||
new_oid = b.sample.remote()
|
||||
oids.append(new_oid)
|
||||
ray.get(new_oid)
|
||||
oids.append(r_a)
|
||||
ray.get(r_a)
|
||||
ray.get(z)
|
||||
except Exception as e:
|
||||
|
||||
@@ -33,8 +33,12 @@ class TestUnreconstructableErrors(unittest.TestCase):
|
||||
|
||||
x_id = f.remote(None)
|
||||
ray.get(x_id)
|
||||
# Hold references to the ray.put objects so they aren't LRU'd.
|
||||
oids = []
|
||||
for _ in range(400):
|
||||
ray.get([f.remote(np.zeros(10000)) for _ in range(50)])
|
||||
new_oids = [f.remote(np.zeros(10000)) for _ in range(50)]
|
||||
oids.extend(new_oids)
|
||||
ray.get(new_oids)
|
||||
self.assertRaises(ray.exceptions.UnreconstructableError,
|
||||
lambda: ray.get(x_id))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user