From 7c174d0ffee54a9a9c1f9be9c2d88ccfa313db2b Mon Sep 17 00:00:00 2001
From: Stephanie Wang
Date: Thu, 5 Mar 2020 20:51:24 -0800
Subject: [PATCH] Make the ref counting test more stressful (#7473)

---
 .../workloads/many_tasks_serialized_ids.py | 43 +++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)

diff --git a/ci/long_running_tests/workloads/many_tasks_serialized_ids.py b/ci/long_running_tests/workloads/many_tasks_serialized_ids.py
index bb300428f..02830b065 100644
--- a/ci/long_running_tests/workloads/many_tasks_serialized_ids.py
+++ b/ci/long_running_tests/workloads/many_tasks_serialized_ids.py
@@ -2,6 +2,8 @@
 # returning serialized ObjectIDs.
 
 import time
+import json
+import random
 
 import numpy as np
 
@@ -20,6 +22,9 @@ assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory <
 
 # Simulate a cluster on one machine.
 
+config = json.dumps({
+    "distributed_ref_counting_enabled": 1,
+})
 cluster = Cluster()
 for i in range(num_nodes):
     cluster.add_node(
@@ -30,20 +35,31 @@ for i in range(num_nodes):
         resources={str(i): 2},
         object_store_memory=object_store_memory,
         redis_max_memory=redis_max_memory,
-        webui_host="0.0.0.0")
+        webui_host="0.0.0.0",
+        _internal_config=config,
+    )
 ray.init(address=cluster.address)
 
 # Run the workload.
 
 
 @ray.remote(max_retries=0)
-def child(*xs):
-    return np.zeros(1024, dtype=np.uint8)
+def churn():
+    return ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
 
 
 @ray.remote(max_retries=0)
-def f(xs):
-    return child.remote(*xs)
+def child(*xs):
+    oid = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
+    return oid
+
+
+@ray.remote(max_retries=0)
+def f(*xs):
+    if xs:
+        return random.choice(xs)
+    else:
+        return child.remote(*xs)
 
 
 iteration = 0
@@ -53,13 +69,24 @@ previous_time = start_time
 while True:
     for _ in range(50):
         new_constrained_ids = [
-            f._remote(args=[[ids]], resources={str(i % num_nodes): 1})
+            f._remote(args=ids, resources={str(i % num_nodes): 1})
             for i in range(25)
         ]
-        new_unconstrained_ids = [f.remote([ids]) for _ in range(25)]
+        new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
         ids = new_constrained_ids + new_unconstrained_ids
 
-    ray.get(ids)
+    # Fill the object store while the tasks are running.
+    for i in range(num_nodes):
+        for _ in range(10):
+            [
+                churn._remote(args=[], resources={str(i % num_nodes): 1})
+                for _ in range(10)
+            ]
+
+    # Make sure that the objects are still available.
+    child_ids = ray.get(ids)
+    for child_id in child_ids:
+        ray.get(child_id)
 
     new_time = time.time()
     print("Iteration {}:\n"