[tune] Add AutoML algorithm of GeneticSearcher (#2699)

Add new search algorithm (genetic) along with the base framework of the searcher (which performs some basic jobs such as logging, recording and organizing in our project).
Note that this is the initial commit. In the following days, we will add example, UT, and other refinements.
This commit is contained in:
old-bear
2018-09-13 00:17:04 +08:00
committed by Richard Liaw
parent bee743c152
commit f3c1194be3
8 changed files with 836 additions and 0 deletions
+17
View File
@@ -0,0 +1,17 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.tune.automl.genetic_searcher import GeneticSearch
from ray.tune.automl.search_policy import GridSearch, RandomSearch
from ray.tune.automl.search_space import SearchSpace, \
ContinuousSpace, DiscreteSpace
__all__ = [
"ContinuousSpace",
"DiscreteSpace",
"SearchSpace",
"GridSearch",
"RandomSearch",
"GeneticSearch",
]
+258
View File
@@ -0,0 +1,258 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
from ray.tune.automl.search_policy import AutoMLSearcher
logger = logging.getLogger(__name__)
LOGGING_PREFIX = "[GENETIC SEARCH] "
class GeneticSearch(AutoMLSearcher):
"""Implement the genetic search.
Keep a collection of top-K parameter permutations as base genes,
then apply selection, crossover, and mutation to them to generate
new genes (a.k.a new generation). Hopefully, the performance of
the top population would increase generation by generation.
"""
def __init__(self,
search_space,
reward_attr,
max_generation=2,
population_size=10,
population_decay=0.95,
keep_top_ratio=0.2,
selection_bound=0.4,
crossover_bound=0.4):
"""
Initialize GeneticSearcher.
Args:
search_space (SearchSpace): The space to search.
reward_attr: The attribute name of the reward in the result.
max_generation: Max iteration number of genetic search.
population_size: Number of trials of the initial generation.
population_decay: Decay ratio of population size for the
next generation.
keep_top_ratio: Ratio of the top performance population.
selection_bound: Threshold for performing selection.
crossover_bound: Threshold for performing crossover.
"""
super(GeneticSearch, self).__init__(search_space, reward_attr)
self._cur_generation = 1
self._max_generation = max_generation
self._population_size = population_size
self._population_decay = population_decay
self._keep_top_ratio = keep_top_ratio
self._selection_bound = selection_bound
self._crossover_bound = crossover_bound
self._cur_config_list = []
self._cur_encoding_list = []
for _ in range(population_size):
one_hot = self.search_space.generate_random_one_hot_encoding()
self._cur_encoding_list.append(one_hot)
self._cur_config_list.append(
self.search_space.apply_one_hot_encoding(one_hot))
def _select(self):
population_size = len(self._cur_config_list)
logger.info(
LOGGING_PREFIX + "Generate the %sth generation, population=%s",
self._cur_generation, population_size)
return self._cur_config_list, self._cur_encoding_list
def _feedback(self, trials):
self._cur_generation += 1
if self._cur_generation > self._max_generation:
return AutoMLSearcher.TERMINATE
sorted_trials = sorted(
trials,
key=lambda t: t.best_result[self.reward_attr],
reverse=True)
self._cur_encoding_list = self._next_generation(sorted_trials)
self._cur_config_list = []
for one_hot in self._cur_encoding_list:
self._cur_config_list.append(
self.search_space.apply_one_hot_encoding(one_hot))
return AutoMLSearcher.CONTINUE
def _next_generation(self, sorted_trials):
"""Generate genes (encodings) for the next generation.
Use the top K (_keep_top_ratio) trials of the last generation
as candidates to generate the next generation. The action could
be selection, crossover and mutation according corresponding
ratio (_selection_bound, _crossover_bound).
Args:
sorted_trials: List of finished trials with top
performance ones first.
Returns:
A list of new genes (encodings)
"""
candidate = []
next_generation = []
num_population = self._next_population_size(len(sorted_trials))
top_num = int(max(num_population * self._keep_top_ratio, 2))
for i in range(top_num):
candidate.append(sorted_trials[i].extra_arg)
next_generation.append(sorted_trials[i].extra_arg)
for i in range(top_num, num_population):
flip_coin = np.random.uniform()
if flip_coin < self._selection_bound:
next_generation.append(GeneticSearch._selection(candidate))
else:
if flip_coin < self._selection_bound + self._crossover_bound:
next_generation.append(GeneticSearch._crossover(candidate))
else:
next_generation.append(GeneticSearch._mutation(candidate))
return next_generation
def _next_population_size(self, last_population_size):
"""Calculate the population size of the next generation.
Intuitively, the population should decay after each iteration since
it should converge. It can also decrease the total resource required.
Args:
last_population_size: The last population size.
Returns:
The new population size.
"""
# TODO: implement an generic resource allocate algorithm.
return int(max(last_population_size * self._population_decay, 3))
@staticmethod
def _selection(candidate):
"""Perform selection action to candidates.
For example, new gene = sample_1 + the 5th bit of sample2.
Args:
candidate: List of candidate genes (encodings).
Examples:
>>> # Genes that represent 3 parameters
>>> gene1 = np.array([[0, 0, 1], [0, 1], [1, 0]])
>>> gene2 = np.array([[0, 1, 0], [1, 0], [0, 1]])
>>> new_gene = _selection([gene1, gene2])
>>> # new_gene could be gene1 overwritten with the
>>> # 2nd parameter of gene2
>>> # in which case:
>>> # new_gene[0] = gene1[0]
>>> # new_gene[1] = gene2[1]
>>> # new_gene[2] = gene1[0]
Returns:
New gene (encoding)
"""
sample_index1 = np.random.choice(len(candidate))
sample_index2 = np.random.choice(len(candidate))
sample_1 = candidate[sample_index1]
sample_2 = candidate[sample_index2]
select_index = np.random.choice(len(sample_1))
logger.info(
LOGGING_PREFIX + "Perform selection from %sth to %sth at index=%s",
sample_index2, sample_index1, select_index)
next_gen = []
for i in range(len(sample_1)):
if i is select_index:
next_gen.append(sample_2[i])
else:
next_gen.append(sample_1[i])
return next_gen
@staticmethod
def _crossover(candidate):
"""Perform crossover action to candidates.
For example, new gene = 60% sample_1 + 40% sample_2.
Args:
candidate: List of candidate genes (encodings).
Examples:
>>> # Genes that represent 3 parameters
>>> gene1 = np.array([[0, 0, 1], [0, 1], [1, 0]])
>>> gene2 = np.array([[0, 1, 0], [1, 0], [0, 1]])
>>> new_gene = _crossover([gene1, gene2])
>>> # new_gene could be the first [n=1] parameters of
>>> # gene1 + the rest of gene2
>>> # in which case:
>>> # new_gene[0] = gene1[0]
>>> # new_gene[1] = gene2[1]
>>> # new_gene[2] = gene1[1]
Returns:
New gene (encoding)
"""
sample_index1 = np.random.choice(len(candidate))
sample_index2 = np.random.choice(len(candidate))
sample_1 = candidate[sample_index1]
sample_2 = candidate[sample_index2]
cross_index = int(len(sample_1) * np.random.uniform(low=0.3, high=0.7))
logger.info(
LOGGING_PREFIX +
"Perform crossover between %sth and %sth at index=%s",
sample_index1, sample_index2, cross_index)
next_gen = []
for i in range(len(sample_1)):
if i > cross_index:
next_gen.append(sample_2[i])
else:
next_gen.append(sample_1[i])
return next_gen
@staticmethod
def _mutation(candidate, rate=0.1):
"""Perform mutation action to candidates.
For example, randomly change 10% of original sample
Args:
candidate: List of candidate genes (encodings).
rate: Percentage of mutation bits
Examples:
>>> # Genes that represent 3 parameters
>>> gene1 = np.array([[0, 0, 1], [0, 1], [1, 0]])
>>> new_gene = _mutation([gene1])
>>> # new_gene could be the gene1 with the 3rd parameter changed
>>> # new_gene[0] = gene1[0]
>>> # new_gene[1] = gene1[1]
>>> # new_gene[2] = [0, 1] != gene1[2]
Returns:
New gene (encoding)
"""
sample_index = np.random.choice(len(candidate))
sample = candidate[sample_index]
idx_list = []
for i in range(int(max(len(sample) * rate, 1))):
idx = np.random.choice(len(sample))
idx_list.append(idx)
field = sample[idx] # one-hot encoding
field[np.argmax(field)] = 0
bit = np.random.choice(field.shape[0])
field[bit] = 1
logger.info(LOGGING_PREFIX + "Perform mutation on %sth at index=%s",
sample_index, str(idx_list))
return sample
+240
View File
@@ -0,0 +1,240 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import copy
import logging
from ray.tune.trial import Trial
from ray.tune.suggest import SearchAlgorithm
from ray.tune.experiment import convert_to_experiment_list
from ray.tune.suggest.variant_generator import generate_variants
from ray.tune.config_parser import make_parser, create_trial_from_spec
logger = logging.getLogger(__name__)
def deep_insert(path_list, value, config):
"""Inserts value into config by path, generating intermediate dictionaries.
Example:
>>> deep_insert(path.split("."), value, {})
"""
if len(path_list) > 1:
inside_config = config.setdefault(path_list[0], {})
deep_insert(path_list[1:], value, inside_config)
else:
config[path_list[0]] = value
class AutoMLSearcher(SearchAlgorithm):
"""Base class for AutoML search algorithm.
It works in a round-by-round way. For each experiment round,
it generates a bunch of parameter config permutations, submits
and keeps track of them. Once all of them finish, results will
be fed back to the algorithm as a whole.
"""
CONTINUE = "CONTINUE"
TERMINATE = "TERMINATE"
def __init__(self, search_space, reward_attr):
"""Initialize AutoMLSearcher.
Arguments:
search_space (SearchSpace): The space to search.
reward_attr: The attribute name of the reward in the result.
"""
# Pass experiment later to allow construction without this parameter
super(AutoMLSearcher, self).__init__()
self.search_space = search_space
self.reward_attr = reward_attr
self.experiment_list = []
self.best_trial = None
self._is_finished = False
self._parser = make_parser()
self._unfinished_count = 0
self._running_trials = {}
self._completed_trials = {}
self._iteration = 0
self._total_trial_num = 0
self._start_ts = 0
def add_configurations(self, experiments):
self.experiment_list = convert_to_experiment_list(experiments)
def get_best_trial(self):
"""Returns the Trial object with the best reward_attr"""
return self.best_trial
def next_trials(self):
if self._unfinished_count > 0:
# Last round not finished
return []
trials = []
raw_param_list, extra_arg_list = self._select()
if not extra_arg_list:
extra_arg_list = [None] * len(raw_param_list)
for exp in self.experiment_list:
for param_config, extra_arg in zip(raw_param_list, extra_arg_list):
tag = ''
new_spec = copy.deepcopy(exp.spec)
for path, value in param_config.items():
tag += '%s=%s-' % (path.split('.')[-1], value)
deep_insert(path.split('.'), value, new_spec['config'])
trial = create_trial_from_spec(
new_spec, exp.name, self._parser, experiment_tag=tag)
# AutoML specific fields set in Trial
trial.results = []
trial.best_result = None
trial.param_config = param_config
trial.extra_arg = extra_arg
trials.append(trial)
self._running_trials[trial.trial_id] = trial
ntrial = len(trials)
self._iteration += 1
self._unfinished_count = ntrial
self._total_trial_num += ntrial
self._start_ts = time.time()
logger.info(
"=========== BEGIN Experiment-Round: %(round)s "
"[%(new)s NEW | %(total)s TOTAL] ===========", {
"round": self._iteration,
"new": ntrial,
"total": self._total_trial_num
})
return trials
def on_trial_result(self, trial_id, result):
if not result:
return
trial = self._running_trials[trial_id]
# Update trial's best result
trial.results.append(result)
if trial.best_result is None \
or result[self.reward_attr] \
> trial.best_result[self.reward_attr]:
trial.best_result = result
# Update job's best trial
if self.best_trial is None \
or (result[self.reward_attr]
> self.best_trial.best_result[self.reward_attr]):
self.best_trial = self._running_trials[trial_id]
def on_trial_complete(self,
trial_id,
result=None,
error=False,
early_terminated=False):
self.on_trial_result(trial_id, result)
self._unfinished_count -= 1
if self._unfinished_count == 0:
total = len(self._running_trials)
succ = sum(t.status == Trial.TERMINATED
for t in self._running_trials.values())
# handle the last trial
this_trial = self._running_trials[trial_id]
if this_trial.status == Trial.RUNNING and not error:
succ += 1
elapsed = time.time() - self._start_ts
logger.info(
"=========== END Experiment-Round: %(round)s "
"[%(succ)s SUCC | %(fail)s FAIL] this round, "
"elapsed=%(elapsed).2f, "
"BEST %(reward_attr)s=%(reward)f ===========", {
"round": self._iteration,
"succ": succ,
"fail": total - succ,
"elapsed": elapsed,
"reward_attr": self.reward_attr,
"reward": self.best_trial.best_result[self.reward_attr]
if self.best_trial else None
})
action = self._feedback(self._running_trials.values())
if action == AutoMLSearcher.TERMINATE:
self._is_finished = True
self._completed_trials.update(self._running_trials)
self._running_trials = {}
def is_finished(self):
return self._is_finished
def _select(self):
"""Select a bunch of parameter permutations to run.
The permutations should be a list of dict, which contains the
<path, value> pair. The ``path`` could be a dot separated string,
which will be expanded to merge into the experiment's config by the
framework. For example:
pair : {"path.to.key": 1}
config in experiment : {"path": {"to": {"key": 1}, ...}, ...}
The framework generates 1 config for 1 Trial. User could also return
an extra list to add an additional argument to the trial
Returns:
A list of config + a list of extra argument (can be None)
"""
raise NotImplementedError
def _feedback(self, trials):
"""Feedback the completed trials corresponding to the last selected
parameter permutations
Arguments:
trials (list): A list of Trial object, where user can fetch the
result attribute, etc.
Returns:
Next action, i.e.: CONTINUE, TERMINATE
"""
raise NotImplementedError
class GridSearch(AutoMLSearcher):
"""Implement the grid search"""
def _select(self):
grid = self.search_space.to_grid_search()
configs = []
for _, config in generate_variants(grid):
configs.append(config)
return configs, None
def _feedback(self, trials):
return AutoMLSearcher.TERMINATE
class RandomSearch(AutoMLSearcher):
"""Implement the random search"""
def __init__(self, search_space, reward_attr, repeat):
super(RandomSearch, self).__init__(search_space, reward_attr)
self.repeat = repeat
def _select(self):
choices = self.search_space.to_random_choice()
configs = []
for _ in range(self.repeat):
for _, config in generate_variants(choices):
configs.append(config)
return configs, None
def _feedback(self, trials):
return AutoMLSearcher.TERMINATE
+183
View File
@@ -0,0 +1,183 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import random
import logging
import numpy as np
from ray.tune import grid_search
logger = logging.getLogger(__name__)
class ParameterSpace(object):
"""Base class of a single parameter's search space.
"""
def __init__(self, name):
"""Initialize ParameterSpace.
Arguments:
name (str): Name of the parameter. Name can be dot separated,
which will be interpreted as path of a nested config
"""
self.name = name
class DiscreteSpace(ParameterSpace):
"""Search space with discrete choices.
"""
def __init__(self, name, choices):
"""Initialize DiscreteSpace.
Arguments:
name (str): Name of the parameter.
choices (list): List of all possible choices.
"""
super(DiscreteSpace, self).__init__(name)
self.choices = choices
def to_grid_search(self):
"""Returns a ray.tune.grid_search structure.
Contains all the choices inside and can be expanded by ray.tune.
"""
return grid_search(self.choices)
def to_random_choice(self):
"""Returns a lambda function that choose a value randomly.
Can be expanded by ray.tune.
"""
return lambda _: random.choice(self.choices)
def choices_count(self):
return len(self.choices)
def __str__(self):
return "DiscreteSpace %s: %s" % (self.name, str(self.choices))
class ContinuousSpace(ParameterSpace):
"""Search space of continuous type.
NOTE that it can be converted to ``DiscreteSpace`` by sampling under
certain distribution such as linear.
"""
LINEAR = 'linear'
# TODO: logspace
def __init__(self, name, start, end, num, distribution=LINEAR):
"""Initialize ContinuousSpace.
Arguments:
name (str): Name of the parameter.
start: Start of the continuous space included.
end: End of the continuous space included.
num: Sampling count if possible.
distribution: Sampling distribution, should be in [LINEAR]
"""
super(ContinuousSpace, self).__init__(name)
self.start = float(start)
self.end = float(end)
self.num = num
if distribution == ContinuousSpace.LINEAR:
self.choices = np.linspace(start, end, num)
else:
raise NotImplementedError(
"Distribution %s not supported" % distribution)
self.distribution = distribution
def to_grid_search(self):
"""Returns a ray.tune.grid_search structure.
Apply sampling to get discrete choices.
"""
return grid_search(self.choices)
def to_random_choice(self):
"""Returns a lambda function that choose a value randomly.
Can be expanded by ray.tune.
"""
return lambda _: random.uniform(self.start, self.end)
def choices_count(self):
return len(self.choices)
def __str__(self):
return "ContinuousSpace %s: [%s, %s]" % (self.name, self.start,
self.end)
class SearchSpace(object):
"""Collection of ``ParameterSpace``, a.k.a <name, space> pair.
It's supposed to be used with a fixed experiment config, which
could be a very complicated (nested) dict. Each ``ParameterSpace``
points to a unique place in the experiment config using its name
as the path.
"""
def __init__(self, param_list):
"""Initialize SearchSpace.
Arguments:
param_list: List of ``ParameterSpace`` (or its subclass).
"""
self.param_list = param_list
for ps in param_list:
# ps MUST be ParameterSpace
logger.info("Add %s into SearchSpace" % ps)
def to_grid_search(self):
"""Returns a dict of {parameter name: grid_search}.
Apply ``to_grid_search`` to all ``ParameterSpace``.
"""
return {ps.name: ps.to_grid_search() for ps in self.param_list}
def to_random_choice(self):
"""Returns a dict of {parameter name: lambda function}.
Apply ``to_grid_search`` to all ``ParameterSpace``.
"""
return {ps.name: ps.to_random_choice() for ps in self.param_list}
def generate_random_one_hot_encoding(self):
"""Returns a list of one-hot encodings for all parameters.
1 one-hot np.array for 1 parameter,
and the 1's place is randomly chosen.
"""
encoding = []
for ps in self.param_list:
one_hot = np.zeros(ps.choices_count())
choice = random.randrange(ps.choices_count())
one_hot[choice] = 1
encoding.append(one_hot)
return encoding
def apply_one_hot_encoding(self, one_hot_encoding):
"""Apply one hot encoding to generate a specific config.
Arguments:
one_hot_encoding (list): A list of one hot encodings,
1 for each parameter. The shape of each encoding
should match that ``ParameterSpace``
Returns:
A dict config with specific <name, value> pair
"""
config = {}
for ps, one_hot in zip(self.param_list, one_hot_encoding):
index = np.argmax(one_hot)
config[ps.name] = ps.choices[index]
return config
@@ -0,0 +1,63 @@
"""This test checks that GeneticSearch is functional.
It also checks that it is usable with a separate scheduler.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
from ray.tune import run_experiments, register_trainable
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.automl import GeneticSearch
from ray.tune.automl import ContinuousSpace, DiscreteSpace, SearchSpace
def michalewicz_function(config, reporter):
"""f(x) = -sum{sin(xi) * [sin(i*xi^2 / pi)]^(2m)}"""
import numpy as np
x = np.array(
[config['x1'], config['x2'], config['x3'], config['x4'], config['x5']])
sin_x = np.sin(x)
z = (np.arange(1, 6) / np.pi * (x * x))
sin_z = np.power(np.sin(z), 20) # let m = 20
y = np.dot(sin_x, sin_z)
# Negate y since we want to minimize y value
reporter(timesteps_total=1, neg_mean_loss=-y)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
args, _ = parser.parse_known_args()
ray.init(redirect_output=True)
register_trainable("exp", michalewicz_function)
space = SearchSpace({
ContinuousSpace('x1', 0, 4, 100),
ContinuousSpace('x2', -2, 2, 100),
ContinuousSpace('x3', 1, 5, 100),
ContinuousSpace('x4', -3, 3, 100),
DiscreteSpace('x5', [-1, 0, 1, 2, 3]),
})
config = {
"my_exp": {
"run": "exp",
"stop": {
"training_iteration": 100
},
}
}
algo = GeneticSearch(
space,
reward_attr="neg_mean_loss",
max_generation=2 if args.smoke_test else 10,
population_size=10 if args.smoke_test else 50)
scheduler = AsyncHyperBandScheduler(reward_attr="neg_mean_loss")
run_experiments(config, search_alg=algo, scheduler=scheduler)
@@ -0,0 +1,69 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import random
import unittest
from ray.tune import register_trainable
from ray.tune.automl import SearchSpace, DiscreteSpace, GridSearch
class AutoMLSearcherTest(unittest.TestCase):
def setUp(self):
def dummy_train(config, reporter):
reporter(timesteps_total=100, done=True)
register_trainable("f1", dummy_train)
def testExpandSearchSpace(self):
exp = {"test-exp": {"run": "f1", "config": {"a": {'d': 'dummy'}}}}
space = SearchSpace([
DiscreteSpace('a.b.c', [1, 2]),
DiscreteSpace('a.d', ['a', 'b']),
])
searcher = GridSearch(space, 'reward')
searcher.add_configurations(exp)
trials = searcher.next_trials()
self.assertEqual(len(trials), 4)
self.assertTrue(trials[0].config['a']['b']['c'] in [1, 2])
self.assertTrue(trials[1].config['a']['d'] in ['a', 'b'])
def testSearchRound(self):
exp = {"test-exp": {"run": "f1", "config": {"a": {'d': 'dummy'}}}}
space = SearchSpace([
DiscreteSpace('a.b.c', [1, 2]),
DiscreteSpace('a.d', ['a', 'b']),
])
searcher = GridSearch(space, 'reward')
searcher.add_configurations(exp)
trials = searcher.next_trials()
self.assertEqual(len(searcher.next_trials()), 0)
for trial in trials[1:]:
searcher.on_trial_complete(trial.trial_id)
searcher.on_trial_complete(trials[0].trial_id, error=True)
self.assertTrue(searcher.is_finished())
def testBestTrial(self):
exp = {"test-exp": {"run": "f1", "config": {"a": {'d': 'dummy'}}}}
space = SearchSpace([
DiscreteSpace('a.b.c', [1, 2]),
DiscreteSpace('a.d', ['a', 'b']),
])
searcher = GridSearch(space, 'reward')
searcher.add_configurations(exp)
trials = searcher.next_trials()
self.assertEqual(len(searcher.next_trials()), 0)
for i, trial in enumerate(trials):
rewards = [x for x in range(i, i + 10)]
random.shuffle(rewards)
for reward in rewards:
searcher.on_trial_result(trial.trial_id, {"reward": reward})
best_trial = searcher.get_best_trial()
self.assertEqual(best_trial, trials[-1])
self.assertEqual(best_trial.best_result['reward'], 3 + 10 - 1)