mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:38:18 +08:00
[tune] All examples to use ConcurrencyLimiter (#10662)
Co-authored-by: Kai Fricke <kai@anyscale.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import time
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.suggest.bayesopt import BayesOptSearch
|
||||
|
||||
|
||||
@@ -43,18 +44,18 @@ if __name__ == "__main__":
|
||||
"height": tune.uniform(-100, 100)
|
||||
}
|
||||
}
|
||||
algo = BayesOptSearch(
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
utility_kwargs={
|
||||
"kind": "ucb",
|
||||
"kappa": 2.5,
|
||||
"xi": 0.0
|
||||
})
|
||||
scheduler = AsyncHyperBandScheduler(metric="mean_loss", mode="min")
|
||||
algo = BayesOptSearch(utility_kwargs={
|
||||
"kind": "ucb",
|
||||
"kappa": 2.5,
|
||||
"xi": 0.0
|
||||
})
|
||||
algo = ConcurrencyLimiter(algo, max_concurrent=4)
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
tune.run(
|
||||
easy_objective,
|
||||
name="my_exp",
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
search_alg=algo,
|
||||
scheduler=scheduler,
|
||||
**tune_kwargs)
|
||||
|
||||
@@ -11,6 +11,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest.dragonfly import DragonflySearch
|
||||
|
||||
@@ -70,12 +71,14 @@ if __name__ == "__main__":
|
||||
optimizer="bandit",
|
||||
domain="euclidean",
|
||||
# space=space, # If you want to set the space manually
|
||||
metric="objective",
|
||||
mode="max")
|
||||
)
|
||||
df_search = ConcurrencyLimiter(df_search, max_concurrent=4)
|
||||
|
||||
scheduler = AsyncHyperBandScheduler(metric="objective", mode="max")
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
tune.run(
|
||||
objective,
|
||||
metric="objective",
|
||||
mode="max",
|
||||
name="dragonfly_search",
|
||||
search_alg=df_search,
|
||||
scheduler=scheduler,
|
||||
|
||||
@@ -3,16 +3,15 @@
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
from ray.tune import Trainable, run, sample_from
|
||||
from ray import tune
|
||||
from ray.tune.schedulers import HyperBandScheduler
|
||||
|
||||
|
||||
class MyTrainableClass(Trainable):
|
||||
class MyTrainableClass(tune.Trainable):
|
||||
"""Example agent whose learning curve is a random sigmoid.
|
||||
|
||||
The dummy hyperparameters "width" and "height" determine the slope and
|
||||
@@ -58,13 +57,14 @@ if __name__ == "__main__":
|
||||
mode="max",
|
||||
max_t=200)
|
||||
|
||||
run(MyTrainableClass,
|
||||
tune.run(
|
||||
MyTrainableClass,
|
||||
name="hyperband_test",
|
||||
num_samples=20,
|
||||
stop={"training_iteration": 1 if args.smoke_test else 99999},
|
||||
config={
|
||||
"width": sample_from(lambda spec: 10 + int(90 * random.random())),
|
||||
"height": sample_from(lambda spec: int(100 * random.random()))
|
||||
"width": tune.randint(10, 90),
|
||||
"height": tune.randint(0, 100)
|
||||
},
|
||||
scheduler=hyperband,
|
||||
fail_fast=True)
|
||||
|
||||
@@ -6,6 +6,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest.hyperopt import HyperOptSearch
|
||||
|
||||
@@ -58,8 +59,14 @@ if __name__ == "__main__":
|
||||
"activation": tune.choice(["relu", "tanh"])
|
||||
}
|
||||
}
|
||||
algo = HyperOptSearch(
|
||||
metric="mean_loss", mode="min", points_to_evaluate=current_best_params)
|
||||
scheduler = AsyncHyperBandScheduler(metric="mean_loss", mode="min")
|
||||
algo = HyperOptSearch(points_to_evaluate=current_best_params)
|
||||
algo = ConcurrencyLimiter(algo, max_concurrent=4)
|
||||
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
tune.run(
|
||||
easy_objective, search_alg=algo, scheduler=scheduler, **tune_kwargs)
|
||||
easy_objective,
|
||||
search_alg=algo,
|
||||
scheduler=scheduler,
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
**tune_kwargs)
|
||||
|
||||
@@ -44,6 +44,8 @@ if __name__ == "__main__":
|
||||
from ray.tune.schedulers import ASHAScheduler
|
||||
tune.run(
|
||||
train_breast_cancer,
|
||||
metric="binary_error",
|
||||
mode="min",
|
||||
config=config,
|
||||
num_samples=2,
|
||||
scheduler=ASHAScheduler(metric="binary_error", mode="min"))
|
||||
scheduler=ASHAScheduler())
|
||||
|
||||
@@ -9,7 +9,6 @@ start a MLFlow run inside the Trainable function/class.
|
||||
import mlflow
|
||||
from mlflow.tracking import MlflowClient
|
||||
import time
|
||||
import random
|
||||
|
||||
from ray import tune
|
||||
from ray.tune.logger import MLFLowLogger, DEFAULT_LOGGERS
|
||||
@@ -44,9 +43,8 @@ if __name__ == "__main__":
|
||||
"logger_config": {
|
||||
"mlflow_experiment_id": experiment_id,
|
||||
},
|
||||
"width": tune.sample_from(
|
||||
lambda spec: 10 + int(90 * random.random())),
|
||||
"height": tune.sample_from(lambda spec: int(100 * random.random()))
|
||||
"width": tune.randint(10, 100),
|
||||
"height": tune.randint(0, 100),
|
||||
})
|
||||
|
||||
df = mlflow.search_runs([experiment_id])
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# Original Code here:
|
||||
# https://github.com/pytorch/examples/blob/master/mnist/main.py
|
||||
import os
|
||||
import numpy as np
|
||||
import argparse
|
||||
from filelock import FileLock
|
||||
import torch
|
||||
@@ -89,7 +88,7 @@ def get_data_loaders():
|
||||
|
||||
|
||||
def train_mnist(config):
|
||||
use_cuda = config.get("use_gpu") and torch.cuda.is_available()
|
||||
use_cuda = torch.cuda.is_available()
|
||||
device = torch.device("cuda" if use_cuda else "cpu")
|
||||
train_loader, test_loader = get_data_loaders()
|
||||
model = ConvNet().to(device)
|
||||
@@ -100,6 +99,7 @@ def train_mnist(config):
|
||||
while True:
|
||||
train(model, optimizer, train_loader, device)
|
||||
acc = test(model, test_loader, device)
|
||||
# Set this to run Tune.
|
||||
tune.report(mean_accuracy=acc)
|
||||
|
||||
|
||||
@@ -120,10 +120,14 @@ if __name__ == "__main__":
|
||||
ray.init(address=args.ray_address)
|
||||
else:
|
||||
ray.init(num_cpus=2 if args.smoke_test else None)
|
||||
sched = AsyncHyperBandScheduler(
|
||||
time_attr="training_iteration", metric="mean_accuracy", mode="max")
|
||||
|
||||
# for early stopping
|
||||
sched = AsyncHyperBandScheduler()
|
||||
|
||||
analysis = tune.run(
|
||||
train_mnist,
|
||||
metric="mean_accuracy",
|
||||
mode="max",
|
||||
name="exp",
|
||||
scheduler=sched,
|
||||
stop={
|
||||
@@ -132,14 +136,12 @@ if __name__ == "__main__":
|
||||
},
|
||||
resources_per_trial={
|
||||
"cpu": 2,
|
||||
"gpu": int(args.cuda)
|
||||
"gpu": int(args.cuda) # set this for GPUs
|
||||
},
|
||||
num_samples=1 if args.smoke_test else 50,
|
||||
config={
|
||||
"lr": tune.sample_from(lambda spec: 10**(-10 * np.random.rand())),
|
||||
"lr": tune.loguniform(1e-4, 1e-2),
|
||||
"momentum": tune.uniform(0.1, 0.9),
|
||||
"use_gpu": int(args.cuda)
|
||||
})
|
||||
|
||||
print("Best config is:",
|
||||
analysis.get_best_config(metric="mean_accuracy", mode="max"))
|
||||
print("Best config is:", analysis.best_config)
|
||||
|
||||
@@ -6,6 +6,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest.nevergrad import NevergradSearch
|
||||
|
||||
@@ -57,13 +58,15 @@ if __name__ == "__main__":
|
||||
algo = NevergradSearch(
|
||||
optimizer=ng.optimizers.OnePlusOne,
|
||||
# space=space, # If you want to set the space manually
|
||||
metric="mean_loss",
|
||||
mode="min")
|
||||
)
|
||||
algo = ConcurrencyLimiter(algo, max_concurrent=4)
|
||||
|
||||
scheduler = AsyncHyperBandScheduler(metric="mean_loss", mode="min")
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
|
||||
tune.run(
|
||||
easy_objective,
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
name="nevergrad",
|
||||
search_alg=algo,
|
||||
scheduler=scheduler,
|
||||
|
||||
@@ -6,6 +6,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest.optuna import OptunaSearch
|
||||
|
||||
@@ -45,7 +46,13 @@ if __name__ == "__main__":
|
||||
"activation": tune.choice(["relu", "tanh"])
|
||||
}
|
||||
}
|
||||
algo = OptunaSearch(metric="mean_loss", mode="min")
|
||||
scheduler = AsyncHyperBandScheduler(metric="mean_loss", mode="min")
|
||||
algo = OptunaSearch()
|
||||
algo = ConcurrencyLimiter(algo, max_concurrent=4)
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
tune.run(
|
||||
easy_objective, search_alg=algo, scheduler=scheduler, **tune_kwargs)
|
||||
easy_objective,
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
search_alg=algo,
|
||||
scheduler=scheduler,
|
||||
**tune_kwargs)
|
||||
|
||||
@@ -6,6 +6,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
from ray.tune.schedulers import AsyncHyperBandScheduler
|
||||
from ray.tune.suggest.skopt import SkOptSearch
|
||||
|
||||
@@ -59,15 +60,16 @@ if __name__ == "__main__":
|
||||
algo = SkOptSearch(
|
||||
# parameter_names=space.keys(), # If you want to set the space
|
||||
# parameter_ranges=space.values(), # If you want to set the space
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
points_to_evaluate=previously_run_params,
|
||||
evaluated_rewards=known_rewards)
|
||||
algo = ConcurrencyLimiter(algo, max_concurrent=4)
|
||||
|
||||
scheduler = AsyncHyperBandScheduler(metric="mean_loss", mode="min")
|
||||
scheduler = AsyncHyperBandScheduler()
|
||||
|
||||
tune.run(
|
||||
easy_objective,
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
name="skopt_exp_with_warmstart",
|
||||
search_alg=algo,
|
||||
scheduler=scheduler,
|
||||
|
||||
@@ -154,8 +154,8 @@ def train_cifar10(config):
|
||||
with ag.record():
|
||||
outputs = [finetune_net(X) for X in data]
|
||||
loss = [L(yhat, y) for yhat, y in zip(outputs, label)]
|
||||
for l in loss:
|
||||
l.backward()
|
||||
for ls in loss:
|
||||
ls.backward()
|
||||
|
||||
trainer.step(batch_size)
|
||||
mx.nd.waitall()
|
||||
@@ -170,7 +170,7 @@ def train_cifar10(config):
|
||||
outputs = [finetune_net(X) for X in data]
|
||||
loss = [L(yhat, y) for yhat, y in zip(outputs, label)]
|
||||
|
||||
test_loss += sum(l.mean().asscalar() for l in loss) / len(loss)
|
||||
test_loss += sum(ls.mean().asscalar() for ls in loss) / len(loss)
|
||||
metric.update(label, outputs)
|
||||
|
||||
_, test_acc = metric.get()
|
||||
@@ -194,11 +194,7 @@ if __name__ == "__main__":
|
||||
sched = FIFOScheduler()
|
||||
elif args.scheduler == "asynchyperband":
|
||||
sched = AsyncHyperBandScheduler(
|
||||
time_attr="training_iteration",
|
||||
metric="mean_loss",
|
||||
mode="min",
|
||||
max_t=400,
|
||||
grace_period=60)
|
||||
metric="mean_loss", mode="min", max_t=400, grace_period=60)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
tune.run(
|
||||
|
||||
@@ -191,6 +191,7 @@ class _Bracket():
|
||||
return action
|
||||
|
||||
def debug_str(self):
|
||||
# TODO: fix up the output for this
|
||||
iters = " | ".join([
|
||||
"Iter {:.3f}: {}".format(milestone, self.cutoff(recorded))
|
||||
for milestone, recorded in self._rungs
|
||||
|
||||
@@ -167,3 +167,6 @@ class Repeater(Searcher):
|
||||
|
||||
def set_state(self, state):
|
||||
self.__dict__.update(state)
|
||||
|
||||
def set_search_properties(self, metric, mode, config):
|
||||
return self.searcher.set_search_properties(metric, mode, config)
|
||||
|
||||
@@ -366,3 +366,6 @@ class ConcurrencyLimiter(Searcher):
|
||||
|
||||
def on_unpause(self, trial_id):
|
||||
self.searcher.on_unpause(trial_id)
|
||||
|
||||
def set_search_properties(self, metric, mode, config):
|
||||
return self.searcher.set_search_properties(metric, mode, config)
|
||||
|
||||
Reference in New Issue
Block a user