[xgboost] Add XGBoost release tests (#13456)

* Add XGBoost release tests

* Add more xgboost release tests

* Use failure state manager

* Add release test documentation

* Fix wording

* Automate fault tolerance tests
This commit is contained in:
Kai Fricke
2021-01-20 18:40:23 +01:00
committed by GitHub
parent e6412efdf5
commit 8804758409
17 changed files with 852 additions and 0 deletions
@@ -0,0 +1,37 @@
cluster_name: ray-xgboost-release-cpu-moderate
min_workers: 31
max_workers: 31
initial_workers: 31
target_utilization_fraction: 0.8
idle_timeout_minutes: 15
docker:
image: anyscale/ray:latest
container_name: ray_container
pull_before_run: true
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
cache_stopped_nodes: false
auth:
ssh_user: ubuntu
head_node:
# 64 CPUs
InstanceType: m5.xlarge
worker_nodes:
# 64 CPUs
InstanceType: m5.xlarge
setup_commands:
- pip install pytest xgboost_ray
- sudo mkdir -p /data || true
- sudo chown ray:1000 /data || true
- rm -rf /data/classification.parquet || true
- python ~/release-automation-xgboost_tests/create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2
@@ -0,0 +1,37 @@
cluster_name: ray-xgboost-release-cpu-small
min_workers: 3
max_workers: 3
initial_workers: 3
target_utilization_fraction: 0.8
idle_timeout_minutes: 15
docker:
image: anyscale/ray:latest
container_name: ray_container
pull_before_run: true
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
cache_stopped_nodes: false
auth:
ssh_user: ubuntu
head_node:
# 64 CPUs
InstanceType: m5.xlarge
worker_nodes:
# 64 CPUs
InstanceType: m5.xlarge
setup_commands:
- pip install pytest xgboost_ray
- sudo mkdir -p /data || true
- sudo chown ray:1000 /data || true
- rm -rf /data/classification.parquet || true
- python ~/release-automation-xgboost_tests/create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2
@@ -0,0 +1,37 @@
cluster_name: ray-xgboost-release-gpu-small
min_workers: 4
max_workers: 4
initial_workers: 4
target_utilization_fraction: 0.8
idle_timeout_minutes: 15
docker:
image: anyscale/ray:latest-gpu
container_name: ray_container
pull_before_run: true
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
cache_stopped_nodes: false
auth:
ssh_user: ubuntu
head_node:
# 64 CPUs
InstanceType: m5.xlarge
worker_nodes:
# 64 CPUs
InstanceType: p2.xlarge
setup_commands:
- pip install pytest xgboost_ray
- sudo mkdir -p /data || true
- sudo chown ray:1000 /data || true
- rm -rf /data/classification.parquet || true
- python ~/release-automation-xgboost_tests/create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2
+58
View File
@@ -0,0 +1,58 @@
import argparse
import numpy as np
import os
from xgboost_ray.tests.utils import create_parquet
if __name__ == "__main__":
if "OMP_NUM_THREADS" in os.environ:
del os.environ["OMP_NUM_THREADS"]
parser = argparse.ArgumentParser(description="Create fake data.")
parser.add_argument(
"filename", type=str, default="/data/parted.parquet/", help="ray/dask")
parser.add_argument(
"-r",
"--num-rows",
required=False,
type=int,
default=1e8,
help="num rows")
parser.add_argument(
"-p",
"--num-partitions",
required=False,
type=int,
default=100,
help="num partitions")
parser.add_argument(
"-c",
"--num-cols",
required=False,
type=int,
default=4,
help="num columns (features)")
parser.add_argument(
"-C",
"--num-classes",
required=False,
type=int,
default=2,
help="num classes")
parser.add_argument(
"-s",
"--seed",
required=False,
type=int,
default=1234,
help="random seed")
args = parser.parse_args()
np.random.seed(args.seed)
create_parquet(
args.filename,
num_rows=int(args.num_rows),
num_partitions=int(args.num_partitions),
num_features=int(args.num_cols),
num_classes=int(args.num_classes))
+2
View File
@@ -0,0 +1,2 @@
ray[tune]
xgboost_ray
+58
View File
@@ -0,0 +1,58 @@
#!/usr/bin/env bash
nodes=""
ray_version=""
commit=""
ray_branch=""
for i in "$@"
do
echo "$i"
case "$i" in
--nodes=*)
nodes="${i#*=}"
;;
--ray-version=*)
ray_version="${i#*=}"
;;
--commit=*)
commit="${i#*=}"
;;
--ray-branch=*)
ray_branch="${i#*=}"
;;
--workload=*)
workload="${i#*=}"
;;
--help)
usage
exit
;;
*)
echo "unknown arg, $i"
exit 1
;;
esac
done
if [[ $nodes == "" || $ray_version == "" || $commit == "" || $ray_branch == "" ]]
then
echo "Provide --nodes --ray-version, --commit, and --ray-branch"
exit 1
fi
echo "nodes: $nodes"
echo "version: $ray_version"
echo "commit: $commit"
echo "branch: $ray_branch"
echo "workload: ignored"
# wheel="https://s3-us-west-2.amazonaws.com/ray-wheels/$ray_branch/$commit/ray-$ray_version-cp37-cp37m-manylinux2014_x86_64.whl"
# pip install -U "$wheel"
if ! python "wait_cluster.py" "$nodes" 600; then
echo "Cluster did not come up in time. Aborting test."
exit 1
fi
python "workloads/$workload.py"
+49
View File
@@ -0,0 +1,49 @@
import argparse
import time
import ray
ray.init(address="auto")
parser = argparse.ArgumentParser()
parser.add_argument(
"num_nodes",
type=int,
help="Wait for this number of nodes (includes head)")
parser.add_argument(
"max_time_s", type=int, help="Wait for this number of seconds")
parser.add_argument(
"--feedback_interval_s",
type=int,
default=10,
help="Wait for this number of seconds")
args = parser.parse_args()
curr_nodes = 0
start = time.time()
next_feedback = start
max_time = start + args.max_time_s
while not curr_nodes >= args.num_nodes:
now = time.time()
if now >= max_time:
raise RuntimeError(
f"Maximum wait time reached, but only "
f"{curr_nodes}/{args.num_nodes} nodes came up. Aborting.")
if now >= next_feedback:
passed = now - start
print(f"Waiting for more nodes to come up: "
f"{curr_nodes}/{args.num_nodes} "
f"({passed:.0f} seconds passed)")
next_feedback = now + args.feedback_interval_s
time.sleep(5)
curr_nodes = len(ray.nodes())
passed = time.time() - start
print(f"Cluster is up: {curr_nodes}/{args.num_nodes} nodes online after "
f"{passed:.0f} seconds")
+94
View File
@@ -0,0 +1,94 @@
import glob
import os
import time
from xgboost_ray import train, RayDMatrix, RayFileType, \
RayDeviceQuantileDMatrix, RayParams
if "OMP_NUM_THREADS" in os.environ:
del os.environ["OMP_NUM_THREADS"]
def train_ray(path,
num_workers,
num_boost_rounds,
num_files=0,
regression=False,
use_gpu=False,
ray_params=None,
xgboost_params=None,
**kwargs):
if not os.path.exists(path):
raise ValueError(f"Path does not exist: {path}")
if num_files:
files = sorted(glob.glob(f"{path}/**/*.parquet"))
while num_files > len(files):
files = files + files
path = files[0:num_files]
use_device_matrix = False
if use_gpu:
try:
import cupy # noqa: F401
use_device_matrix = True
except ImportError:
use_device_matrix = False
if use_device_matrix:
dtrain = RayDeviceQuantileDMatrix(
path,
num_actors=num_workers,
label="labels",
ignore=["partition"],
filetype=RayFileType.PARQUET)
else:
dtrain = RayDMatrix(
path,
num_actors=num_workers,
label="labels",
ignore=["partition"],
filetype=RayFileType.PARQUET)
config = {"tree_method": "hist" if not use_gpu else "gpu_hist"}
if not regression:
# Classification
config.update({
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
})
else:
# Regression
config.update({
"objective": "reg:squarederror",
"eval_metric": ["logloss", "rmse"],
})
if xgboost_params:
config.update(xgboost_params)
start = time.time()
evals_result = {}
additional_results = {}
bst = train(
config,
dtrain,
evals_result=evals_result,
additional_results=additional_results,
num_boost_round=num_boost_rounds,
ray_params=ray_params or RayParams(
max_actor_restarts=2,
num_actors=num_workers,
cpus_per_actor=1,
gpus_per_actor=1 if not use_gpu else 1),
evals=[(dtrain, "train")],
**kwargs)
taken = time.time() - start
print(f"TRAIN TIME TAKEN: {taken:.2f} seconds")
bst.save_model("benchmark_{}.xgb".format("cpu" if not use_gpu else "gpu"))
print("Final training error: {:.4f}".format(
evals_result["train"]["error"][-1]))
return bst, additional_results, taken
@@ -0,0 +1,26 @@
"""Distributed XGBoost API test
This test runs unit tests on a distributed cluster. This will confirm that
XGBoost API features like custom metrics/objectives work with remote
trainables.
Test owner: krfricke
Acceptance criteria: Unit tests should pass (requires pytest).
"""
import ray
from xgboost_ray.tests.test_xgboost_api import XGBoostAPITest
class XGBoostDistributedAPITest(XGBoostAPITest):
def _init_ray(self):
if not ray.is_initialized():
ray.init(address="auto")
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", f"{__file__}::XGBoostDistributedAPITest"]))
@@ -0,0 +1,61 @@
"""Fault tolerance test (small cluster, elastic training)
In this run, two training actors will die after some time. It is expected that
in both cases xgboost_ray stops training, but continues right away with the
remaining three actors. Shortly after, the actors will be restarted and
re-integrated into the training loop. Training should finish with all four
actors.
Test owner: krfricke
Acceptance criteria: Should run through and report final results. Intermediate
output should show that training continues with fewer actors when an
actor died. The test will fail if elastic training didn't work.
Notes: This test seems to be somewhat flaky. This might be due to
race conditions in handling dead actors. This is likely a problem of
the xgboost_ray implementation and not of this test.
"""
import ray
from xgboost_ray import RayParams
from _train import train_ray
from ft_small_non_elastic import FailureState, FailureInjection, \
TrackingCallback
if __name__ == "__main__":
ray.init(address="auto")
failure_state = FailureState.remote()
ray_params = RayParams(
elastic_training=True,
max_failed_actors=2,
max_actor_restarts=3,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=0)
_, additional_results, _ = train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=200,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=None,
callbacks=[
TrackingCallback(),
FailureInjection(
id="first_fail", state=failure_state, ranks=[2], iteration=14),
FailureInjection(
id="second_fail", state=failure_state, ranks=[0], iteration=34)
])
actor_1_world_size = set(additional_results["callback_returns"][1])
assert 3 in actor_1_world_size, \
"No training with only 3 actors observed, but this was elastic " \
"training. Please check if additional actors died (e.g. via " \
"node failure), run test again, and report to test owner otherwise."
@@ -0,0 +1,113 @@
"""Fault tolerance test (small cluster, non-elastic training)
In this run, two training actors will die after some time. It is expected that
in both cases xgboost_ray stops training, restarts the dead actors, and
continues training with all four actors.
Test owner: krfricke
Acceptance criteria: Should run through and report final results. Intermediate
output should show that training halts wenn an actor dies and continues only
when all four actors are available again. The test will fail if fault
tolerance did not work correctly.
Notes: This test seems to be somewhat flaky. This might be due to
race conditions in handling dead actors. This is likely a problem of
the xgboost_ray implementation and not of this test.
"""
import os
import time
import ray
from ray.services import get_node_ip_address
from xgboost_ray import RayParams
from xgboost_ray.session import get_actor_rank, put_queue
from xgboost.callback import TrainingCallback
from xgboost.rabit import get_world_size
from _train import train_ray
@ray.remote
class FailureState:
def __init__(self):
self._failed_ids = set()
def set_failed(self, id):
if id in self._failed_ids:
return False
self._failed_ids.add(id)
return True
def has_failed(self, id):
return id in self._failed_ids
class FailureInjection(TrainingCallback):
def __init__(self, id, state, ranks, iteration, allow_ips=None):
self._id = id
self._state = state
self._ranks = ranks or []
self._iteration = iteration
self._allow_ips = allow_ips
super(FailureInjection).__init__()
def after_iteration(self, model, epoch, evals_log):
if self._allow_ips and get_node_ip_address() not in self._allow_ips:
return
if epoch == self._iteration:
rank = get_actor_rank()
if rank in self._ranks:
if not ray.get(self._state.has_failed.remote(id)):
success = ray.get(self._state.set_failed.remote(id))
if not success:
# Another rank is already about to fail
return
pid = os.getpid()
print(f"Killing process: {pid} for actor rank {rank}")
time.sleep(1)
os.kill(pid, 9)
class TrackingCallback(TrainingCallback):
def after_iteration(self, model, epoch, evals_log):
put_queue(get_world_size())
if __name__ == "__main__":
ray.init(address="auto")
failure_state = FailureState.remote()
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=0)
_, additional_results, _ = train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=200,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=None,
callbacks=[
TrackingCallback(),
FailureInjection(
id="first_fail", state=failure_state, ranks=[2], iteration=14),
FailureInjection(
id="second_fail", state=failure_state, ranks=[0], iteration=34)
])
actor_1_world_size = set(additional_results["callback_returns"][1])
assert len(actor_1_world_size) == 1 and 4 in actor_1_world_size, \
"Training with fewer than 4 actors observed, but this was " \
"non-elastic training. Please report to test owner."
@@ -0,0 +1,46 @@
"""Training on a GPU cluster.
This will train a small dataset on a distributed GPU cluster.
Test owner: krfricke
Acceptance criteria: Should run through and report final results.
Notes: The test will report output such as this:
```
[05:14:49] WARNING: ../src/gbm/gbtree.cc:350: Loading from a raw memory buffer
on CPU only machine. Changing tree_method to hist.
[05:14:49] WARNING: ../src/learner.cc:222: No visible GPU is found, setting
`gpu_id` to -1
```
This is _not_ an error. This is due to the checkpoints being loaded on the
XGBoost driver, and since the driver lives on the head node (which has no
GPU), XGBoost warns that it can't use the GPU. Training still happened using
the GPUs.
"""
import ray
from xgboost_ray import RayParams
from _train import train_ray
if __name__ == "__main__":
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=1)
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=True,
ray_params=ray_params,
xgboost_params=None,
)
@@ -0,0 +1,33 @@
"""Moderate cluster training
This training run will start 32 workers on 32 nodes (including head node).
Test owner: krfricke
Acceptance criteria: Should run through and report final results.
"""
import ray
from xgboost_ray import RayParams
from _train import train_ray
if __name__ == "__main__":
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=32,
cpus_per_actor=4,
gpus_per_actor=0)
train_ray(
path="/data/classification.parquet",
num_workers=32,
num_boost_rounds=100,
num_files=128,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=None,
)
@@ -0,0 +1,33 @@
"""Small cluster training
This training run will start 4 workers on 4 nodes (including head node).
Test owner: krfricke
Acceptance criteria: Should run through and report final results.
"""
import ray
from xgboost_ray import RayParams
from _train import train_ray
if __name__ == "__main__":
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=0)
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=None,
)
@@ -0,0 +1,56 @@
"""Moderate Ray Tune run (32 trials, 4 actors).
This training run will start 32 Ray Tune trials, each starting 4 actors.
The cluster comprises 32 nodes.
Test owner: krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
import ray
from ray import tune
from xgboost_ray import RayParams
from _train import train_ray
def train_wrapper(config):
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=1,
gpus_per_actor=0)
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=64,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
analysis = tune.run(
train_wrapper,
config=search_space,
num_samples=32,
resources_per_trial={
"cpu": 1,
"extra_cpu": 3
})
@@ -0,0 +1,56 @@
"""Moderate Ray Tune run (4 trials, 32 actors).
This training run will start 4 Ray Tune trials, each starting 32 actors.
The cluster comprises 32 nodes.
Test owner: krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
import ray
from ray import tune
from xgboost_ray import RayParams
from _train import train_ray
def train_wrapper(config):
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=32,
cpus_per_actor=1,
gpus_per_actor=0)
train_ray(
path="/data/classification.parquet",
num_workers=32,
num_boost_rounds=100,
num_files=128,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
analysis = tune.run(
train_wrapper,
config=search_space,
num_samples=4,
resources_per_trial={
"cpu": 1,
"extra_cpu": 31
})
@@ -0,0 +1,56 @@
"""Small Ray Tune run (4 trials, 4 actors).
This training run will start 4 Ray Tune Trials, each starting 4 actors.
The cluster comprises 4 nodes.
Test owner: krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
import ray
from ray import tune
from xgboost_ray import RayParams
from _train import train_ray
def train_wrapper(config):
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=1,
gpus_per_actor=0)
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=False,
ray_params=ray_params,
xgboost_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
analysis = tune.run(
train_wrapper,
config=search_space,
num_samples=4,
resources_per_trial={
"cpu": 1,
"extra_cpu": 3
})