[tune] add scalability release tests (#13986)

* Add scalability tests

* Network overhead cluster

* Update xgboost tests

* Document release tests

* Don't raise on failed trial

* Update to multi node yamls

* Update yamls

* Revert xgboost test changes

* Fix import

* Update release/tune_tests/scalability_tests/workloads/test_bookkeeping_overhead.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* Pass aws credentials (WIP)

* Update durable trainable example

* Update xgboost sweep

* Change xgboost scope, fix durable trainable stop condition

* Fix max depth to limit total test length

* Add cluster information to test descriptions. Update release checklist/process docs

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Kai Fricke
2021-02-10 17:16:31 +01:00
committed by GitHub
parent 81e7434091
commit 1ef2a6790c
21 changed files with 896 additions and 100 deletions
+8 -2
@@ -62,8 +62,14 @@ This checklist is meant to be used in conjunction with the RELEASE_PROCESS.rst d
- [ ] K8s operator test
- [ ] Data processing tests
- [ ] streaming_shuffle
- [x] Tune tests
- [x] ignore for now
- [ ] Tune tests
- [ ] test_bookkeeping_overhead
- [x] test_result_throughput_cluster (ignore final time)
- [x] test_result_throughput_single_node (ignore final time)
- [x] test_network_overhead (ignore final time)
- [ ] test_long_running_large_checkpoints
- [ ] test_xgboost_sweep
- [ ] test_durable_trainable
- [ ] XGBoost Tests
- [ ] distributed_api_test
- [ ] train_small
+8 -2
@@ -167,8 +167,14 @@ is generally the easiest way to run release tests.
General Ray Tune functionality is implicitly tested via RLlib and XGBoost release tests.
We are in the process of introducing scalability envelopes for Ray Tune.
This is an ongoing effort and will only be introduced in the next release.
For now, **you can ignore the tune_tests directory**.
Of the seven existing tests, three are currently not reaching their target time.
These three tests (test_result_throughput_cluster, test_result_throughput_single_node, and
test_network_overhead) are marked in the release checklist and don't have to be run at this time.
The other release tests are expected to run through without errors and to pass within a pre-specified time.
The time is checked in the test function and the output will let you know if a run was fast enough and
thus passed the test.
10. **XGBoost release tests**
@@ -1,31 +0,0 @@
cluster_name: ray-tune-scalability-tests
min_workers: 15
max_workers: 15
idle_timeout_minutes: 15
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
auth:
    ssh_user: ubuntu
head_node:
    # 64 CPUs
    InstanceType: m5.16xlarge
worker_nodes:
    # 64 CPUs
    InstanceType: m5.16xlarge
setup_commands:
    - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
@@ -0,0 +1,47 @@
cluster_name: ray-tune-scalability-tests-16x2
max_workers: 15
upscaling_speed: 15
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_2_ondemand:
        node_config:
            InstanceType: m5.large
        resources: {"CPU": 2}
        min_workers: 0
        max_workers: 0
    cpu_2_spot:
        node_config:
            InstanceType: m5.large
            InstanceMarketOptions:
                MarketType: spot
        resources: {"CPU": 2}
        min_workers: 15
        max_workers: 15
auth:
    ssh_user: ubuntu
head_node_type: cpu_2_ondemand
worker_default_node_type: cpu_2_spot
setup_commands:
    - ray install-nightly
    - pip install -U awscli
file_mounts: {
    "~/release-automation-tune_scalability_tests": "."
}
@@ -0,0 +1,42 @@
cluster_name: ray-tune-scalability-tests-16x64
max_workers: 15
upscaling_speed: 15
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_64_ondemand:
        node_config:
            InstanceType: m5.16xlarge
        resources: {"CPU": 64}
        min_workers: 0
        max_workers: 0
    cpu_64_spot:
        node_config:
            InstanceType: m5.16xlarge
            InstanceMarketOptions:
                MarketType: spot
        resources: {"CPU": 64}
        min_workers: 15
        max_workers: 15
auth:
    ssh_user: ubuntu
head_node_type: cpu_64_ondemand
worker_default_node_type: cpu_64_spot
setup_commands:
    - ray install-nightly
@@ -0,0 +1,53 @@
cluster_name: ray-tune-scalability-tests-16x64_data
max_workers: 16
upscaling_speed: 16
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_64_ondemand:
        node_config:
            InstanceType: m5.16xlarge
        resources: {"CPU": 64}
        min_workers: 0
        max_workers: 0
    cpu_64_spot:
        node_config:
            InstanceType: m5.16xlarge
            InstanceMarketOptions:
                MarketType: spot
        resources: {"CPU": 64}
        min_workers: 15
        max_workers: 15
auth:
    ssh_user: ubuntu
head_node_type: cpu_64_ondemand
worker_default_node_type: cpu_64_spot
file_mounts: {
    "~/release-automation-tune_scalability_tests": "."
}
setup_commands:
    - ray install-nightly
    - pip install pytest xgboost_ray
    - mkdir -p ~/data || true
    - rm -rf ~/data/train.parquet || true
    - rm -rf ~/data/test.parquet || true
    - cp -R /tmp/ray_tmp_mount/release-automation-tune_scalability_tests ~/release-automation-tune_scalability_tests || echo "Copy failed"
    - python ~/release-automation-tune_scalability_tests/create_test_data.py ~/data/train.parquet --seed 1234 --num-rows 40000000 --num-cols 40 --num-partitions 128 --num-classes 2
    - python ~/release-automation-tune_scalability_tests/create_test_data.py ~/data/test.parquet --seed 1234 --num-rows 10000000 --num-cols 40 --num-partitions 128 --num-classes 2
@@ -0,0 +1,34 @@
cluster_name: ray-tune-scalability-tests-1x16
max_workers: 0
upscaling_speed: 1
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_4_ondemand:
        node_config:
            InstanceType: m5.xlarge
        resources: {"CPU": 4}
        min_workers: 0
        max_workers: 0
auth:
    ssh_user: ubuntu
head_node_type: cpu_4_ondemand
worker_default_node_type: cpu_4_ondemand
setup_commands:
    - ray install-nightly
@@ -0,0 +1,40 @@
cluster_name: ray-tune-scalability-tests-1x32_hd
max_workers: 0
upscaling_speed: 1
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_32_hd_ondemand:
        node_config:
            InstanceType: m5.8xlarge
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 160
        resources: {"CPU": 32}  # 128 GB memory
        min_workers: 0
        max_workers: 0
auth:
    ssh_user: ubuntu
head_node_type: cpu_32_hd_ondemand
worker_default_node_type: cpu_32_hd_ondemand
setup_commands:
    - ray install-nightly
@@ -0,0 +1,34 @@
cluster_name: ray-tune-scalability-tests-1x96
max_workers: 0
upscaling_speed: 1
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_96_ondemand:
        node_config:
            InstanceType: m5.24xlarge
        resources: {"CPU": 96}
        min_workers: 0
        max_workers: 0
auth:
    ssh_user: ubuntu
head_node_type: cpu_96_ondemand
worker_default_node_type: cpu_96_ondemand
setup_commands:
    - ray install-nightly
@@ -0,0 +1,42 @@
cluster_name: ray-tune-scalability-tests-200x2
max_workers: 199
upscaling_speed: 199
idle_timeout_minutes: 0
docker:
    image: anyscale/ray:nightly
    container_name: ray_container
    pull_before_run: true
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false
available_node_types:
    cpu_2_ondemand:
        node_config:
            InstanceType: m5.large
        resources: {"CPU": 2}
        min_workers: 0
        max_workers: 0
    cpu_2_spot:
        node_config:
            InstanceType: m5.large
            InstanceMarketOptions:
                MarketType: spot
        resources: {"CPU": 2}
        min_workers: 199
        max_workers: 199
auth:
    ssh_user: ubuntu
head_node_type: cpu_2_ondemand
worker_default_node_type: cpu_2_spot
setup_commands:
    - ray install-nightly
@@ -0,0 +1,61 @@
import argparse
import numpy as np
import os
import sys

from xgboost_ray.tests.utils import create_parquet

if __name__ == "__main__":
    if "OMP_NUM_THREADS" in os.environ:
        del os.environ["OMP_NUM_THREADS"]

    parser = argparse.ArgumentParser(description="Create fake data.")
    parser.add_argument(
        "filename", type=str, default="/data/parted.parquet/", help="ray/dask")
    parser.add_argument(
        "-r",
        "--num-rows",
        required=False,
        type=int,
        default=1e8,
        help="num rows")
    parser.add_argument(
        "-p",
        "--num-partitions",
        required=False,
        type=int,
        default=100,
        help="num partitions")
    parser.add_argument(
        "-c",
        "--num-cols",
        required=False,
        type=int,
        default=4,
        help="num columns (features)")
    parser.add_argument(
        "-C",
        "--num-classes",
        required=False,
        type=int,
        default=2,
        help="num classes")
    parser.add_argument(
        "-s",
        "--seed",
        required=False,
        type=int,
        default=1234,
        help="random seed")

    args = parser.parse_args()

    if os.path.exists(args.filename):
        print(f"File already exists: {args.filename}. Skipping creation.")
        # Exit early so the message matches the behavior: existing data
        # is left untouched instead of being regenerated.
        sys.exit(0)

    np.random.seed(args.seed)
    create_parquet(
        args.filename,
        num_rows=int(args.num_rows),
        num_partitions=int(args.num_partitions),
        num_features=int(args.num_cols),
        num_classes=int(args.num_classes))
+11 -11
@@ -1,6 +1,7 @@
#!/usr/bin/env bash
ray_version=""
nodes=""
ray_version=""
commit=""
ray_branch=""
@@ -8,9 +9,11 @@ for i in "$@"
do
echo "$i"
case "$i" in
--nodes=*)
nodes="${i#*=}"
;;
--ray-version=*)
ray_version="${i#*=}"
;;
--commit=*)
commit="${i#*=}"
@@ -32,25 +35,22 @@ case "$i" in
esac
done
if [[ $ray_version == "" || $commit == "" || $ray_branch == "" ]]
if [[ $nodes == "" || $ray_version == "" || $commit == "" || $ray_branch == "" ]]
then
echo "Provide --ray-version, --commit, and --ray-branch"
echo "Provide --nodes, --ray-version, --commit, and --ray-branch"
exit 1
fi
echo "nodes: $nodes"
echo "version: $ray_version"
echo "commit: $commit"
echo "branch: $ray_branch"
echo "workload: ignored"
wheel="https://s3-us-west-2.amazonaws.com/ray-wheels/$ray_branch/$commit/ray-$ray_version-cp37-cp37m-manylinux2014_x86_64.whl"
# wheel="https://s3-us-west-2.amazonaws.com/ray-wheels/$ray_branch/$commit/ray-$ray_version-cp37-cp37m-manylinux2014_x86_64.whl"
# pip install -U "$wheel"
pip install -U pip
pip install -U "$wheel"
pip install "ray[tune]" "ray"
pip install boto3==1.4.8 cython==0.29.0
if ! python "wait_cluster.py" 16 450; then
if ! python "wait_cluster.py" "$nodes" 600; then
echo "Cluster did not come up in time. Aborting test."
exit 1
fi
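The script above calls `wait_cluster.py` with a node count and a timeout, but that file is not part of this diff. The following is only a hypothetical sketch of such a wait loop, not the actual script: `wait_for_nodes` and `count_alive_ray_nodes` are illustrative names, and the node counter is passed in as a callable so the loop can be exercised without a running cluster. The only Ray call assumed is `ray.nodes()`, which returns one dict per node with an `"Alive"` entry.

```python
import time
from typing import Callable


def wait_for_nodes(expected_nodes: int,
                   timeout_s: float,
                   count_alive_nodes: Callable[[], int],
                   poll_interval_s: float = 5.0,
                   clock: Callable[[], float] = time.monotonic,
                   sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until `expected_nodes` are alive or `timeout_s` has elapsed.

    Returns True if the cluster came up in time, False otherwise.
    """
    deadline = clock() + timeout_s
    while True:
        if count_alive_nodes() >= expected_nodes:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval_s)


def count_alive_ray_nodes() -> int:
    """Count alive nodes of the connected Ray cluster (hypothetical helper)."""
    import ray  # Imported lazily so the polling loop itself has no Ray dependency.
    return sum(1 for node in ray.nodes() if node.get("Alive"))
```

After `ray.init(address="auto")`, a caller could run `wait_for_nodes(16, 600, count_alive_ray_nodes)` and exit with a non-zero status when it returns `False`, which is what the `if ! python ...` check in the shell script relies on.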
@@ -0,0 +1,153 @@
import os
import time

import numpy as np
import pickle

from ray import tune
from ray.tune.durable_trainable import DurableTrainable


class TestDurableTrainable(DurableTrainable):
    def __init__(self, remote_checkpoint_dir, config, logger_creator=None):
        self.setup_env()

        super(TestDurableTrainable, self).__init__(
            remote_checkpoint_dir,
            config=config,
            logger_creator=logger_creator)

    def setup_env(self):
        pass

    def setup(self, config):
        self._num_iters = int(config["num_iters"])
        self._sleep_time = config["sleep_time"]
        self._score = config["score"]

        self._checkpoint_iters = config["checkpoint_iters"]
        self._checkpoint_size_b = config["checkpoint_size_b"]
        self._checkpoint_num_items = self._checkpoint_size_b // 8  # np.float64

        self._iter = 0

    def step(self):
        if self._iter > 0:
            time.sleep(self._sleep_time)

        res = dict(score=self._iter + self._score)

        if self._iter >= self._num_iters:
            res["done"] = True

        self._iter += 1
        return res

    def save_checkpoint(self, tmp_checkpoint_dir):
        checkpoint_file = os.path.join(tmp_checkpoint_dir, "bogus.ckpt")
        checkpoint_data = np.random.uniform(
            0, 1, size=self._checkpoint_num_items)
        with open(checkpoint_file, "wb") as fp:
            pickle.dump(checkpoint_data, fp)
        return checkpoint_file

    def load_checkpoint(self, checkpoint):
        pass


def function_trainable(config):
    num_iters = int(config["num_iters"])
    sleep_time = config["sleep_time"]
    score = config["score"]

    checkpoint_iters = config["checkpoint_iters"]
    checkpoint_size_b = config["checkpoint_size_b"]
    checkpoint_num_items = checkpoint_size_b // 8  # np.float64

    for i in range(num_iters):
        # checkpoint_iters must be strictly positive: a value of 0 would
        # raise ZeroDivisionError in the modulo below.
        if checkpoint_iters > 0 and checkpoint_size_b > 0 and \
                i % checkpoint_iters == 0:
            with tune.checkpoint_dir(step=i) as checkpoint_dir:
                checkpoint_file = os.path.join(checkpoint_dir, "bogus.ckpt")
                checkpoint_data = np.random.uniform(
                    0, 1, size=checkpoint_num_items)
                with open(checkpoint_file, "wb") as fp:
                    pickle.dump(checkpoint_data, fp)

        tune.report(score=i + score)
        time.sleep(sleep_time)


def timed_tune_run(name: str,
                   num_samples: int,
                   results_per_second: int = 1,
                   trial_length_s: int = 1,
                   max_runtime: int = 300,
                   checkpoint_freq_s: int = -1,
                   checkpoint_size_b: int = 0,
                   **tune_kwargs):
    # A SyncConfig without an upload_dir leaves it as None, so guard
    # before calling startswith().
    sync_config = tune_kwargs.get("sync_config")
    upload_dir = getattr(sync_config, "upload_dir", None)
    durable = bool(upload_dir) and upload_dir.startswith("s3://")

    sleep_time = 1. / results_per_second
    num_iters = int(trial_length_s / sleep_time)
    checkpoint_iters = -1
    if checkpoint_freq_s >= 0:
        checkpoint_iters = int(checkpoint_freq_s / sleep_time)

    config = {
        "score": tune.uniform(0., 1.),
        "num_iters": num_iters,
        "sleep_time": sleep_time,
        "checkpoint_iters": checkpoint_iters,
        "checkpoint_size_b": checkpoint_size_b,
    }

    print(f"Starting benchmark with config: {config}")

    run_kwargs = {"reuse_actors": True, "verbose": 2}
    run_kwargs.update(tune_kwargs)

    _train = function_trainable

    aws_key_id = os.getenv("AWS_ACCESS_KEY_ID", "")
    aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY", "")
    aws_session = os.getenv("AWS_SESSION_TOKEN", "")

    if durable:

        class AwsDurableTrainable(TestDurableTrainable):
            # Credentials are captured on the driver and re-exported on
            # the node where the trainable is created.
            AWS_ACCESS_KEY_ID = aws_key_id
            AWS_SECRET_ACCESS_KEY = aws_secret
            AWS_SESSION_TOKEN = aws_session

            def setup_env(self):
                os.environ["AWS_ACCESS_KEY_ID"] = self.AWS_ACCESS_KEY_ID
                os.environ[
                    "AWS_SECRET_ACCESS_KEY"] = self.AWS_SECRET_ACCESS_KEY
                os.environ["AWS_SESSION_TOKEN"] = self.AWS_SESSION_TOKEN

        _train = AwsDurableTrainable
        run_kwargs["checkpoint_freq"] = checkpoint_iters

    start_time = time.monotonic()
    tune.run(
        _train,
        config=config,
        num_samples=num_samples,
        raise_on_failed_trial=False,
        **run_kwargs)
    time_taken = time.monotonic() - start_time

    assert time_taken < max_runtime, \
        f"The {name} test took {time_taken:.2f} seconds, but should not " \
        f"have exceeded {max_runtime:.2f} seconds. Test failed. \n\n" \
        f"--- FAILED: {name.upper()} ::: " \
        f"{time_taken:.2f} > {max_runtime:.2f} ---"

    print(f"The {name} test took {time_taken:.2f} seconds, which "
          f"is below the budget of {max_runtime:.2f} seconds. "
          f"Test successful. \n\n"
          f"--- PASSED: {name.upper()} ::: "
          f"{time_taken:.2f} <= {max_runtime:.2f} ---")
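To see how `timed_tune_run` turns its arguments into the per-trial schedule, the arithmetic can be pulled out into a standalone function. This is a minimal sketch that mirrors the three lines of the function above; `derive_schedule` is a hypothetical name used only for illustration and does not exist in the commit.

```python
from typing import Tuple


def derive_schedule(results_per_second: float,
                    trial_length_s: float,
                    checkpoint_freq_s: float = -1) -> Tuple[float, int, int]:
    """Return (sleep_time, num_iters, checkpoint_iters) as timed_tune_run does."""
    # One result is reported per iteration, so the sleep between
    # iterations is the inverse of the reporting rate.
    sleep_time = 1. / results_per_second
    num_iters = int(trial_length_s / sleep_time)

    # -1 means "no checkpointing"; otherwise convert seconds to iterations.
    checkpoint_iters = -1
    if checkpoint_freq_s >= 0:
        checkpoint_iters = int(checkpoint_freq_s / sleep_time)
    return sleep_time, num_iters, checkpoint_iters


if __name__ == "__main__":
    # One result per second for 300 seconds, no checkpoints.
    print(derive_schedule(1, 300))
    # One result every 2 seconds, checkpoint every 10 seconds.
    print(derive_schedule(0.5, 300, 10))
```

Because `int()` truncates, a checkpoint interval shorter than the reporting interval yields `checkpoint_iters == 0`, so callers may want to keep `checkpoint_freq_s` at or above `1 / results_per_second`.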
@@ -0,0 +1,42 @@
"""Bookkeeping overhead (1 node, 10k trials)
In this run, we will start a large number of trials (10k) that take just a
second to run. We thus measure overhead that comes with dealing with a
large number of trials, e.g. experiment checkpointing.
Cluster: cluster_1x16.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 800 seconds.
Theoretical minimum time: 10000/16 = 625 seconds
"""
import os

import ray

from _trainable import timed_tune_run


def main():
    os.environ["TUNE_GLOBAL_CHECKPOINT_S"] = "100"  # Tweak

    ray.init(address="auto")

    num_samples = 10000
    results_per_second = 1
    trial_length_s = 1

    max_runtime = 800

    timed_tune_run(
        name="bookkeeping overhead",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime)


if __name__ == "__main__":
    main()
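The "theoretical minimum time" quoted in the test docstrings (for example 10000/16 = 625 seconds here) follows from how many trials can run at once. A small sketch of that calculation, assuming every trial occupies exactly one slot and all slots stay busy; `theoretical_min_runtime_s` is a hypothetical helper, not part of the test suite.

```python
import math


def theoretical_min_runtime_s(num_trials: int,
                              trial_length_s: float,
                              concurrent_trials: int) -> float:
    """Lower bound on wall time when trials run in full waves."""
    # Trials that cannot start immediately have to wait for a free slot,
    # so the run needs ceil(num_trials / concurrent_trials) waves.
    waves = math.ceil(num_trials / concurrent_trials)
    return waves * trial_length_s


if __name__ == "__main__":
    # Bookkeeping overhead: 10k one-second trials on 16 slots.
    print(theoretical_min_runtime_s(10000, 1, 16))
    # Network overhead: 200 trials of 300 seconds, one per node.
    print(theoretical_min_runtime_s(200, 300, 200))
```

The gap between this bound and `max_runtime` (800 seconds versus 625 seconds for the bookkeeping test) is the overhead budget the test allows Tune.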
@@ -0,0 +1,47 @@
"""Durable trainable (16 trials, checkpoint to cloud)
In this run, we will start 16 trials on a cluster. The trials create
10 MB checkpoints every 10 seconds and should only keep 2 of these. This test
ensures that durable checkpoints don't slow down experiment progress too much.
Cluster: cluster_16x2.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 500 seconds.
Theoretical minimum time: 300 seconds
"""
import ray
from ray import tune

from _trainable import timed_tune_run


def main():
    ray.init(address="auto")

    num_samples = 16
    results_per_second = 10 / 60
    trial_length_s = 300

    max_runtime = 500

    timed_tune_run(
        name="durable trainable",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime,
        checkpoint_freq_s=10,  # Once every 10 seconds
        checkpoint_size_b=int(10 * 1000**2),  # 10 MB
        keep_checkpoints_num=2,
        resources_per_trial={"cpu": 2},
        sync_config=tune.SyncConfig(
            sync_to_driver=False,
            upload_dir="s3://ray-tune-scalability-test/durable/",
        ))


if __name__ == "__main__":
    main()
@@ -0,0 +1,44 @@
"""Large checkpoints in long running trials (16 trials, 4 GB checkpoints).
In this run, we will start 16 trials on a single node. The trials create
4 GB checkpoints every 15 minutes and should only keep 2 of these. This test
ensures that handling large checkpoints doesn't lead to much overhead.
Cluster: cluster_1x32_hd.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 90,000 seconds.
Theoretical minimum time: 86,400 seconds
"""
import ray
from ray import tune

from _trainable import timed_tune_run


def main():
    ray.init(address="auto")

    num_samples = 16
    results_per_second = 1 / 60
    trial_length_s = 86400

    max_runtime = 90000

    timed_tune_run(
        name="long running large checkpoints",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime,
        checkpoint_freq_s=900,  # Once every 15 minutes
        checkpoint_size_b=int(3.75 * 1000**3),
        keep_checkpoints_num=2,  # 2 * 16 * 4 = 128 GB
        resources_per_trial={"cpu": 1},
        sync_config=tune.SyncConfig(sync_to_driver=True))


if __name__ == "__main__":
    main()
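The disk budget behind this test can be checked with a few lines of arithmetic. The sketch below uses the values passed above (`checkpoint_size_b`, 16 trials, `keep_checkpoints_num=2`) and the `// 8` conversion from `_trainable.py`. `checkpoint_footprint` is a hypothetical helper, and the result is a rough estimate that ignores pickle overhead and any checkpoint that exists briefly before an old one is deleted.

```python
from typing import Tuple


def checkpoint_footprint(checkpoint_size_b: int,
                         num_trials: int,
                         keep_checkpoints_num: int) -> Tuple[int, int]:
    """Return (float64 items per checkpoint, approximate bytes kept on disk)."""
    # Each checkpoint is an array of np.float64 values, 8 bytes per item.
    num_items = checkpoint_size_b // 8
    payload_b = num_items * 8
    # Every trial retains up to keep_checkpoints_num checkpoints.
    kept_b = payload_b * num_trials * keep_checkpoints_num
    return num_items, kept_b


if __name__ == "__main__":
    items, kept_b = checkpoint_footprint(int(3.75 * 1000**3), 16, 2)
    print(f"{items} items per checkpoint, {kept_b / 1000**3:.0f} GB retained")
```

With 3.75 GB per checkpoint this comes to about 120 GB, slightly below the 128 GB in the inline comment (which rounds to 4 GB) and below the 160 GB volume configured in `cluster_1x32_hd.yaml`.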
@@ -0,0 +1,41 @@
"""Networking overhead (200 trials on 200 nodes)
In this run, we will start 200 trials and run them on 200 different nodes.
This test will thus measure the overhead that comes with network communication
and specifically log synchronization.
Cluster: cluster_200x2.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 500 seconds.
Theoretical minimum time: 300 seconds
"""
import ray
from ray import tune

from _trainable import timed_tune_run


def main():
    ray.init(address="auto")

    num_samples = 200
    results_per_second = 1
    trial_length_s = 300

    max_runtime = 500

    timed_tune_run(
        name="result network overhead",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime,
        resources_per_trial={"cpu": 2},  # One per node
        sync_config=tune.SyncConfig(sync_to_driver=True))


if __name__ == "__main__":
    main()
@@ -1,54 +0,0 @@
import time

import ray
from ray import tune
from ray.tune.cluster_info import is_ray_cluster


def my_naive_trainable(config):
    for i in range(int(config["num_iters"])):
        tune.report(score=i + config["score"])
        time.sleep(config["sleep_time"])


def main():
    ray.init(address="auto")

    num_samples = 1000

    sleep_time = 0.1
    num_iters = 300

    expected_run_time = num_iters * sleep_time

    # Allow minimum of 20 % overhead (or 10 seconds for short runs)
    expected_run_time += max(expected_run_time * 0.2, 10.)

    if is_ray_cluster():
        # Add constant overhead for SSH connection
        expected_run_time += 0.3 * num_samples

    start_time = time.time()
    tune.run(
        my_naive_trainable,
        config={
            "score": tune.uniform(0., 1.),
            "num_iters": num_iters,
            "sleep_time": sleep_time
        },
        reuse_actors=True,
        verbose=2,
        num_samples=num_samples)
    time_taken = time.time() - start_time

    assert time_taken < expected_run_time, \
        f"The buffering test took {time_taken:.2f} seconds, but should not " \
        f"have exceeded {expected_run_time:.2f} seconds. Test failed."

    print(f"The buffering test took {time_taken:.2f} seconds, which "
          f"is below the budget of {expected_run_time:.2f} seconds. "
          f"Test successful.")


if __name__ == "__main__":
    main()
@@ -0,0 +1,49 @@
"""Result throughput on a cluster
In this run, we will start 1000 trials concurrently that report often
(10 results per second). We thus measure the amount of overhead incurred when
dealing with a large number of results from distributed trials.
Cluster: cluster_16x64.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 120 seconds.
Theoretical minimum time: 100 seconds
"""
import os

import ray
from ray import tune
from ray.tune.cluster_info import is_ray_cluster

from _trainable import timed_tune_run


def main():
    os.environ["TUNE_DISABLE_AUTO_CALLBACK_LOGGERS"] = "1"  # Tweak

    ray.init(address="auto")

    num_samples = 1000
    results_per_second = 10
    trial_length_s = 100

    max_runtime = 120

    if is_ray_cluster():
        # Placeholder for constant SSH connection overhead. The budget is
        # currently the same as the default above.
        max_runtime = 120

    timed_tune_run(
        name="result throughput cluster",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime,
        sync_config=tune.SyncConfig(sync_to_driver=False))  # Tweak!


if __name__ == "__main__":
    main()
@@ -0,0 +1,42 @@
"""Result throughput on a single node
In this run, we will start 96 trials concurrently that report very often
(500 results per second). We thus measure the amount of overhead incurred when
dealing with a large number of results.
Cluster: cluster_1x96.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 120 seconds.
Theoretical minimum time: 100 seconds
"""
import os

import ray

from _trainable import timed_tune_run


def main():
    os.environ["TUNE_DISABLE_AUTO_CALLBACK_LOGGERS"] = "1"  # Tweak

    ray.init(address="auto")

    num_samples = 96
    results_per_second = 500
    trial_length_s = 100

    max_runtime = 120

    timed_tune_run(
        name="result throughput single node",
        num_samples=num_samples,
        results_per_second=results_per_second,
        trial_length_s=trial_length_s,
        max_runtime=max_runtime)


if __name__ == "__main__":
    main()
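To put the two result throughput tests side by side, the load on the driver can be estimated from the test parameters alone. This is an approximate sketch with hypothetical helper names. It multiplies the nominal rates and ignores the float truncation that `timed_tune_run` applies when deriving the iteration count.

```python
def aggregate_results_per_second(num_trials: int,
                                 results_per_second: float) -> float:
    """Nominal number of results per second arriving at the driver."""
    return num_trials * results_per_second


def total_results(num_trials: int,
                  results_per_second: float,
                  trial_length_s: float) -> int:
    """Nominal number of results reported over the whole run."""
    return int(num_trials * results_per_second * trial_length_s)


if __name__ == "__main__":
    # Single node: 96 trials at 500 results per second for 100 seconds.
    print(aggregate_results_per_second(96, 500), total_results(96, 500, 100))
    # Cluster: 1000 trials at 10 results per second for 100 seconds.
    print(aggregate_results_per_second(1000, 10), total_results(1000, 10, 100))
```

Under these assumptions the single-node test pushes a nominal 48,000 results per second through the driver, against 10,000 for the cluster test, which has to cope with distributed trials instead.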
@@ -0,0 +1,98 @@
"""Large-scale XGBoost parameter sweep
In this run, we will start 32 trials of 32 actors each running distributed
XGBoost training. This test is more about making sure that the run succeeds
than about total runtime. However, it is expected that this is faster than
1 hour.
We fix the max_depth to 4 and the number of boosting rounds to 100. The
fastest observed training time for 32 actors (1 CPU each) was about 2000
seconds. We allow up to 10 minutes of slack, so aim for 2600 seconds total
tuning time.
Cluster: cluster_16x64_data.yaml
Test owner: krfricke
Acceptance criteria: Should run faster than 2600 seconds. Should run without
errors.
"""
import os
import time

import ray
from ray import tune

from xgboost_ray import train, RayParams, RayDMatrix


def xgboost_train(config, num_actors=128, num_boost_round=200):
    train_set = RayDMatrix(
        os.path.expanduser("~/data/train.parquet"), "labels")
    test_set = RayDMatrix(os.path.expanduser("~/data/test.parquet"), "labels")

    evals_result = {}

    bst = train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        ray_params=RayParams(
            max_actor_restarts=1,
            gpus_per_actor=0,
            cpus_per_actor=1,
            num_actors=num_actors),
        verbose_eval=False,
        num_boost_round=num_boost_round)

    model_path = "tuned.xgb"
    bst.save_model(model_path)
    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))


def main():
    name = "large xgboost sweep"

    ray.init(address="auto")

    num_samples = 32
    num_actors_per_sample = 32

    max_runtime = 2600

    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": 4
    }

    start_time = time.monotonic()
    tune.run(
        tune.with_parameters(
            xgboost_train,
            num_actors=num_actors_per_sample,
            num_boost_round=100),
        config=config,
        num_samples=num_samples)
    time_taken = time.monotonic() - start_time

    assert time_taken < max_runtime, \
        f"The {name} test took {time_taken:.2f} seconds, but should not " \
        f"have exceeded {max_runtime:.2f} seconds. Test failed. \n\n" \
        f"--- FAILED: {name.upper()} ::: " \
        f"{time_taken:.2f} > {max_runtime:.2f} ---"

    print(f"The {name} test took {time_taken:.2f} seconds, which "
          f"is below the budget of {max_runtime:.2f} seconds. "
          f"Test successful. \n\n"
          f"--- PASSED: {name.upper()} ::: "
          f"{time_taken:.2f} <= {max_runtime:.2f} ---")


if __name__ == "__main__":
    main()