diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index c88676b56..909857b28 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -292,7 +292,10 @@ def trial_progress_str(trials, metric_columns, fmt="psql", max_rows=None): keys = list(metric_columns.keys()) else: keys = metric_columns - keys = [k for k in keys if any(t.last_result.get(k) for t in trials)] + keys = [ + k for k in keys if any( + t.last_result.get(k) is not None for t in trials) + ] # Build trial rows. params = list(set().union(*[t.evaluated_params for t in trials])) trial_table = [_get_trial_info(trial, params, keys) for trial in trials] diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 363705afe..32afd1342 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -3,7 +3,8 @@ import logging from ray.rllib.agents import with_common_config from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy from ray.rllib.agents.trainer_template import build_trainer -from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer +from ray.rllib.optimizers import SyncSamplesOptimizer, \ + LocalMultiGPUOptimizer, TorchDistributedDataParallelOptimizer from ray.rllib.utils import try_import_tf tf = try_import_tf() @@ -64,6 +65,8 @@ DEFAULT_CONFIG = with_common_config({ # usually slower, but you might want to try it if you run into issues with # the default optimizer. "simple_optimizer": False, + # Use the experimental torch multi-node SGD optimizer. + "distributed_data_parallel_optimizer": False, # Use PyTorch as framework? 
"use_pytorch": False }) @@ -72,6 +75,33 @@ DEFAULT_CONFIG = with_common_config({ def choose_policy_optimizer(workers, config): + if config["distributed_data_parallel_optimizer"]: + if not config["use_pytorch"]: + raise ValueError( + "Distributed data parallel is only supported for PyTorch") + if config["num_gpus"]: + raise ValueError( + "When using distributed data parallel, you should set " + "num_gpus=0 since all optimization " + "is happening on workers. Enable GPUs for workers by setting " + "num_gpus_per_worker=1.") + if config["batch_mode"] != "truncate_episodes": + raise ValueError( + "Distributed data parallel requires truncate_episodes " + "batch mode.") + if config["sample_batch_size"] != config["train_batch_size"]: + raise ValueError( + "Distributed data parallel requires sample_batch_size to be " + "equal to train_batch_size. Each worker will sample and learn " + "on train_batch_size samples per iteration.") + + return TorchDistributedDataParallelOptimizer( + workers, + num_sgd_iter=config["num_sgd_iter"], + train_batch_size=config["train_batch_size"], + sgd_minibatch_size=config["sgd_minibatch_size"], + standardize_fields=["advantages"]) + if config["simple_optimizer"]: return SyncSamplesOptimizer( workers, diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index ea3d0c822..676a2e9b7 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -17,6 +17,7 @@ from ray.rllib.evaluation.sampler import AsyncSampler, SyncSampler from ray.rllib.policy.sample_batch import MultiAgentBatch, DEFAULT_POLICY_ID from ray.rllib.policy.policy import Policy from ray.rllib.policy.tf_policy import TFPolicy +from ray.rllib.policy.torch_policy import TorchPolicy from ray.rllib.offline import NoopOutput, IOContext, OutputWriter, InputReader from ray.rllib.offline.is_estimator import ImportanceSamplingEstimator from ray.rllib.offline.wis_estimator import WeightedImportanceSamplingEstimator @@ -27,6 +28,7 @@ 
def sample_and_learn(self, train_batch_size, num_sgd_iter,
                     sgd_minibatch_size, standardize_fields):
    """Sample a batch and learn on it.

    This is typically used in combination with distributed allreduce.

    Arguments:
        train_batch_size (int): Number of samples to learn on. The batch
            returned by self.sample() must have exactly this many samples.
        num_sgd_iter (int): Number of SGD iterations.
        sgd_minibatch_size (int): SGD minibatch size.
        standardize_fields (list): List of sample fields to normalize.

    Returns:
        info: dictionary of extra metadata returned by do_minibatch_sgd().
        count: number of samples learned on.

    Raises:
        AssertionError: if the sampled batch does not contain exactly
            train_batch_size samples.
    """
    batch = self.sample()
    if batch.count != train_batch_size:
        # Raised explicitly instead of via an `assert` statement so that the
        # check is not stripped when Python runs with -O. The exception type
        # is unchanged for existing callers.
        raise AssertionError(
            "Sampled batch has {} samples but train_batch_size is {}. "
            "Batch size possibly out of sync between workers".format(
                batch.count, train_batch_size))
    logger.info("Executing distributed minibatch SGD on batch of size %s",
                batch.count)
    # This worker plays the role of the "local worker" for the SGD helper.
    info = do_minibatch_sgd(batch, self.policy_map, self, num_sgd_iter,
                            sgd_minibatch_size, standardize_fields)
    return info, batch.count


def setup_torch_data_parallel(self, url, world_rank, world_size, backend):
    """Join a torch process group for distributed SGD.

    Arguments:
        url (str): Forwarded as init_method to
            torch.distributed.init_process_group().
        world_rank (int): Rank of this worker within the process group.
        world_size (int): Total number of processes in the group.
        backend (str): Name of the torch.distributed backend to use.

    Raises:
        ValueError: if any policy in self.policy_map is not a TorchPolicy.
    """
    logger.info(
        "Joining process group, url=%s, world_rank=%s, "
        "world_size=%s, backend=%s", url, world_rank, world_size, backend)
    torch.distributed.init_process_group(
        backend=backend,
        init_method=url,
        rank=world_rank,
        world_size=world_size)

    # Only the policy objects are needed here, not their ids.
    for policy in self.policy_map.values():
        if not isinstance(policy, TorchPolicy):
            raise ValueError(
                "This policy does not support torch distributed", policy)
        # Record the group size on the policy.
        policy.distributed_world_size = world_size


def get_node_ip(self):
    """Returns the IP address of the current node."""
    return ray.services.get_node_ip_address()


def find_free_port(self):
    """Finds a free port on the current node."""
    # Imported lazily, as in the original code.
    from ray.experimental.sgd import utils
    return utils.find_free_port()
MicrobatchOptimizer from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer +from ray.rllib.optimizers.torch_distributed_data_parallel_optimizer import \ + TorchDistributedDataParallelOptimizer __all__ = [ "PolicyOptimizer", @@ -20,4 +22,5 @@ __all__ = [ "SyncReplayOptimizer", "LocalMultiGPUOptimizer", "SyncBatchReplayOptimizer", + "TorchDistributedDataParallelOptimizer", ] diff --git a/rllib/optimizers/multi_gpu_optimizer.py b/rllib/optimizers/multi_gpu_optimizer.py index d844552b1..ffcd6cf2c 100644 --- a/rllib/optimizers/multi_gpu_optimizer.py +++ b/rllib/optimizers/multi_gpu_optimizer.py @@ -10,6 +10,7 @@ from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer from ray.rllib.optimizers.rollout import collect_samples from ray.rllib.utils.annotations import override +from ray.rllib.utils.sgd import averaged from ray.rllib.utils.timer import TimerStat from ray.rllib.policy.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \ MultiAgentBatch @@ -201,8 +202,8 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): for k, v in batch_fetches[LEARNER_STATS_KEY].items(): iter_extra_fetches[k].append(v) logger.debug("{} {}".format(i, - _averaged(iter_extra_fetches))) - fetches[policy_id] = _averaged(iter_extra_fetches) + averaged(iter_extra_fetches))) + fetches[policy_id] = averaged(iter_extra_fetches) self.num_steps_sampled += samples.count self.num_steps_trained += tuples_per_device * len(self.devices) @@ -220,11 +221,3 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): 3), "learner": self.learner_stats, }) - - -def _averaged(kv): - out = {} - for k, v in kv.items(): - if v[0] is not None and not isinstance(v[0], dict): - out[k] = np.mean(v) - return out diff --git a/rllib/optimizers/sync_samples_optimizer.py b/rllib/optimizers/sync_samples_optimizer.py index 90119076c..582d0dc28 100644 --- a/rllib/optimizers/sync_samples_optimizer.py +++ 
b/rllib/optimizers/sync_samples_optimizer.py @@ -1,15 +1,11 @@ import logging -import random -from collections import defaultdict import ray -from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY -from ray.rllib.optimizers.multi_gpu_optimizer import _averaged from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer -from ray.rllib.policy.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \ - MultiAgentBatch +from ray.rllib.policy.sample_batch import SampleBatch, DEFAULT_POLICY_ID from ray.rllib.utils.annotations import override from ray.rllib.utils.filter import RunningStat +from ray.rllib.utils.sgd import do_minibatch_sgd from ray.rllib.utils.timer import TimerStat from ray.rllib.utils.memory import ray_get_and_free @@ -67,40 +63,14 @@ class SyncSamplesOptimizer(PolicyOptimizer): samples = SampleBatch.concat_samples(samples) self.sample_timer.push_units_processed(samples.count) - # Handle everything as if multiagent - if isinstance(samples, SampleBatch): - samples = MultiAgentBatch({ - DEFAULT_POLICY_ID: samples - }, samples.count) - - fetches = {} with self.grad_timer: - for policy_id, policy in self.policies.items(): - if policy_id not in samples.policy_batches: - continue - - batch = samples.policy_batches[policy_id] - for field in self.standardize_fields: - value = batch[field] - standardized = (value - value.mean()) / max( - 1e-4, value.std()) - batch[field] = standardized - - for i in range(self.num_sgd_iter): - iter_extra_fetches = defaultdict(list) - for minibatch in self._minibatches(batch): - batch_fetches = ( - self.workers.local_worker().learn_on_batch( - MultiAgentBatch({ - policy_id: minibatch - }, minibatch.count)))[policy_id] - for k, v in batch_fetches[LEARNER_STATS_KEY].items(): - iter_extra_fetches[k].append(v) - logger.debug("{} {}".format(i, - _averaged(iter_extra_fetches))) - fetches[policy_id] = _averaged(iter_extra_fetches) - + fetches = do_minibatch_sgd(samples, self.policies, + self.workers.local_worker(), + 
class TorchDistributedDataParallelOptimizer(PolicyOptimizer):
    """EXPERIMENTAL: torch distributed multi-node SGD.

    On construction, every remote worker is asked to join one torch process
    group whose leader address is taken from the first remote worker. Each
    call to step() then asks every remote worker to sample a batch and run
    minibatch SGD on it via sample_and_learn().
    """

    def __init__(self,
                 workers,
                 num_sgd_iter=1,
                 train_batch_size=1,
                 sgd_minibatch_size=0,
                 standardize_fields=frozenset([]),
                 keep_local_weights_in_sync=True,
                 backend="gloo"):
        """Create the optimizer and set up the torch process group.

        Arguments:
            workers: worker set exposing local_worker() and remote_workers().
            num_sgd_iter (int): forwarded to each worker's sample_and_learn().
            train_batch_size (int): forwarded to sample_and_learn().
            sgd_minibatch_size (int): forwarded to sample_and_learn().
            standardize_fields: sample field names forwarded to
                sample_and_learn().
            keep_local_weights_in_sync (bool): if True, step() pushes the
                local worker's weights to the remote workers before learning
                and pulls the first remote worker's weights back afterwards.
            backend (str): torch.distributed backend name passed to each
                worker's setup_torch_data_parallel().

        Raises:
            ValueError: if there are no remote workers.
        """
        PolicyOptimizer.__init__(self, workers)
        self.learner_stats = {}
        self.num_sgd_iter = num_sgd_iter
        self.train_batch_size = train_batch_size
        self.sgd_minibatch_size = sgd_minibatch_size
        self.standardize_fields = standardize_fields
        self.keep_local_weights_in_sync = keep_local_weights_in_sync
        self.update_weights_timer = TimerStat()
        self.learn_timer = TimerStat()

        # Setup the distributed processes.
        if not self.workers.remote_workers():
            raise ValueError("This optimizer requires >0 remote workers.")
        remote_workers = workers.remote_workers()
        # The first remote worker provides the rendezvous address.
        leader = remote_workers[0]
        ip = ray.get(leader.get_node_ip.remote())
        port = ray.get(leader.find_free_port.remote())
        address = "tcp://{ip}:{port}".format(ip=ip, port=port)
        logger.info("Creating torch process group with leader %s", address)

        # Get setup tasks in order to throw errors on failure.
        world_size = len(remote_workers)
        ray.get([
            worker.setup_torch_data_parallel.remote(address, rank, world_size,
                                                    backend)
            for rank, worker in enumerate(remote_workers)
        ])
        logger.info("Torch process group init completed")

    @override(PolicyOptimizer)
    def step(self):
        """Run one round of sampling and learning on all remote workers.

        Returns:
            the info returned by the first remote worker's
            sample_and_learn() call.
        """
        # Sync up the weights. In principle we don't need this, but it doesn't
        # add too much overhead and handles the case where the user manually
        # updates the local weights.
        if self.keep_local_weights_in_sync:
            with self.update_weights_timer:
                weights = ray.put(self.workers.local_worker().get_weights())
                for e in self.workers.remote_workers():
                    e.set_weights.remote(weights)

        with self.learn_timer:
            results = ray.get([
                w.sample_and_learn.remote(
                    self.train_batch_size, self.num_sgd_iter,
                    self.sgd_minibatch_size, self.standardize_fields)
                for w in self.workers.remote_workers()
            ])
        # Each result is an (info, count) pair; only the count is needed here.
        for _, count in results:
            self.num_steps_sampled += count
            self.num_steps_trained += count
        self.learner_stats = results[0][0]

        # In debug mode, check the allreduce successfully synced the weights.
        if logger.isEnabledFor(logging.DEBUG):
            weights = ray.get([
                w.get_weights.remote() for w in self.workers.remote_workers()
            ])
            sums = []
            for w in weights:
                acc = 0
                # Weights are read as a mapping of mappings whose leaf values
                # support .sum(); only the leaf values matter for the total.
                for p in w.values():
                    for v in p.values():
                        acc += v.sum()
                sums.append(float(acc))
            logger.debug("The worker weight sums are %s", sums)
            assert len(set(sums)) == 1, sums

        # Sync down the weights. As with the sync up, this is not really
        # needed unless the user is reading the local weights.
        if self.keep_local_weights_in_sync:
            self.workers.local_worker().set_weights(
                ray.get(self.workers.remote_workers()[0].get_weights.remote()))

        return self.learner_stats

    @override(PolicyOptimizer)
    def stats(self):
        """Return the base optimizer stats plus timing and learner stats."""
        return dict(
            PolicyOptimizer.stats(self), **{
                "update_weights_time_ms": round(
                    1000 * self.update_weights_timer.mean, 3),
                "learn_time_ms": round(1000 * self.learn_timer.mean, 3),
                "learner": self.learner_stats,
            })
- format(action, space)) + raise ValueError("Expected tuple space for actions {}: {}".format( + action, space)) out = [] for a, s in zip(action, space.spaces): out.append(clip_action(a, s)) diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index 80d99af3f..e69484ba9 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -1,4 +1,5 @@ import numpy as np +import time from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import SampleBatch @@ -22,6 +23,7 @@ class TorchPolicy(Policy): model (TorchModel): Torch model instance dist_class (type): Torch action distribution class """ + def __init__(self, observation_space, action_space, config, model, loss, action_distribution_class): """Build a policy from policy and loss torch modules. @@ -41,16 +43,19 @@ class TorchPolicy(Policy): action_distribution_class (ActionDistribution): Class for action distribution. """ - super(TorchPolicy, self).__init__( - observation_space, action_space, config - ) + super(TorchPolicy, self).__init__(observation_space, action_space, + config) self.device = (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")) self.model = model.to(self.device) + self.unwrapped_model = model # used to support DistributedDataParallel self._loss = loss self._optimizer = self.optimizer() self.dist_class = action_distribution_class + # If set, means we are using distributed allreduce during learning. 
+ self.distributed_world_size = None + @override(Policy) def compute_actions(self, obs_batch, @@ -85,12 +90,26 @@ class TorchPolicy(Policy): self._optimizer.zero_grad() loss_out.backward() - grad_process_info = self.extra_grad_process() - self._optimizer.step() + info = {} + info.update(self.extra_grad_process()) - grad_info = self.extra_grad_info(train_batch) - grad_info.update(grad_process_info) - return {LEARNER_STATS_KEY: grad_info} + if self.distributed_world_size: + grads = [] + for p in self.model.parameters(): + if p.grad is not None: + grads.append(p.grad) + start = time.time() + torch.distributed.all_reduce_coalesced( + grads, op=torch.distributed.ReduceOp.SUM) + for p in self.model.parameters(): + if p.grad is not None: + p.grad /= self.distributed_world_size + info["allreduce_latency"] = time.time() - start + + self._optimizer.step() + info.update(self.extra_grad_info(train_batch)) + + return {LEARNER_STATS_KEY: info} @override(Policy) def compute_gradients(self, postprocessed_batch): @@ -143,7 +162,10 @@ class TorchPolicy(Policy): return processing info.""" return {} - def extra_action_out(self, input_dict, state_batches, model, + def extra_action_out(self, + input_dict, + state_batches, + model, action_dist=None): """Returns dict of extra info to include in experience batch. 
def averaged(kv):
    """Average the value lists of a dictionary.

    Entries whose list is empty, or whose first element is None or a dict,
    are left out of the result.

    Arguments:
        kv (dict): dictionary with values that are lists of floats.

    Returns:
        dictionary with single averaged float as values.
    """
    out = {}
    for k, v in kv.items():
        # len() rather than truthiness so array-like values keep working.
        if len(v) == 0:
            continue
        if v[0] is not None and not isinstance(v[0], dict):
            out[k] = np.mean(v)
    return out


def standardized(array):
    """Normalize the values in an array.

    Arguments:
        array (np.ndarray): Array of values to normalize.

    Returns:
        array shifted to zero mean and divided by its standard deviation.
        The divisor is floored at 1e-4, so a (near-)constant input maps to
        values close to zero instead of dividing by zero.
    """
    return (array - array.mean()) / max(1e-4, array.std())


def minibatches(samples, sgd_minibatch_size):
    """Return a generator yielding minibatches from a sample batch.

    Arguments:
        samples (SampleBatch): batch of samples to split up.
        sgd_minibatch_size (int): size of minibatches to return. If falsy
            (0 or None), the whole batch is yielded once, unshuffled.

    Returns:
        generator that returns mini-SampleBatches covering index ranges of
        width sgd_minibatch_size, in random order. The last range may extend
        past samples.count; presumably samples.slice() truncates it, TODO
        confirm against SampleBatch.slice.

    Raises:
        NotImplementedError: if minibatching is requested for a
            MultiAgentBatch.
    """
    if not sgd_minibatch_size:
        yield samples
        return

    if isinstance(samples, MultiAgentBatch):
        raise NotImplementedError(
            "Minibatching not implemented for multi-agent in simple mode")

    if "state_in_0" in samples.data:
        logger.warning("Not shuffling RNN data for SGD in simple mode")
    else:
        samples.shuffle()

    # range() replaces the manual while-loop; unlike it, a negative size
    # yields no slices rather than looping forever.
    slices = [(start, start + sgd_minibatch_size)
              for start in range(0, samples.count, sgd_minibatch_size)]
    random.shuffle(slices)

    for i, j in slices:
        yield samples.slice(i, j)


def do_minibatch_sgd(samples, policies, local_worker, num_sgd_iter,
                     sgd_minibatch_size, standardize_fields):
    """Execute minibatch SGD.

    Arguments:
        samples (SampleBatch or MultiAgentBatch): batch of samples to
            optimize. A SampleBatch is wrapped as a single-policy
            MultiAgentBatch under DEFAULT_POLICY_ID.
        policies (dict): dictionary of policies to optimize. Only its keys
            (policy ids) are used here.
        local_worker (RolloutWorker): worker whose learn_on_batch() is
            called for every minibatch.
        num_sgd_iter (int): number of epochs of optimization to take.
        sgd_minibatch_size (int): size of minibatches to use for optimization.
        standardize_fields (list): list of sample field names that should be
            normalized prior to optimization. The fields are overwritten in
            place in the policy batches.

    Returns:
        dict mapping policy id to the averaged info fetches over the last
        SGD epoch taken (an empty dict if num_sgd_iter is 0).
    """
    if isinstance(samples, SampleBatch):
        samples = MultiAgentBatch({DEFAULT_POLICY_ID: samples}, samples.count)

    fetches = {}
    for policy_id in policies:
        if policy_id not in samples.policy_batches:
            continue

        batch = samples.policy_batches[policy_id]
        for field in standardize_fields:
            batch[field] = standardized(batch[field])

        # Start empty so that num_sgd_iter == 0 reports no stats instead of
        # failing on an unbound variable after the loop.
        epoch_stats = {}
        for i in range(num_sgd_iter):
            iter_extra_fetches = defaultdict(list)
            for minibatch in minibatches(batch, sgd_minibatch_size):
                batch_fetches = (local_worker.learn_on_batch(
                    MultiAgentBatch({
                        policy_id: minibatch
                    }, minibatch.count)))[policy_id]
                for k, v in batch_fetches[LEARNER_STATS_KEY].items():
                    iter_extra_fetches[k].append(v)
            # Average once per epoch and reuse it for logging and the result.
            epoch_stats = averaged(iter_extra_fetches)
            logger.debug("%s %s", i, epoch_stats)
        fetches[policy_id] = epoch_stats
    return fetches