mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 05:11:18 +08:00
Make the ref counting test more stressful (#7473)
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
# returning serialized ObjectIDs.
|
||||
|
||||
import time
|
||||
import json
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
|
||||
@@ -20,6 +22,9 @@ assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory <
|
||||
|
||||
# Simulate a cluster on one machine.
|
||||
|
||||
config = json.dumps({
|
||||
"distributed_ref_counting_enabled": 1,
|
||||
})
|
||||
cluster = Cluster()
|
||||
for i in range(num_nodes):
|
||||
cluster.add_node(
|
||||
@@ -30,20 +35,31 @@ for i in range(num_nodes):
|
||||
resources={str(i): 2},
|
||||
object_store_memory=object_store_memory,
|
||||
redis_max_memory=redis_max_memory,
|
||||
webui_host="0.0.0.0")
|
||||
webui_host="0.0.0.0",
|
||||
_internal_config=config,
|
||||
)
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
# Run the workload.
|
||||
|
||||
|
||||
@ray.remote(max_retries=0)
|
||||
def child(*xs):
|
||||
return np.zeros(1024, dtype=np.uint8)
|
||||
def churn():
|
||||
return ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
|
||||
|
||||
|
||||
@ray.remote(max_retries=0)
|
||||
def f(xs):
|
||||
return child.remote(*xs)
|
||||
def child(*xs):
|
||||
oid = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
|
||||
return oid
|
||||
|
||||
|
||||
@ray.remote(max_retries=0)
|
||||
def f(*xs):
|
||||
if xs:
|
||||
return random.choice(xs)
|
||||
else:
|
||||
return child.remote(*xs)
|
||||
|
||||
|
||||
iteration = 0
|
||||
@@ -53,13 +69,24 @@ previous_time = start_time
|
||||
while True:
|
||||
for _ in range(50):
|
||||
new_constrained_ids = [
|
||||
f._remote(args=[[ids]], resources={str(i % num_nodes): 1})
|
||||
f._remote(args=ids, resources={str(i % num_nodes): 1})
|
||||
for i in range(25)
|
||||
]
|
||||
new_unconstrained_ids = [f.remote([ids]) for _ in range(25)]
|
||||
new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
|
||||
ids = new_constrained_ids + new_unconstrained_ids
|
||||
|
||||
ray.get(ids)
|
||||
# Fill the object store while the tasks are running.
|
||||
for i in range(num_nodes):
|
||||
for _ in range(10):
|
||||
[
|
||||
churn._remote(args=[], resources={str(i % num_nodes): 1})
|
||||
for _ in range(10)
|
||||
]
|
||||
|
||||
# Make sure that the objects are still available.
|
||||
child_ids = ray.get(ids)
|
||||
for child_id in child_ids:
|
||||
ray.get(child_id)
|
||||
|
||||
new_time = time.time()
|
||||
print("Iteration {}:\n"
|
||||
|
||||
Reference in New Issue
Block a user