mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 22:58:00 +08:00
Remove flaky object manager test that's no longer needed
This commit is contained in:
@@ -7,7 +7,6 @@ import warnings
|
||||
|
||||
import ray
|
||||
from ray.cluster_utils import Cluster
|
||||
from ray.test_utils import wait_for_condition
|
||||
|
||||
if (multiprocessing.cpu_count() < 40
|
||||
or ray.utils.get_system_memory() < 50 * 10**9):
|
||||
@@ -194,61 +193,6 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
|
||||
assert all(value == 1 for value in send_counts.values())
|
||||
|
||||
|
||||
# The purpose of this test is to make sure that an object that has already been
|
||||
# transferred to a node can be transferred again.
|
||||
def test_object_transfer_retry(ray_start_cluster):
    """Check that an already-transferred object can be transferred again.

    The test builds a two-node cluster, produces an object on the GPU node,
    fetches it locally, fills the local object store until the object is no
    longer reported as present, and then fetches it a second time.

    Args:
        ray_start_cluster: Cluster fixture; nodes are added to it here.
    """
    cluster = ray_start_cluster

    # Force the sending object manager to allow duplicate pushes again sooner.
    # Also, force the receiving object manager to retry the pull sooner. We
    # make the chunk size smaller in order to make it easier to test objects
    # with multiple chunks.
    config = {
        "object_manager_default_chunk_size": 1000,
        "object_store_full_max_retries": 1,
    }
    object_store_memory = 150 * 1024 * 1024
    cluster.add_node(
        object_store_memory=object_store_memory, _system_config=config)
    cluster.add_node(num_gpus=1, object_store_memory=object_store_memory)
    ray.init(address=cluster.address)

    # Requires a GPU, so it can only run on the second node added above.
    @ray.remote(num_gpus=1)
    def f(size):
        return np.zeros(size, dtype=np.uint8)

    # Transfer an object to warm up the object manager.
    ray.get(f.remote(10**6))

    x_id = f.remote(10**6)
    assert not ray.worker.global_worker.core_worker.object_exists(x_id)

    # Get the objects locally to cause them to be transferred. This is the
    # first time the objects are getting transferred, so it should happen
    # quickly.
    ray.get(x_id)

    def not_exists():
        """Return True once `x_id` is no longer reported by the core worker."""
        return not ray.worker.global_worker.core_worker.object_exists(x_id)

    def force_eviction():
        """Fill the object store with large puts, then wait for `x_id` to go."""
        # Keep the references alive so the filler objects stay in the store.
        refs = []
        for _ in range(20):
            try:
                refs.append(
                    ray.put(
                        np.zeros(object_store_memory // 10, dtype=np.uint8)))
            except Exception:
                # Best effort: a failed put is treated as "the store is
                # full", so stop filling.
                break
        wait_for_condition(not_exists)

    # Force the object to be evicted from the local node.
    force_eviction()

    # Get the object again and make sure it gets transferred.
    ray.get(x_id)
|
||||
|
||||
|
||||
# The purpose of this test is to make sure we can transfer many objects. In the
|
||||
# past, this has caused failures in which object managers create too many open
|
||||
# files and run out of resources.
|
||||
|
||||
Reference in New Issue
Block a user