From ea9b2b7df5eda0974e6235968b8aa473c0426db3 Mon Sep 17 00:00:00 2001 From: Kashif Rasul Date: Sun, 7 Feb 2021 17:43:07 +0100 Subject: [PATCH] Gluon master (#29) * Estimator needs an create_instance_splitter now * updated estimators and tests * fix test * validated --- pts/dataset/loader.py | 5 +- pts/model/deepar/deepar_estimator.py | 48 ++++-- pts/model/deepvar/deepvar_estimator.py | 88 +++++++---- pts/model/estimator.py | 28 +++- pts/model/lstnet/lstnet_estimator.py | 47 ++++-- pts/model/n_beats/n_beats_ensemble.py | 3 +- pts/model/n_beats/n_beats_estimator.py | 46 ++++-- .../simple_feedforward_estimator.py | 46 ++++-- pts/model/tempflow/tempflow_estimator.py | 68 +++++--- .../transformer/transformer_estimator.py | 49 ++++-- .../transformer_tempflow_estimator.py | 68 +++++--- test/feature/test_holiday.py | 5 +- test/model/deepar/test_auxillary_outputs.py | 11 +- test/model/test_forecast.py | 13 +- test/model/test_lstnet.py | 12 +- .../test_implicit_quantile_distr_output.py | 147 ++++++++++-------- 16 files changed, 452 insertions(+), 232 deletions(-) diff --git a/pts/dataset/loader.py b/pts/dataset/loader.py index c3e3288..f56fa96 100644 --- a/pts/dataset/loader.py +++ b/pts/dataset/loader.py @@ -5,7 +5,7 @@ from torch.utils.data import IterableDataset from gluonts.dataset.common import Dataset from gluonts.transform import Transformation, TransformedDataset -from gluonts.itertools import Cyclic, PseudoShuffled +from gluonts.itertools import Cyclic, PseudoShuffled, Cached class TransformedIterableDataset(IterableDataset): @@ -15,12 +15,13 @@ class TransformedIterableDataset(IterableDataset): transform: Transformation, is_train: bool = True, shuffle_buffer_length: Optional[int] = None, + cache_data: bool = False, ): super().__init__() self.shuffle_buffer_length = shuffle_buffer_length self.transformed_dataset = TransformedDataset( - Cyclic(dataset), + Cyclic(dataset) if not cache_data else Cached(Cyclic(dataset)), transform, is_train=is_train, ) diff --git 
a/pts/model/deepar/deepar_estimator.py b/pts/model/deepar/deepar_estimator.py index eb10f92..166e07c 100644 --- a/pts/model/deepar/deepar_estimator.py +++ b/pts/model/deepar/deepar_estimator.py @@ -4,6 +4,7 @@ import numpy as np import torch import torch.nn as nn +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.time_feature import ( TimeFeature, @@ -21,6 +22,8 @@ from gluonts.transform import ( AddAgeFeature, VstackFeatures, InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, ExpectedNumInstanceSampler, ) from gluonts.torch.support.util import copy_parameters @@ -37,6 +40,7 @@ from .deepar_network import DeepARTrainingNetwork, DeepARPredictionNetwork class DeepAREstimator(PyTorchEstimator): + @validated() def __init__( self, freq: str, @@ -99,6 +103,11 @@ class DeepAREstimator(PyTorchEstimator): self.num_parallel_samples = num_parallel_samples + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, min_future=prediction_length + ) + self.validation_sampler = ValidationSplitSampler(min_future=prediction_length) + def create_transformation(self) -> Transformation: remove_field_names = [] if not self.use_feat_static_real: @@ -170,22 +179,32 @@ class DeepAREstimator(PyTorchEstimator): else [] ), ), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.history_length, - future_length=self.prediction_length, - time_series_fields=[ - FieldName.FEAT_TIME, - FieldName.OBSERVED_VALUES, - ], - ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + 
is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.history_length, + future_length=self.prediction_length, + time_series_fields=[ + FieldName.FEAT_TIME, + FieldName.OBSERVED_VALUES, + ], + ) + def create_training_network(self, device: torch.device) -> DeepARTrainingNetwork: return DeepARTrainingNetwork( input_size=self.input_size, @@ -230,9 +249,10 @@ class DeepAREstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/deepvar/deepvar_estimator.py b/pts/model/deepvar/deepvar_estimator.py index 419cc81..456b57a 100644 --- a/pts/model/deepvar/deepvar_estimator.py +++ b/pts/model/deepvar/deepvar_estimator.py @@ -3,6 +3,7 @@ from typing import List, Optional, Callable import numpy as np import torch +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.time_feature import TimeFeature from gluonts.torch.modules.distribution_output import DistributionOutput @@ -18,6 +19,8 @@ from gluonts.transform import ( ExpandDimArray, ExpectedNumInstanceSampler, InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, RenameFields, SetField, TargetDimIndicator, @@ -41,6 +44,7 @@ from .deepvar_network import DeepVARTrainingNetwork, DeepVARPredictionNetwork class DeepVAREstimator(PyTorchEstimator): + @validated() def __init__( self, input_size: int, @@ -126,25 +130,18 @@ class DeepVAREstimator(PyTorchEstimator): else: self.output_transform = None - def create_transformation(self) -> Transformation: - def use_marginal_transformation( 
- marginal_transformation: bool, - ) -> Transformation: - if marginal_transformation: - return CDFtoGaussianTransform( - target_field=FieldName.TARGET, - observed_values_field=FieldName.OBSERVED_VALUES, - max_context_length=self.conditioning_length, - target_dim=self.target_dim, - ) - else: - return RenameFields( - { - f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", - f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", - } - ) + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, + min_past=0 if pick_incomplete else self.history_length, + min_future=prediction_length, + ) + self.validation_sampler = ValidationSplitSampler( + min_past=0 if pick_incomplete else self.history_length, + min_future=prediction_length, + ) + + def create_transformation(self) -> Transformation: remove_field_names = [FieldName.FEAT_DYNAMIC_CAT] if not self.use_feat_dynamic_real: remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL) @@ -208,24 +205,46 @@ class DeepVAREstimator(PyTorchEstimator): field=FieldName.FEAT_STATIC_CAT, expected_ndim=1, dtype=np.long ), AsNumpyArray(field=FieldName.FEAT_STATIC_REAL, expected_ndim=1), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.history_length, - future_length=self.prediction_length, - time_series_fields=[ - FieldName.FEAT_TIME, - FieldName.OBSERVED_VALUES, - ], - pick_incomplete=self.pick_incomplete, - ), - use_marginal_transformation(self.use_marginal_transformation), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + 
start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.history_length, + future_length=self.prediction_length, + time_series_fields=[ + FieldName.FEAT_TIME, + FieldName.OBSERVED_VALUES, + ], + ) + ( + CDFtoGaussianTransform( + target_field=FieldName.TARGET, + observed_values_field=FieldName.OBSERVED_VALUES, + max_context_length=self.conditioning_length, + target_dim=self.target_dim, + ) + if self.use_marginal_transformation + else RenameFields( + { + f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", + f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", + } + ) + ) + def create_training_network(self, device: torch.device) -> DeepVARTrainingNetwork: return DeepVARTrainingNetwork( input_size=self.input_size, @@ -270,9 +289,10 @@ class DeepVAREstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/estimator.py b/pts/model/estimator.py index 74fe6e0..a10429c 100644 --- a/pts/model/estimator.py +++ b/pts/model/estimator.py @@ -46,6 +46,18 @@ class PyTorchEstimator(Estimator): """ raise NotImplementedError + def create_instance_splitter(self, mode: str) -> Transformation: + """ + Create and return the instance splitter needed for training, validation or testing. + + Returns + ------- + Transformation + The InstanceSplitter that will be applied entry-wise to datasets, + at training, validation and inference time based on mode. 
+ """ + raise NotImplementedError + def create_training_network(self, device: torch.device) -> nn.Module: """ Create and return the network used for training (i.e., computing the @@ -81,6 +93,7 @@ class PyTorchEstimator(Estimator): num_workers: int = 0, prefetch_factor: int = 2, shuffle_buffer_length: Optional[int] = None, + cache_data: bool = False, **kwargs, ) -> TrainOutput: transformation = self.create_transformation() @@ -88,12 +101,15 @@ class PyTorchEstimator(Estimator): trained_net = self.create_training_network(self.trainer.device) input_names = get_module_forward_input_names(trained_net) - + training_instance_splitter = self.create_instance_splitter("training") training_iter_dataset = TransformedIterableDataset( dataset=training_data, - transform=transformation + SelectFields(input_names), + transform=transformation + + training_instance_splitter + + SelectFields(input_names), is_train=True, shuffle_buffer_length=shuffle_buffer_length, + cache_data=cache_data, ) training_data_loader = DataLoader( @@ -106,10 +122,14 @@ class PyTorchEstimator(Estimator): validation_data_loader = None if validation_data is not None: + validation_instance_splitter = self.create_instance_splitter("validation") validation_iter_dataset = TransformedIterableDataset( dataset=validation_data, - transform=transformation + SelectFields(input_names), + transform=transformation + + validation_instance_splitter + + SelectFields(input_names), is_train=True, + cache_data=cache_data, ) validation_data_loader = DataLoader( validation_iter_dataset, @@ -140,6 +160,7 @@ class PyTorchEstimator(Estimator): num_workers: int = 0, prefetch_factor: int = 2, shuffle_buffer_length: Optional[int] = None, + cache_data: bool = False, **kwargs, ) -> PyTorchPredictor: return self.train_model( @@ -148,5 +169,6 @@ class PyTorchEstimator(Estimator): num_workers=num_workers, prefetch_factor=prefetch_factor, shuffle_buffer_length=shuffle_buffer_length, + cache_data=cache_data, **kwargs, ).predictor diff --git 
a/pts/model/lstnet/lstnet_estimator.py b/pts/model/lstnet/lstnet_estimator.py index bdb5799..e98981b 100644 --- a/pts/model/lstnet/lstnet_estimator.py +++ b/pts/model/lstnet/lstnet_estimator.py @@ -4,12 +4,15 @@ import numpy as np import torch import torch.nn as nn +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.torch.support.util import copy_parameters from gluonts.torch.model.predictor import PyTorchPredictor from gluonts.model.predictor import Predictor from gluonts.transform import ( InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, Transformation, Chain, ExpectedNumInstanceSampler, @@ -25,16 +28,17 @@ from .lstnet_network import LSTNetTrain, LSTNetPredict class LSTNetEstimator(PyTorchEstimator): + @validated() def __init__( self, freq: str, + prediction_length: int, context_length: int, num_series: int, ar_window: int = 24, skip_size: int = 24, channels: int = 100, kernel_size: int = 6, - prediction_length: Optional[int] = None, horizon: Optional[int] = None, trainer: Trainer = Trainer(), dropout_rate: Optional[float] = 0.2, @@ -66,6 +70,12 @@ class LSTNetEstimator(PyTorchEstimator): self.skip_rnn_cell_type = skip_rnn_cell_type self.skip_rnn_num_cells = skip_rnn_num_cells self.scaling = scaling + + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, min_future=self.future_length + ) + self.validation_sampler = ValidationSplitSampler(min_future=self.future_length) + self.dtype = dtype def create_transformation(self) -> Transformation: @@ -77,20 +87,30 @@ class LSTNetEstimator(PyTorchEstimator): output_field=FieldName.OBSERVED_VALUES, dtype=self.dtype, ), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - time_series_fields=[FieldName.OBSERVED_VALUES], - past_length=self.context_length, - 
future_length=self.future_length, - output_NTC=False, - ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + time_series_fields=[FieldName.OBSERVED_VALUES], + past_length=self.context_length, + future_length=self.future_length, + output_NTC=False, + ) + def create_training_network(self, device: torch.device) -> LSTNetTrain: return LSTNetTrain( num_series=self.num_series, @@ -136,9 +156,10 @@ class LSTNetEstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/n_beats/n_beats_ensemble.py b/pts/model/n_beats/n_beats_ensemble.py index b11626f..7e4c586 100644 --- a/pts/model/n_beats/n_beats_ensemble.py +++ b/pts/model/n_beats/n_beats_ensemble.py @@ -5,6 +5,7 @@ from typing import List, Optional, Iterator import numpy as np +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.dataset.common import Dataset from gluonts.model.predictor import Predictor @@ -168,7 +169,7 @@ class NBEATSEnsembleEstimator(PyTorchEstimator): **kwargs Arguments passed down to the individual estimators. 
""" - + @validated() def __init__( self, freq: str, diff --git a/pts/model/n_beats/n_beats_estimator.py b/pts/model/n_beats/n_beats_estimator.py index fc867a3..95a3a21 100644 --- a/pts/model/n_beats/n_beats_estimator.py +++ b/pts/model/n_beats/n_beats_estimator.py @@ -3,12 +3,16 @@ from typing import List, Optional import torch import torch.nn as nn +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.model.predictor import Predictor from gluonts.torch.model.predictor import PyTorchPredictor from gluonts.torch.support.util import copy_parameters from gluonts.transform import ( InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, + AddObservedValuesIndicator, Transformation, Chain, RemoveFields, @@ -26,7 +30,8 @@ from .n_beats_network import ( ) -class NBEATSEstimator(PyTorchEstimator): +class NBEATSEstimator(PyTorchEstimator): + @validated() def __init__( self, freq: str, @@ -97,6 +102,11 @@ class NBEATSEstimator(PyTorchEstimator): invalidation_message=f"Values of 'stack_types' should be one of {VALID_N_BEATS_STACK_TYPES}", ) + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, min_future=prediction_length + ) + self.validation_sampler = ValidationSplitSampler(min_future=prediction_length) + def _validate_nbeats_argument( self, argument_value, @@ -138,19 +148,34 @@ class NBEATSEstimator(PyTorchEstimator): FieldName.FEAT_DYNAMIC_CAT, ] ), - InstanceSplitter( + AddObservedValuesIndicator( target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.context_length, - future_length=self.prediction_length, - time_series_fields=[], + output_field=FieldName.OBSERVED_VALUES, + dtype=self.dtype, ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": 
self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.context_length, + future_length=self.prediction_length, + time_series_fields=[FieldName.OBSERVED_VALUES], + ) + def create_training_network(self, device: torch.device) -> NBEATSTrainingNetwork: return NBEATSTrainingNetwork( prediction_length=self.prediction_length, @@ -186,9 +211,10 @@ class NBEATSEstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/simple_feedforward/simple_feedforward_estimator.py b/pts/model/simple_feedforward/simple_feedforward_estimator.py index 12a21c2..1a3ca90 100644 --- a/pts/model/simple_feedforward/simple_feedforward_estimator.py +++ b/pts/model/simple_feedforward/simple_feedforward_estimator.py @@ -3,6 +3,7 @@ from typing import List, Optional import torch import torch.nn as nn +from gluonts.core.component import validated from gluonts.torch.support.util import copy_parameters from gluonts.torch.model.predictor import PyTorchPredictor from gluonts.torch.modules.distribution_output import DistributionOutput @@ -18,6 +19,8 @@ from gluonts.transform import ( Chain, InstanceSplitter, ExpectedNumInstanceSampler, + ValidationSplitSampler, + TestSplitSampler, ) from pts.model.utils import get_module_forward_input_names @@ -83,7 +86,7 @@ class SimpleFeedForwardEstimator(PyTorchEstimator): Number of evaluation samples per time series to 
increase parallelism during inference. This is a model optimization that does not affect the accuracy (default: 100) """ - + @validated() def __init__( self, freq: str, @@ -116,6 +119,11 @@ class SimpleFeedForwardEstimator(PyTorchEstimator): self.mean_scaling = mean_scaling self.num_parallel_samples = num_parallel_samples + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1, min_future=prediction_length + ) + self.validation_sampler = ValidationSplitSampler(min_future=prediction_length) + # here we do only a simple operation to convert the input data to a form # that can be digested by our model by only splitting the target in two, a # conditioning part and a to-predict part, for each training example. @@ -123,19 +131,25 @@ class SimpleFeedForwardEstimator(PyTorchEstimator): # transformation that includes time features, age feature, observed values # indicator, etc. def create_transformation(self) -> Transformation: - return Chain( - [ - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.context_length, - future_length=self.prediction_length, - time_series_fields=[], # [FieldName.FEAT_DYNAMIC_REAL] - ) - ] + return Chain([]) + + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.context_length, + future_length=self.prediction_length, + time_series_fields=[], # [FieldName.FEAT_DYNAMIC_REAL] ) # defines the network, we get to see one batch to initialize 
it. @@ -161,6 +175,8 @@ class SimpleFeedForwardEstimator(PyTorchEstimator): trained_network: nn.Module, device: torch.device, ) -> Predictor: + prediction_splitter = self.create_instance_splitter("test") + prediction_network = SimpleFeedForwardPredictionNetwork( num_hidden_dimensions=self.num_hidden_dimensions, prediction_length=self.prediction_length, @@ -175,7 +191,7 @@ class SimpleFeedForwardEstimator(PyTorchEstimator): input_names = get_module_forward_input_names(prediction_network) return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/tempflow/tempflow_estimator.py b/pts/model/tempflow/tempflow_estimator.py index 9baaa8b..2b8dc6a 100644 --- a/pts/model/tempflow/tempflow_estimator.py +++ b/pts/model/tempflow/tempflow_estimator.py @@ -2,6 +2,7 @@ from typing import List, Optional import torch +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.time_feature import TimeFeature from gluonts.torch.model.predictor import PyTorchPredictor @@ -12,6 +13,8 @@ from gluonts.transform import ( Transformation, Chain, InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, ExpectedNumInstanceSampler, RenameFields, AsNumpyArray, @@ -35,6 +38,7 @@ from .tempflow_network import TempFlowTrainingNetwork, TempFlowPredictionNetwork class TempFlowEstimator(PyTorchEstimator): + @validated() def __init__( self, input_size: int, @@ -103,6 +107,17 @@ class TempFlowEstimator(PyTorchEstimator): self.pick_incomplete = pick_incomplete self.scaling = scaling + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, + min_past=0 if pick_incomplete else self.history_length, + min_future=prediction_length, + ) + + self.validation_sampler = ValidationSplitSampler( + min_past=0 if pick_incomplete else self.history_length, + 
min_future=prediction_length, + ) + def create_transformation(self) -> Transformation: return Chain( [ @@ -137,29 +152,39 @@ class TempFlowEstimator(PyTorchEstimator): target_field=FieldName.TARGET, ), AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.history_length, - future_length=self.prediction_length, - time_series_fields=[ - FieldName.FEAT_TIME, - FieldName.OBSERVED_VALUES, - ], - pick_incomplete=self.pick_incomplete, - ), - RenameFields( - { - f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", - f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", - } - ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.history_length, + future_length=self.prediction_length, + time_series_fields=[ + FieldName.FEAT_TIME, + FieldName.OBSERVED_VALUES, + ], + ) + ( + RenameFields( + { + f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", + f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", + } + ) + ) + def create_training_network(self, device: torch.device) -> TempFlowTrainingNetwork: return TempFlowTrainingNetwork( input_size=self.input_size, @@ -214,9 +239,10 @@ class TempFlowEstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = 
self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/transformer/transformer_estimator.py b/pts/model/transformer/transformer_estimator.py index 43f4f46..4a9d7c4 100644 --- a/pts/model/transformer/transformer_estimator.py +++ b/pts/model/transformer/transformer_estimator.py @@ -4,6 +4,7 @@ import numpy as np import torch import torch.nn as nn +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.time_feature import TimeFeature from gluonts.torch.modules.distribution_output import DistributionOutput @@ -14,6 +15,9 @@ from gluonts.transform import ( Transformation, Chain, InstanceSplitter, + InstanceSampler, + ValidationSplitSampler, + TestSplitSampler, ExpectedNumInstanceSampler, RemoveFields, AddAgeFeature, @@ -40,6 +44,7 @@ from .transformer_network import ( class TransformerEstimator(PyTorchEstimator): + @validated() def __init__( self, input_size: int, @@ -101,6 +106,11 @@ class TransformerEstimator(PyTorchEstimator): self.num_encoder_layers = num_encoder_layers self.num_decoder_layers = num_decoder_layers + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, min_future=prediction_length + ) + self.validation_sampler = ValidationSplitSampler(min_future=prediction_length) + def create_transformation(self) -> Transformation: remove_field_names = [ FieldName.FEAT_DYNAMIC_CAT, @@ -161,22 +171,32 @@ class TransformerEstimator(PyTorchEstimator): else [] ), ), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.history_length, - future_length=self.prediction_length, - time_series_fields=[ - 
FieldName.FEAT_TIME, - FieldName.OBSERVED_VALUES, - ], - ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.history_length, + future_length=self.prediction_length, + time_series_fields=[ + FieldName.FEAT_TIME, + FieldName.OBSERVED_VALUES, + ], + ) + def create_training_network( self, device: torch.device ) -> TransformerTrainingNetwork: @@ -231,9 +251,10 @@ class TransformerEstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/pts/model/transformer_tempflow/transformer_tempflow_estimator.py b/pts/model/transformer_tempflow/transformer_tempflow_estimator.py index adcb092..c738148 100644 --- a/pts/model/transformer_tempflow/transformer_tempflow_estimator.py +++ b/pts/model/transformer_tempflow/transformer_tempflow_estimator.py @@ -2,6 +2,7 @@ from typing import List, Optional import torch +from gluonts.core.component import validated from gluonts.dataset.field_names import FieldName from gluonts.time_feature import TimeFeature from gluonts.torch.support.util import copy_parameters @@ -11,6 +12,8 @@ from gluonts.transform import ( Transformation, Chain, InstanceSplitter, + ValidationSplitSampler, + TestSplitSampler, ExpectedNumInstanceSampler, RenameFields, AsNumpyArray, @@ 
-37,6 +40,7 @@ from .transformer_tempflow_network import ( class TransformerTempFlowEstimator(PyTorchEstimator): + @validated() def __init__( self, input_size: int, @@ -113,6 +117,17 @@ class TransformerTempFlowEstimator(PyTorchEstimator): self.pick_incomplete = pick_incomplete self.scaling = scaling + self.train_sampler = ExpectedNumInstanceSampler( + num_instances=1.0, + min_past=0 if pick_incomplete else self.history_length, + min_future=prediction_length, + ) + + self.validation_sampler = ValidationSplitSampler( + min_past=0 if pick_incomplete else self.history_length, + min_future=prediction_length, + ) + def create_transformation(self) -> Transformation: return Chain( [ @@ -147,29 +162,39 @@ class TransformerTempFlowEstimator(PyTorchEstimator): target_field=FieldName.TARGET, ), AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1), - InstanceSplitter( - target_field=FieldName.TARGET, - is_pad_field=FieldName.IS_PAD, - start_field=FieldName.START, - forecast_start_field=FieldName.FORECAST_START, - train_sampler=ExpectedNumInstanceSampler(num_instances=1), - past_length=self.history_length, - future_length=self.prediction_length, - time_series_fields=[ - FieldName.FEAT_TIME, - FieldName.OBSERVED_VALUES, - ], - pick_incomplete=self.pick_incomplete, - ), - RenameFields( - { - f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", - f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", - } - ), ] ) + def create_instance_splitter(self, mode: str): + assert mode in ["training", "validation", "test"] + + instance_sampler = { + "training": self.train_sampler, + "validation": self.validation_sampler, + "test": TestSplitSampler(), + }[mode] + + return InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=instance_sampler, + past_length=self.history_length, + future_length=self.prediction_length, + time_series_fields=[ + 
FieldName.FEAT_TIME, + FieldName.OBSERVED_VALUES, + ], + ) + ( + RenameFields( + { + f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf", + f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf", + } + ) + ) + def create_training_network( self, device: torch.device ) -> TransformerTempFlowTrainingNetwork: @@ -232,9 +257,10 @@ class TransformerTempFlowEstimator(PyTorchEstimator): copy_parameters(trained_network, prediction_network) input_names = get_module_forward_input_names(prediction_network) + prediction_splitter = self.create_instance_splitter("test") return PyTorchPredictor( - input_transform=transformation, + input_transform=transformation + prediction_splitter, input_names=input_names, prediction_net=prediction_network, batch_size=self.trainer.batch_size, diff --git a/test/feature/test_holiday.py b/test/feature/test_holiday.py index 33dc62b..72368f5 100644 --- a/test/feature/test_holiday.py +++ b/test/feature/test_holiday.py @@ -19,7 +19,7 @@ from pandas.tseries.holiday import Holiday # First-party imports -from pts.feature.holiday import ( +from gluonts.time_feature.holiday import ( CHRISTMAS_DAY, CHRISTMAS_EVE, COLUMBUS_DAY, @@ -42,9 +42,8 @@ from pts.feature.holiday import ( SpecialDateFeatureSet, squared_exponential_kernel, exponential_kernel, - CustomDateFeatureSet, - CustomHolidayFeatureSet, ) +from pts.feature.holiday import CustomDateFeatureSet, CustomHolidayFeatureSet test_dates = { NEW_YEARS_DAY: [ diff --git a/test/model/deepar/test_auxillary_outputs.py b/test/model/deepar/test_auxillary_outputs.py index 413fb56..200f514 100644 --- a/test/model/deepar/test_auxillary_outputs.py +++ b/test/model/deepar/test_auxillary_outputs.py @@ -17,7 +17,7 @@ import torch from gluonts.dataset.artificial import constant_dataset from gluonts.dataset.loader import TrainDataLoader -from gluonts.torch.batchify import batchify +from gluonts.torch.batchify import batchify from pts import Trainer from pts.model import get_module_forward_input_names @@ 
-50,7 +50,8 @@ def test_distribution(): training_data_loader = TrainDataLoader( train_ds, - transform=train_output.transformation, + transform=train_output.transformation + + estimator.create_instance_splitter("training"), batch_size=batch_size, num_batches_per_epoch=estimator.trainer.num_batches_per_epoch, stack_fn=batchify, @@ -65,4 +66,8 @@ def test_distribution(): *[data_entry[k] for k in input_names] ) - assert distr.sample((num_samples,)).shape == (num_samples, batch_size, seq_len,) + assert distr.sample((num_samples,)).shape == ( + num_samples, + batch_size, + seq_len, + ) diff --git a/test/model/test_forecast.py b/test/model/test_forecast.py index 1163750..27012b7 100644 --- a/test/model/test_forecast.py +++ b/test/model/test_forecast.py @@ -19,11 +19,8 @@ import torch from torch.distributions import Uniform # First-party imports -from pts.model import ( - QuantileForecast, - SampleForecast, - DistributionForecast, -) +from gluonts.model.forecast import SampleForecast +from gluonts.torch.model.forecast import DistributionForecast QUANTILES = np.arange(1, 100) / 100 SAMPLES = np.arange(101).reshape(101, 1) / 100 @@ -31,12 +28,6 @@ START_DATE = pd.Timestamp(2017, 1, 1, 12) FREQ = "1D" FORECASTS = { - "QuantileForecast": QuantileForecast( - forecast_arrays=QUANTILES.reshape(-1, 1), - start_date=START_DATE, - forecast_keys=np.array(QUANTILES, str), - freq=FREQ, - ), "SampleForecast": SampleForecast(samples=SAMPLES, start_date=START_DATE, freq=FREQ), "DistributionForecast": DistributionForecast( distribution=Uniform(low=torch.zeros(1), high=torch.ones(1)), diff --git a/test/model/test_lstnet.py b/test/model/test_lstnet.py index 16b8a68..f662418 100644 --- a/test/model/test_lstnet.py +++ b/test/model/test_lstnet.py @@ -17,12 +17,14 @@ import numpy as np import pandas as pd # First-party imports -from pts.dataset.artificial import constant_dataset -from pts.dataset import TrainDatasets, MultivariateGrouper -from pts.evaluation import backtest_metrics +from 
gluonts.dataset.artificial import constant_dataset +from gluonts.dataset.common import TrainDatasets +from gluonts.dataset.multivariate_grouper import MultivariateGrouper +from gluonts.evaluation import MultivariateEvaluator +from gluonts.evaluation.backtest import make_evaluation_predictions + from pts.model.lstnet import LSTNetEstimator from pts import Trainer -from pts.evaluation import MultivariateEvaluator, make_evaluation_predictions NUM_SERIES = 10 @@ -60,7 +62,7 @@ def test_lstnet(skip_size, ar_window, horizon, prediction_length): freq=freq, horizon=horizon, prediction_length=prediction_length, - trainer=Trainer(epochs=1, batch_size=2, learning_rate=0.01,), + trainer=Trainer(epochs=3, batch_size=2, learning_rate=0.01,), ) predictor = estimator.train(dataset.train) diff --git a/test/modules/test_implicit_quantile_distr_output.py b/test/modules/test_implicit_quantile_distr_output.py index 207f702..dd433c8 100644 --- a/test/modules/test_implicit_quantile_distr_output.py +++ b/test/modules/test_implicit_quantile_distr_output.py @@ -3,10 +3,7 @@ from typing import List import numpy as np import torch import torch.nn as nn -from torch.distributions import ( - Normal, - Uniform, - Bernoulli) +from torch.distributions import Normal, Uniform, Bernoulli from torch.nn.utils import clip_grad_norm_ from torch.optim import SGD from torch.utils.data import TensorDataset, DataLoader @@ -17,10 +14,7 @@ from gluonts.evaluation.backtest import make_evaluation_predictions from gluonts.torch.modules.distribution_output import DistributionOutput from pts import Trainer from pts.model.deepar import DeepAREstimator -from pts.model.simple_feedforward import SimpleFeedForwardEstimator -from pts.modules import ( - ImplicitQuantileOutput -) +from pts.modules import ImplicitQuantileOutput NUM_SAMPLES = 2000 BATCH_SIZE = 32 @@ -73,11 +67,15 @@ def learn_distribution( i, (data, sample_label) = next(enumerate(sampling_dataloader)) distr_args = arg_proj(data) distr = 
distr_output.distribution(distr_args) - samples = distr.sample((NUM_SAMPLES, )) + samples = distr.sample((NUM_SAMPLES,)) with torch.no_grad(): - percentile_90 = distr.quantile_function(torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.9) - percentile_10 = distr.quantile_function(torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.1) + percentile_90 = distr.quantile_function( + torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.9 + ) + percentile_10 = distr.quantile_function( + torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.1 + ) return samples.mean(), samples.std(), percentile_10, percentile_90 @@ -86,8 +84,8 @@ def test_independent_implicit_quantile() -> None: num_samples = NUM_SAMPLES # # Normal distrib - distr_mean = torch.Tensor([10.]) - distr_std = torch.Tensor([4.]) + distr_mean = torch.Tensor([10.0]) + distr_std = torch.Tensor([4.0]) distr_pp10 = distr_mean - 1.282 * distr_std distr_pp90 = distr_mean + 1.282 * distr_std distr = Normal(loc=distr_mean, scale=distr_std) @@ -97,21 +95,29 @@ def test_independent_implicit_quantile() -> None: ImplicitQuantileOutput(output_domain="Real"), samples=samples, num_epochs=50, - learning_rate=1e-2 + learning_rate=1e-2, ) - torch.testing.assert_allclose(learned_mean, distr_mean.squeeze(), rtol=0.1, atol=0.1*10) - torch.testing.assert_allclose(learned_std, distr_std.squeeze(), rtol=0.1, atol=.1*4) - torch.testing.assert_allclose(learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=.1 * 4) - torch.testing.assert_allclose(learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=.1 * 4) + torch.testing.assert_allclose( + learned_mean, distr_mean.squeeze(), rtol=0.1, atol=0.1 * 10 + ) + torch.testing.assert_allclose( + learned_std, distr_std.squeeze(), rtol=0.1, atol=0.1 * 4 + ) + torch.testing.assert_allclose( + learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 4 + ) + torch.testing.assert_allclose( + learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=0.1 * 4 + ) # Uniform distrib - a = torch.Tensor([0.]) - b = torch.Tensor([20.]) - distr_mean 
= 0.5*(a+b) - distr_std = (1./12.*(b-a)**2)**0.5 - distr_pp10 = 0.1 * (a+b) - distr_pp90 = 0.9 * (a+b) + a = torch.Tensor([0.0]) + b = torch.Tensor([20.0]) + distr_mean = 0.5 * (a + b) + distr_std = (1.0 / 12.0 * (b - a) ** 2) ** 0.5 + distr_pp10 = 0.1 * (a + b) + distr_pp90 = 0.9 * (a + b) distr = Uniform(low=a, high=b) samples = distr.sample((num_samples,)) @@ -119,19 +125,25 @@ def test_independent_implicit_quantile() -> None: ImplicitQuantileOutput(output_domain="Positive"), samples=samples, num_epochs=50, - learning_rate=1e-2 + learning_rate=1e-2, ) - torch.testing.assert_allclose(learned_mean, distr_mean.squeeze(), atol=1., rtol=0.1) + torch.testing.assert_allclose( + learned_mean, distr_mean.squeeze(), atol=1.0, rtol=0.1 + ) torch.testing.assert_allclose(learned_std, distr_std.squeeze(), atol=0.5, rtol=0.1) - torch.testing.assert_allclose(learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=.1 * 18) - torch.testing.assert_allclose(learned_pp10, distr_pp10.squeeze(), rtol=0.2, atol=.2 * 2) + torch.testing.assert_allclose( + learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 18 + ) + torch.testing.assert_allclose( + learned_pp10, distr_pp10.squeeze(), rtol=0.2, atol=0.2 * 2 + ) # Bernoulli distrib distr_mean = torch.Tensor([0.2]) distr_std = distr_mean * (1 - distr_mean) - distr_pp10 = torch.Tensor([0.]) - distr_pp90 = torch.Tensor([1.]) + distr_pp10 = torch.Tensor([0.0]) + distr_pp90 = torch.Tensor([1.0]) distr = Bernoulli(probs=distr_mean) samples = distr.sample((num_samples,)) @@ -139,13 +151,19 @@ def test_independent_implicit_quantile() -> None: ImplicitQuantileOutput(output_domain="Positive"), samples=samples, num_epochs=50, - learning_rate=1e-2 + learning_rate=1e-2, ) - torch.testing.assert_allclose(learned_mean, distr_mean.squeeze(), atol=1., rtol=0.1) + torch.testing.assert_allclose( + learned_mean, distr_mean.squeeze(), atol=1.0, rtol=0.1 + ) torch.testing.assert_allclose(learned_std, distr_std.squeeze(), atol=0.5, rtol=0.1) - 
torch.testing.assert_allclose(learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=.1 * 18) - torch.testing.assert_allclose(learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=.1 * 2) + torch.testing.assert_allclose( + learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 18 + ) + torch.testing.assert_allclose( + learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=0.1 * 2 + ) def test_training_with_implicit_quantile_output(): @@ -156,16 +174,16 @@ def test_training_with_implicit_quantile_output(): distr_output=ImplicitQuantileOutput(output_domain="Real"), freq=metadata.freq, prediction_length=metadata.prediction_length, - trainer=Trainer(device="cpu", - epochs=5, - learning_rate=1e-3, - num_batches_per_epoch=3, - batch_size=256, - num_workers=1, - ), + trainer=Trainer( + device="cpu", + epochs=5, + learning_rate=1e-3, + num_batches_per_epoch=3, + batch_size=256, + ), input_size=48, ) - deepar_predictor = deepar_estimator.train(dataset.train) + deepar_predictor = deepar_estimator.train(dataset.train, num_workers=1) forecast_it, ts_it = make_evaluation_predictions( dataset=dataset.test, # test dataset predictor=deepar_predictor, # predictor @@ -174,13 +192,14 @@ def test_training_with_implicit_quantile_output(): forecasts = list(forecast_it) tss = list(ts_it) evaluator = Evaluator(num_workers=0) - agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(dataset.test)) + agg_metrics, item_metrics = evaluator( + iter(tss), iter(forecasts), num_series=len(dataset.test) + ) assert agg_metrics["MSE"] > 0 def test_instanciation_of_args_proj(): - class MockedImplicitQuantileOutput(ImplicitQuantileOutput): method_calls = 0 @@ -198,17 +217,17 @@ def test_instanciation_of_args_proj(): distr_output=distr_output, freq=metadata.freq, prediction_length=metadata.prediction_length, - trainer=Trainer(device="cpu", - epochs=1, - learning_rate=1e-3, - num_batches_per_epoch=1, - batch_size=256, - num_workers=1, - ), + trainer=Trainer( + device="cpu", + epochs=3, + 
learning_rate=1e-3, + num_batches_per_epoch=1, + batch_size=256, + ), input_size=48, ) assert distr_output.method_calls == 1 - deepar_predictor = deepar_estimator.train(dataset.train) + deepar_predictor = deepar_estimator.train(dataset.train, num_workers=1) # Method should be called when the MockedImplicitQuantileOutput is instanciated, # and one more time because in_features is different from 1 @@ -222,7 +241,9 @@ def test_instanciation_of_args_proj(): forecasts = list(forecast_it) tss = list(ts_it) evaluator = Evaluator(num_workers=0) - agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(dataset.test)) + agg_metrics, item_metrics = evaluator( + iter(tss), iter(forecasts), num_series=len(dataset.test) + ) assert distr_output.method_calls == 2 # Test that the implicit output module is proper reset @@ -230,15 +251,17 @@ def test_instanciation_of_args_proj(): distr_output=MockedImplicitQuantileOutput(output_domain="Real"), freq=metadata.freq, prediction_length=metadata.prediction_length, - trainer=Trainer(device="cpu", - epochs=1, - learning_rate=1e-3, - num_batches_per_epoch=1, - batch_size=256, - num_workers=1, - ), + trainer=Trainer( + device="cpu", + epochs=3, + learning_rate=1e-3, + num_batches_per_epoch=1, + batch_size=256, + ), input_size=48, ) assert distr_output.method_calls == 3 - new_estimator.train(dataset.train) - assert distr_output.method_calls == 3 # Since in_feature is the same as before, there should be no additional call + new_estimator.train(dataset.train, num_workers=1) + assert ( + distr_output.method_calls == 3 + ) # Since in_feature is the same as before, there should be no additional call