diff --git a/release/RELEASE_CHECKLIST.md b/release/RELEASE_CHECKLIST.md index da2d9145a..0c742a94d 100644 --- a/release/RELEASE_CHECKLIST.md +++ b/release/RELEASE_CHECKLIST.md @@ -62,8 +62,14 @@ This checklist is meant to be used in conjunction with the RELEASE_PROCESS.rst d - [ ] K8s operator test - [ ] Data processing tests - [ ] streaming_shuffle -- [x] Tune tests - - [x] ignore for now +- [ ] Tune tests + - [ ] test_bookkeeping_overhead + - [x] test_result_throughput_cluster (ignore final time) + - [x] test_result_throughput_single_node (ignore final time) + - [x] test_network_overhead (ignore final time) + - [ ] test_long_running_large_checkpoints + - [ ] test_xgboost_sweep + - [ ] test_durable_trainable - [ ] XGBoost Tests - [ ] distributed_api_test - [ ] train_small diff --git a/release/RELEASE_PROCESS.rst b/release/RELEASE_PROCESS.rst index f1decb4b6..2502a0865 100644 --- a/release/RELEASE_PROCESS.rst +++ b/release/RELEASE_PROCESS.rst @@ -167,8 +167,14 @@ is generally the easiest way to run release tests. General Ray Tune functionality is implicitly tested via RLLib and XGBoost release tests. We are in the process of introducing scalability envelopes for Ray Tune. - This is an ongoing effort and will only be introduced in the next release. - For now, **you can ignore the tune_tests directory**. + + Of the seven existing tests, three are currently not reaching their target time. + These three tests (test_result_throughput_cluster, test_result_throughput_single_node, and + test_network_overhead) are marked in the release checklist and don't have to be run at this time. + + The other release tests are expected to run through without errors and to pass within a pre-specified time. + The time is checked in the test function and the output will let you know if a run was fast enough and + thus passed the test. 10. 
**XGBoost release tests** diff --git a/release/tune_tests/scalability_tests/cluster.yaml b/release/tune_tests/scalability_tests/cluster.yaml deleted file mode 100644 index fd966898b..000000000 --- a/release/tune_tests/scalability_tests/cluster.yaml +++ /dev/null @@ -1,31 +0,0 @@ -cluster_name: ray-tune-scalability-tests - -min_workers: 15 -max_workers: 15 - -idle_timeout_minutes: 15 - -docker: - image: anyscale/ray:nightly - container_name: ray_container - pull_before_run: true - -provider: - type: aws - region: us-west-2 - availability_zone: us-west-2a - cache_stopped_nodes: false - -auth: - ssh_user: ubuntu - -head_node: - # 64 CPUs - InstanceType: m5.16xlarge - -worker_nodes: - # 64 CPUs - InstanceType: m5.16xlarge - -setup_commands: - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl diff --git a/release/tune_tests/scalability_tests/cluster_16x2.yaml b/release/tune_tests/scalability_tests/cluster_16x2.yaml new file mode 100644 index 000000000..e5e56e7c9 --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_16x2.yaml @@ -0,0 +1,47 @@ +cluster_name: ray-tune-scalability-tests-16x2 + +max_workers: 15 +upscaling_speed: 15 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_2_ondemand: + node_config: + InstanceType: m5.large + resources: {"CPU": 2} + min_workers: 0 + max_workers: 0 + cpu_2_spot: + node_config: + InstanceType: m5.large + InstanceMarketOptions: + MarketType: spot + resources: {"CPU": 2} + min_workers: 15 + max_workers: 15 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_2_ondemand +worker_default_node_type: cpu_2_spot + +setup_commands: + - ray install-nightly + - pip install -U awscli + +file_mounts: { + "~/release-automation-tune_scalability_tests": "." 
+} diff --git a/release/tune_tests/scalability_tests/cluster_16x64.yaml b/release/tune_tests/scalability_tests/cluster_16x64.yaml new file mode 100644 index 000000000..fbe954b6c --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_16x64.yaml @@ -0,0 +1,42 @@ +cluster_name: ray-tune-scalability-tests-16x64 + +max_workers: 15 +upscaling_speed: 15 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_64_ondemand: + node_config: + InstanceType: m5.16xlarge + resources: {"CPU": 64} + min_workers: 0 + max_workers: 0 + cpu_64_spot: + node_config: + InstanceType: m5.16xlarge + InstanceMarketOptions: + MarketType: spot + resources: {"CPU": 64} + min_workers: 15 + max_workers: 15 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_64_ondemand +worker_default_node_type: cpu_64_spot + +setup_commands: + - ray install-nightly diff --git a/release/tune_tests/scalability_tests/cluster_16x64_data.yaml b/release/tune_tests/scalability_tests/cluster_16x64_data.yaml new file mode 100644 index 000000000..56db5a349 --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_16x64_data.yaml @@ -0,0 +1,53 @@ +cluster_name: ray-tune-scalability-tests-16x64_data + +max_workers: 16 +upscaling_speed: 16 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_64_ondemand: + node_config: + InstanceType: m5.16xlarge + resources: {"CPU": 64} + min_workers: 0 + max_workers: 0 + cpu_64_spot: + node_config: + InstanceType: m5.16xlarge + InstanceMarketOptions: + MarketType: spot + resources: {"CPU": 64} + min_workers: 15 + max_workers: 15 + +auth: + ssh_user: ubuntu + 
+head_node_type: cpu_64_ondemand +worker_default_node_type: cpu_64_spot + +file_mounts: { + "~/release-automation-tune_scalability_tests": "." +} + +setup_commands: + - ray install-nightly + - pip install pytest xgboost_ray + - mkdir -p ~/data || true + - rm -rf ~/data/train.parquet || true + - rm -rf ~/data/test.parquet || true + - cp -R /tmp/ray_tmp_mount/release-automation-tune_scalability_tests ~/release-automation-tune_scalability_tests || echo "Copy failed" + - python ~/release-automation-tune_scalability_tests/create_test_data.py ~/data/train.parquet --seed 1234 --num-rows 40000000 --num-cols 40 --num-partitions 128 --num-classes 2 + - python ~/release-automation-tune_scalability_tests/create_test_data.py ~/data/test.parquet --seed 1234 --num-rows 10000000 --num-cols 40 --num-partitions 128 --num-classes 2 diff --git a/release/tune_tests/scalability_tests/cluster_1x16.yaml b/release/tune_tests/scalability_tests/cluster_1x16.yaml new file mode 100644 index 000000000..a40e0d0a0 --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_1x16.yaml @@ -0,0 +1,34 @@ +cluster_name: ray-tune-scalability-tests-1x16 + +max_workers: 0 +upscaling_speed: 1 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_4_ondemand: + node_config: + InstanceType: m5.xlarge + resources: {"CPU": 4} + min_workers: 0 + max_workers: 0 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_4_ondemand +worker_default_node_type: cpu_4_ondemand + +setup_commands: + - ray install-nightly diff --git a/release/tune_tests/scalability_tests/cluster_1x32_hd.yaml b/release/tune_tests/scalability_tests/cluster_1x32_hd.yaml new file mode 100644 index 000000000..e909c138c --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_1x32_hd.yaml @@ -0,0 +1,40 @@ +cluster_name: 
ray-tune-scalability-tests-1x32_hd + +max_workers: 0 +upscaling_speed: 1 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_32_hd_ondemand: + node_config: + InstanceType: m5.8xlarge + + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 160 + + resources: {"CPU": 32} # 128 GB memory + min_workers: 0 + max_workers: 0 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_32_hd_ondemand +worker_default_node_type: cpu_32_hd_ondemand + +setup_commands: + - ray install-nightly diff --git a/release/tune_tests/scalability_tests/cluster_1x96.yaml b/release/tune_tests/scalability_tests/cluster_1x96.yaml new file mode 100644 index 000000000..ec01ede17 --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_1x96.yaml @@ -0,0 +1,34 @@ +cluster_name: ray-tune-scalability-tests-1x96 + +max_workers: 0 +upscaling_speed: 1 + +idle_timeout_minutes: 0 + +docker: + image: anyscale/ray:nightly + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_96_ondemand: + node_config: + InstanceType: m5.24xlarge + resources: {"CPU": 96} + min_workers: 0 + max_workers: 0 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_96_ondemand +worker_default_node_type: cpu_96_ondemand + +setup_commands: + - ray install-nightly diff --git a/release/tune_tests/scalability_tests/cluster_200x2.yaml b/release/tune_tests/scalability_tests/cluster_200x2.yaml new file mode 100644 index 000000000..143505ab2 --- /dev/null +++ b/release/tune_tests/scalability_tests/cluster_200x2.yaml @@ -0,0 +1,42 @@ +cluster_name: ray-tune-scalability-tests-200x2 + +max_workers: 199 +upscaling_speed: 199 + +idle_timeout_minutes: 0 + +docker: + image: 
def _build_parser():
    """Build the command-line parser for the fake-data generator."""
    parser = argparse.ArgumentParser(description="Create fake data.")
    # nargs="?" makes the declared default effective: a positional argument
    # without it is always required, so the default could never be used.
    parser.add_argument(
        "filename",
        nargs="?",
        type=str,
        default="/data/parted.parquet/",
        help="ray/dask")
    parser.add_argument(
        "-r",
        "--num-rows",
        required=False,
        type=int,
        # Keep the default an int so it matches ``type=int``.
        default=int(1e8),
        help="num rows")
    parser.add_argument(
        "-p",
        "--num-partitions",
        required=False,
        type=int,
        default=100,
        help="num partitions")
    parser.add_argument(
        "-c",
        "--num-cols",
        required=False,
        type=int,
        default=4,
        help="num columns (features)")
    parser.add_argument(
        "-C",
        "--num-classes",
        required=False,
        type=int,
        default=2,
        help="num classes")
    parser.add_argument(
        "-s",
        "--seed",
        required=False,
        type=int,
        default=1234,
        help="random seed")
    return parser


def main(argv=None):
    """Create a fake parquet dataset unless the target already exists.

    Args:
        argv: Optional list of command-line arguments. ``None`` makes
            argparse read ``sys.argv[1:]``.

    Returns:
        ``True`` if the dataset was created, ``False`` if creation was
        skipped because ``filename`` already exists.
    """
    # Drop any inherited OMP_NUM_THREADS setting before generating data
    # (presumably so generation is not thread-limited -- TODO confirm).
    os.environ.pop("OMP_NUM_THREADS", None)

    args = _build_parser().parse_args(argv)

    if os.path.exists(args.filename):
        print(f"File already exists: {args.filename}. Skipping creation.")
        # Actually skip: previously the message was printed but the data
        # was regenerated anyway.
        return False

    # Seed numpy's global RNG before generating the data.
    np.random.seed(args.seed)
    create_parquet(
        args.filename,
        num_rows=args.num_rows,
        num_partitions=args.num_partitions,
        num_features=args.num_cols,
        num_classes=args.num_classes)
    return True


if __name__ == "__main__":
    main()
class TestDurableTrainable(DurableTrainable):
    """Synthetic durable trainable used by the Tune scalability tests.

    Every step after the first sleeps for ``sleep_time`` seconds and reports
    a score. Checkpoints are pickled arrays of random ``np.float64`` values
    sized to roughly ``checkpoint_size_b`` bytes.
    """

    def __init__(self, remote_checkpoint_dir, config, logger_creator=None):
        # Subclasses may prepare the process environment (e.g. credentials)
        # before the base class is initialized.
        self.setup_env()

        super(TestDurableTrainable, self).__init__(
            remote_checkpoint_dir,
            config=config,
            logger_creator=logger_creator)

    def setup_env(self):
        """Hook called before base-class initialization; no-op by default."""
        pass

    def setup(self, config):
        """Read the benchmark parameters from ``config``."""
        self._num_iters = int(config["num_iters"])
        self._sleep_time = config["sleep_time"]
        self._score = config["score"]

        self._checkpoint_iters = config["checkpoint_iters"]
        self._checkpoint_size_b = config["checkpoint_size_b"]
        # One np.float64 item takes 8 bytes.
        self._checkpoint_num_items = self._checkpoint_size_b // 8

        self._iter = 0

    def step(self):
        """Sleep (except on the first step) and report the current score.

        The result carries ``done=True`` once ``num_iters`` steps have
        already been taken, so ``num_iters + 1`` results are reported.
        """
        if self._iter > 0:
            time.sleep(self._sleep_time)

        res = dict(score=self._iter + self._score)

        if self._iter >= self._num_iters:
            res["done"] = True

        self._iter += 1
        return res

    def save_checkpoint(self, tmp_checkpoint_dir):
        """Write a random payload to ``bogus.ckpt`` and return its path."""
        checkpoint_file = os.path.join(tmp_checkpoint_dir, "bogus.ckpt")
        checkpoint_data = np.random.uniform(
            0, 1, size=self._checkpoint_num_items)
        with open(checkpoint_file, "wb") as fp:
            pickle.dump(checkpoint_data, fp)
        return checkpoint_file

    def load_checkpoint(self, checkpoint):
        """Ignore the checkpoint; the benchmark payload carries no state."""
        pass


def function_trainable(config):
    """Function-API trainable: report ``num_iters`` scores, sleeping between.

    If ``checkpoint_iters`` is positive and ``checkpoint_size_b`` is
    non-zero, a random payload of roughly ``checkpoint_size_b`` bytes is
    written every ``checkpoint_iters`` iterations.
    """
    num_iters = int(config["num_iters"])
    sleep_time = config["sleep_time"]
    score = config["score"]

    checkpoint_iters = config["checkpoint_iters"]
    checkpoint_size_b = config["checkpoint_size_b"]
    checkpoint_num_items = checkpoint_size_b // 8  # np.float64

    # Require a strictly positive interval: ``checkpoint_iters == 0`` used to
    # pass the ``>= 0`` guard and then fail with ZeroDivisionError on the
    # modulo below.
    checkpointing = checkpoint_iters > 0 and checkpoint_size_b > 0

    for i in range(num_iters):
        if checkpointing and i % checkpoint_iters == 0:
            with tune.checkpoint_dir(step=i) as dir:
                checkpoint_file = os.path.join(dir, "bogus.ckpt")
                checkpoint_data = np.random.uniform(
                    0, 1, size=checkpoint_num_items)
                with open(checkpoint_file, "wb") as fp:
                    pickle.dump(checkpoint_data, fp)

        tune.report(score=i + score)
        time.sleep(sleep_time)


def timed_tune_run(name: str,
                   num_samples: int,
                   results_per_second: int = 1,
                   trial_length_s: int = 1,
                   max_runtime: int = 300,
                   checkpoint_freq_s: int = -1,
                   checkpoint_size_b: int = 0,
                   **tune_kwargs):
    """Run a synthetic Tune experiment and assert it meets a time budget.

    Args:
        name: Benchmark name used in the pass/fail messages.
        num_samples: Number of trials passed to ``tune.run``.
        results_per_second: Reporting rate per trial; each iteration sleeps
            ``1 / results_per_second`` seconds.
        trial_length_s: Target trial duration; the iteration count is
            ``int(trial_length_s / sleep_time)``.
        max_runtime: Budget in seconds for the whole ``tune.run`` call.
        checkpoint_freq_s: Seconds between checkpoints; a negative value
            leaves ``checkpoint_iters`` at ``-1``.
        checkpoint_size_b: Approximate checkpoint payload size in bytes.
        **tune_kwargs: Extra keyword arguments for ``tune.run``. They
            override the defaults ``reuse_actors=True``, ``verbose=2`` and
            ``raise_on_failed_trial=False``.

    Raises:
        AssertionError: If the run took ``max_runtime`` seconds or longer.
    """
    # A sync config built without ``upload_dir`` would have no string to
    # call ``startswith`` on; fall back to "" so such runs are non-durable.
    sync_config = tune_kwargs.get("sync_config")
    upload_dir = getattr(sync_config, "upload_dir", None) or ""
    durable = upload_dir.startswith("s3://")

    sleep_time = 1. / results_per_second
    num_iters = int(trial_length_s / sleep_time)
    checkpoint_iters = -1
    if checkpoint_freq_s >= 0:
        checkpoint_iters = int(checkpoint_freq_s / sleep_time)

    config = {
        "score": tune.uniform(0., 1.),
        "num_iters": num_iters,
        "sleep_time": sleep_time,
        "checkpoint_iters": checkpoint_iters,
        "checkpoint_size_b": checkpoint_size_b,
    }

    print(f"Starting benchmark with config: {config}")

    # ``raise_on_failed_trial`` lives with the other defaults so a caller
    # supplying it no longer triggers a duplicate-keyword TypeError.
    run_kwargs = {
        "reuse_actors": True,
        "verbose": 2,
        "raise_on_failed_trial": False,
    }
    run_kwargs.update(tune_kwargs)

    _train = function_trainable

    # Capture the driver's AWS credentials so they can be re-exported by
    # the trainable's ``setup_env`` hook.
    aws_key_id = os.getenv("AWS_ACCESS_KEY_ID", "")
    aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY", "")
    aws_session = os.getenv("AWS_SESSION_TOKEN", "")

    if durable:

        class AwsDurableTrainable(TestDurableTrainable):
            AWS_ACCESS_KEY_ID = aws_key_id
            AWS_SECRET_ACCESS_KEY = aws_secret
            AWS_SESSION_TOKEN = aws_session

            def setup_env(self):
                os.environ["AWS_ACCESS_KEY_ID"] = self.AWS_ACCESS_KEY_ID
                os.environ[
                    "AWS_SECRET_ACCESS_KEY"] = self.AWS_SECRET_ACCESS_KEY
                os.environ["AWS_SESSION_TOKEN"] = self.AWS_SESSION_TOKEN

        _train = AwsDurableTrainable
        run_kwargs["checkpoint_freq"] = checkpoint_iters

    start_time = time.monotonic()
    tune.run(
        _train,
        config=config,
        num_samples=num_samples,
        **run_kwargs)
    time_taken = time.monotonic() - start_time

    assert time_taken < max_runtime, \
        f"The {name} test took {time_taken:.2f} seconds, but should not " \
        f"have exceeded {max_runtime:.2f} seconds. Test failed. \n\n" \
        f"--- FAILED: {name.upper()} ::: " \
        f"{time_taken:.2f} > {max_runtime:.2f} ---"

    print(f"The {name} test took {time_taken:.2f} seconds, which "
          f"is below the budget of {max_runtime:.2f} seconds. "
          f"Test successful. \n\n"
          f"--- PASSED: {name.upper()} ::: "
          f"{time_taken:.2f} <= {max_runtime:.2f} ---")
def main():
    """Run the bookkeeping-overhead benchmark (10k one-second trials)."""
    # Tweak: must be set before Tune starts (presumably the interval of
    # experiment-level checkpoints in seconds -- TODO confirm).
    os.environ["TUNE_GLOBAL_CHECKPOINT_S"] = "100"

    ray.init(address="auto")

    benchmark = dict(
        name="bookkeeping overhead",
        num_samples=10000,
        results_per_second=1,
        trial_length_s=1,
        max_runtime=800,
    )
    timed_tune_run(**benchmark)
def main():
    """Run the durable-trainable benchmark (16 trials, S3 checkpoints)."""
    ray.init(address="auto")

    # Checkpoints are uploaded to S3; nothing is synced to the driver.
    cloud_sync = tune.SyncConfig(
        sync_to_driver=False,
        upload_dir="s3://ray-tune-scalability-test/durable/",
    )

    benchmark = dict(
        name="durable trainable",
        num_samples=16,
        results_per_second=10 / 60,
        trial_length_s=300,
        max_runtime=500,
        checkpoint_freq_s=10,  # Once every 10 seconds
        checkpoint_size_b=int(10 * 1000**2),  # 10 MB
        keep_checkpoints_num=2,
        resources_per_trial={"cpu": 2},
        sync_config=cloud_sync,
    )
    timed_tune_run(**benchmark)
def main():
    """Run the long-running benchmark (16 day-long trials, large checkpoints)."""
    ray.init(address="auto")

    benchmark = dict(
        name="long running large checkpoints",
        num_samples=16,
        results_per_second=1 / 60,
        trial_length_s=86400,
        max_runtime=90000,
        checkpoint_freq_s=900,  # Once every 15 minutes
        checkpoint_size_b=int(3.75 * 1000**3),
        keep_checkpoints_num=2,  # 2 * 16 * 4 = 128 GB
        resources_per_trial={"cpu": 1},
        sync_config=tune.SyncConfig(sync_to_driver=True),
    )
    timed_tune_run(**benchmark)
def main():
    """Run the network-overhead benchmark (200 trials, 2 CPUs each)."""
    ray.init(address="auto")

    benchmark = dict(
        name="result network overhead",
        num_samples=200,
        results_per_second=1,
        trial_length_s=300,
        max_runtime=500,
        resources_per_trial={"cpu": 2},  # One per node
        sync_config=tune.SyncConfig(sync_to_driver=True),
    )
    timed_tune_run(**benchmark)
def main():
    """Run the cluster result-throughput benchmark (1000 trials, 10 results/s)."""
    # Tweak: set before Tune starts.
    os.environ["TUNE_DISABLE_AUTO_CALLBACK_LOGGERS"] = "1"

    ray.init(address="auto")

    # The cluster check is still evaluated, but both outcomes currently use
    # the same 120 second budget (no extra SSH overhead is added).
    runtime_budget = 120 if is_ray_cluster() else 120

    benchmark = dict(
        name="result throughput cluster",
        num_samples=1000,
        results_per_second=10,
        trial_length_s=100,
        max_runtime=runtime_budget,
        sync_config=tune.SyncConfig(sync_to_driver=False),  # Tweak!
    )
    timed_tune_run(**benchmark)
def main():
    """Run the single-node result-throughput benchmark (96 trials, 500 results/s)."""
    # Tweak: set before Tune starts.
    os.environ["TUNE_DISABLE_AUTO_CALLBACK_LOGGERS"] = "1"

    ray.init(address="auto")

    benchmark = dict(
        name="result throughput single node",
        num_samples=96,
        results_per_second=500,
        trial_length_s=100,
        max_runtime=120,
    )
    timed_tune_run(**benchmark)
def xgboost_train(config, num_actors=128, num_boost_round=200):
    """Train one distributed XGBoost model on the pre-generated parquet data.

    Args:
        config: XGBoost parameters for this trial.
        num_actors: Number of training actors (one CPU, no GPU each).
        num_boost_round: Number of boosting rounds.
    """
    train_path = os.path.expanduser("~/data/train.parquet")
    train_set = RayDMatrix(train_path, "labels")
    test_path = os.path.expanduser("~/data/test.parquet")
    test_set = RayDMatrix(test_path, "labels")

    actor_setup = RayParams(
        max_actor_restarts=1,
        gpus_per_actor=0,
        cpus_per_actor=1,
        num_actors=num_actors)

    # Filled in by ``train`` with the metrics of the "eval" set.
    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        ray_params=actor_setup,
        verbose_eval=False,
        num_boost_round=num_boost_round)

    bst.save_model("tuned.xgb")
    final_error = evals_result["eval"]["error"][-1]
    print("Final validation error: {:.4f}".format(final_error))


def main():
    """Run the XGBoost sweep and assert it finishes within the time budget.

    Raises:
        AssertionError: If the sweep took ``max_runtime`` seconds or longer.
    """
    name = "large xgboost sweep"
    max_runtime = 2600

    ray.init(address="auto")

    search_space = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": 4
    }

    started = time.monotonic()
    tune.run(
        tune.with_parameters(
            xgboost_train, num_actors=32, num_boost_round=100),
        config=search_space,
        num_samples=32)
    time_taken = time.monotonic() - started

    assert time_taken < max_runtime, \
        f"The {name} test took {time_taken:.2f} seconds, but should not " \
        f"have exceeded {max_runtime:.2f} seconds. Test failed. \n\n" \
        f"--- FAILED: {name.upper()} ::: " \
        f"{time_taken:.2f} > {max_runtime:.2f} ---"

    print(f"The {name} test took {time_taken:.2f} seconds, which "
          f"is below the budget of {max_runtime:.2f} seconds. "
          f"Test successful. \n\n"
          f"--- PASSED: {name.upper()} ::: "
          f"{time_taken:.2f} <= {max_runtime:.2f} ---")