diff --git a/ci/long_running_tests/config.yaml b/ci/long_running_tests/config.yaml
index 76254385f..fa3361f15 100644
--- a/ci/long_running_tests/config.yaml
+++ b/ci/long_running_tests/config.yaml
@@ -13,7 +13,7 @@ auth:
     ssh_user: ubuntu
 
 head_node:
-    InstanceType: m5.2xlarge
+    InstanceType: m5.xlarge
     ImageId: ami-0def3275 # Default Ubuntu 16.04 AMI.
 
     # Set primary volume to 25 GiB
diff --git a/ci/long_running_tests/start_workloads.sh b/ci/long_running_tests/start_workloads.sh
index f68a774a2..21fdf2bb4 100755
--- a/ci/long_running_tests/start_workloads.sh
+++ b/ci/long_running_tests/start_workloads.sh
@@ -60,5 +60,12 @@ done
 echo ""
 echo ""
 
+echo "To shut down all instances, run the following."
+echo " $ROOT_DIR/shut_down_workloads.sh"
+
+echo ""
+echo ""
+
 echo "To check up on the scripts, run the following."
-echo " $ROOT_DIR/check_workloads.sh"
+echo " $ROOT_DIR/check_workloads.sh --load"
+echo " $ROOT_DIR/check_workloads.sh --logs"
diff --git a/ci/long_running_tests/workloads/actor_deaths.py b/ci/long_running_tests/workloads/actor_deaths.py
new file mode 100644
index 000000000..55f05c34c
--- /dev/null
+++ b/ci/long_running_tests/workloads/actor_deaths.py
@@ -0,0 +1,114 @@
+# This workload tests repeatedly killing actors and submitting tasks to them.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import sys
+import time
+
+import ray
+from ray.tests.cluster_utils import Cluster
+
+num_redis_shards = 1
+redis_max_memory = 10**8
+object_store_memory = 10**8
+num_nodes = 2
+
+message = ("Make sure there is enough memory on this machine to run this "
+           "workload. We divide the system memory by 2 to provide a buffer.")
+assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory <
+        ray.utils.get_system_memory() / 2), message
+
+# Simulate a cluster on one machine.
+
+cluster = Cluster()
+for i in range(num_nodes):
+    cluster.add_node(
+        redis_port=6379 if i == 0 else None,
+        num_redis_shards=num_redis_shards if i == 0 else None,
+        num_cpus=8,
+        num_gpus=0,
+        resources={str(i): 2},
+        object_store_memory=object_store_memory,
+        redis_max_memory=redis_max_memory)
+ray.init(redis_address=cluster.redis_address)
+
+# Run the workload.
+
+num_parents = 5
+num_children = 5
+# An actor exits whenever a uniform sample in [0, 1) exceeds this threshold,
+# i.e. with probability 1 - death_probability (0.05) per check.
+death_probability = 0.95
+
+
+@ray.remote
+class Child(object):
+    """Actor whose ping() occasionally exits its own process."""
+
+    def __init__(self, death_probability):
+        self.death_probability = death_probability
+
+    def ping(self):
+        # Exit the process when the sample exceeds the threshold.
+        exit_chance = np.random.rand()
+        if exit_chance > self.death_probability:
+            sys.exit(-1)
+
+
+@ray.remote
+class Parent(object):
+    """Actor that owns a set of Child actors and pings them in batches."""
+
+    def __init__(self, num_children, death_probability):
+        self.death_probability = death_probability
+        self.children = [
+            Child.remote(death_probability) for _ in range(num_children)
+        ]
+
+    def ping(self, num_pings):
+        # Submit num_pings rounds of pings to every child before waiting.
+        children_outputs = []
+        for _ in range(num_pings):
+            children_outputs += [
+                child.ping.remote() for child in self.children
+            ]
+        try:
+            ray.get(children_outputs)
+        except Exception:
+            # Replace the children if one of them died.
+            self.__init__(len(self.children), self.death_probability)
+
+    def kill(self):
+        # Clean up children.
+        ray.get([child.__ray_terminate__.remote() for child in self.children])
+
+
+parents = [
+    Parent.remote(num_children, death_probability) for _ in range(num_parents)
+]
+
+iteration = 0
+start_time = time.time()
+previous_time = start_time
+while True:
+    ray.get([parent.ping.remote(10) for parent in parents])
+
+    # Kill a parent actor with some probability.
+    exit_chance = np.random.rand()
+    if exit_chance > death_probability:
+        parent_index = np.random.randint(len(parents))
+        parents[parent_index].kill.remote()
+        parents[parent_index] = Parent.remote(num_children, death_probability)
+
+    new_time = time.time()
+    print("Iteration {}:\n"
+          " - Iteration time: {}.\n"
+          " - Absolute time: {}.\n"
+          " - Total elapsed time: {}.".format(
+              iteration, new_time - previous_time, new_time,
+              new_time - start_time))
+    previous_time = new_time
+    iteration += 1
diff --git a/ci/long_running_tests/workloads/workload_apex.py b/ci/long_running_tests/workloads/apex.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload_apex.py
rename to ci/long_running_tests/workloads/apex.py
diff --git a/ci/long_running_tests/workloads/workload_impala.py b/ci/long_running_tests/workloads/impala.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload_impala.py
rename to ci/long_running_tests/workloads/impala.py
diff --git a/ci/long_running_tests/workloads/workload2.py b/ci/long_running_tests/workloads/many_actor_tasks.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload2.py
rename to ci/long_running_tests/workloads/many_actor_tasks.py
diff --git a/ci/long_running_tests/workloads/workload1.py b/ci/long_running_tests/workloads/many_tasks.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload1.py
rename to ci/long_running_tests/workloads/many_tasks.py
diff --git a/ci/long_running_tests/workloads/workload3.py b/ci/long_running_tests/workloads/node_failures.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload3.py
rename to ci/long_running_tests/workloads/node_failures.py
diff --git a/ci/long_running_tests/workloads/workload_pbt.py b/ci/long_running_tests/workloads/pbt.py
similarity index 100%
rename from ci/long_running_tests/workloads/workload_pbt.py
rename to ci/long_running_tests/workloads/pbt.py