This commit is contained in:
Dr. Kashif Rasul
2019-12-21 14:59:11 +01:00
parent 8d0daa3236
commit 032f32faf0
20 changed files with 506 additions and 398 deletions
+9 -4
View File
@@ -1,9 +1,16 @@
from typing import Callable, List, NamedTuple, Optional, Tuple, Union
from .common import MetaData, CategoricalFeatureInfo, BasicFeatureInfo, FieldName, Dataset
from .common import (
MetaData,
CategoricalFeatureInfo,
BasicFeatureInfo,
FieldName,
Dataset,
)
from .list_dataset import ListDataset
from .stat import DatasetStatistics, calculate_dataset_statistics
class DatasetInfo(NamedTuple):
"""
Information stored on a dataset. When downloading from the repository, the
@@ -22,9 +29,7 @@ def constant_dataset() -> Tuple[DatasetInfo, Dataset, Dataset]:
metadata = MetaData(
freq="1H",
feat_static_cat=[
CategoricalFeatureInfo(
name="feat_static_cat_000", cardinality="10"
)
CategoricalFeatureInfo(name="feat_static_cat_000", cardinality="10")
],
feat_static_real=[BasicFeatureInfo(name="feat_static_real_000")],
)
+3 -1
View File
@@ -84,7 +84,9 @@ class FileDataset(Dataset):
for path in self.files():
for line in JsonLinesFile(path):
data = self.process(line.content)
data["source"] = SourceContext(source=line.span.path, row=line.span.line)
data["source"] = SourceContext(
source=line.span.path, row=line.span.line
)
yield data
def __len__(self):
+1 -1
View File
@@ -1,2 +1,2 @@
from .evaluator import Evaluator, MultivariateEvaluator
from .backtest import make_evaluation_predictions
from .backtest import make_evaluation_predictions
+24 -21
View File
@@ -7,14 +7,20 @@ import pandas as pd
# First-party imports
from pts.transform import AdhocTransform, TransformedDataset
from pts.dataset import DataEntry, Dataset, InferenceDataLoader, DatasetStatistics, calculate_dataset_statistics
from pts.dataset import (
DataEntry,
Dataset,
InferenceDataLoader,
DatasetStatistics,
calculate_dataset_statistics,
)
from pts.model import Estimator, PTSEstimator, PTSPredictor, Predictor, Forecast
from .evaluator import Evaluator
def make_evaluation_predictions(
dataset: Dataset, predictor: Predictor,
num_samples: int) -> Tuple[Iterator[Forecast], Iterator[pd.Series]]:
dataset: Dataset, predictor: Predictor, num_samples: int
) -> Tuple[Iterator[Forecast], Iterator[pd.Series]]:
"""
Return predictions on the last portion of predict_length time units of the
target. Such portion is cut before making predictions, such a function can
@@ -38,17 +44,13 @@ def make_evaluation_predictions(
prediction_length = predictor.prediction_length
freq = predictor.freq
def add_ts_dataframe(
data_iterator: Iterator[DataEntry]) -> Iterator[DataEntry]:
def add_ts_dataframe(data_iterator: Iterator[DataEntry]) -> Iterator[DataEntry]:
for data_entry in data_iterator:
data = data_entry.copy()
index = pd.date_range(
start=data["start"],
freq=freq,
periods=data["target"].shape[-1],
start=data["start"], freq=freq, periods=data["target"].shape[-1],
)
data["ts"] = pd.DataFrame(index=index,
data=data["target"].transpose())
data["ts"] = pd.DataFrame(index=index, data=data["target"].transpose())
yield data
def ts_iter(dataset: Dataset) -> pd.DataFrame:
@@ -58,8 +60,9 @@ def make_evaluation_predictions(
def truncate_target(data):
data = data.copy()
target = data["target"]
assert (target.shape[-1] >= prediction_length
) # handles multivariate case (target_dim, history_length)
assert (
target.shape[-1] >= prediction_length
) # handles multivariate case (target_dim, history_length)
data["target"] = target[..., :-prediction_length]
return data
@@ -68,7 +71,8 @@ def make_evaluation_predictions(
# TODO the test set may be gone otherwise with such a filtering)
dataset_trunc = TransformedDataset(
dataset, transformations=[AdhocTransform(truncate_target)])
dataset, transformations=[AdhocTransform(truncate_target)]
)
return (
predictor.predict(dataset_trunc, num_samples=num_samples),
@@ -90,8 +94,7 @@ def backtest_metrics(
train_dataset: Optional[Dataset],
test_dataset: Dataset,
forecaster: Union[Estimator, Predictor],
evaluator=Evaluator(quantiles=(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8,
0.9)),
evaluator=Evaluator(quantiles=(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)),
num_samples: int = 100,
logging_file: Optional[str] = None,
):
@@ -145,13 +148,13 @@ def backtest_metrics(
else:
predictor = forecaster
forecast_it, ts_it = make_evaluation_predictions(test_dataset,
predictor=predictor,
num_samples=num_samples)
forecast_it, ts_it = make_evaluation_predictions(
test_dataset, predictor=predictor, num_samples=num_samples
)
agg_metrics, item_metrics = evaluator(ts_it,
forecast_it,
num_series=len(test_dataset))
agg_metrics, item_metrics = evaluator(
ts_it, forecast_it, num_series=len(test_dataset)
)
# we only log aggregate metrics for now as item metrics may be very large
for name, value in agg_metrics.items():
+122 -118
View File
@@ -45,10 +45,10 @@ class Evaluator:
default_quantiles = 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9
def __init__(
self,
quantiles: Iterable[Union[float, str]] = default_quantiles,
seasonality: Optional[int] = None,
alpha: float = 0.05,
self,
quantiles: Iterable[Union[float, str]] = default_quantiles,
seasonality: Optional[int] = None,
alpha: float = 0.05,
) -> None:
self.quantiles = tuple(map(Quantile.parse, quantiles))
self.seasonality = seasonality
@@ -86,18 +86,20 @@ class Evaluator:
rows = []
with tqdm(
zip(ts_iterator, fcst_iterator),
total=num_series,
desc="Running evaluation",
zip(ts_iterator, fcst_iterator),
total=num_series,
desc="Running evaluation",
) as it, np.errstate(invalid="ignore"):
for ts, forecast in it:
rows.append(self.get_metrics_per_ts(ts, forecast))
assert not any(True for _ in ts_iterator
), "ts_iterator has more elements than fcst_iterator"
assert not any(
True for _ in ts_iterator
), "ts_iterator has more elements than fcst_iterator"
assert not any(True for _ in fcst_iterator
), "fcst_iterator has more elements than ts_iterator"
assert not any(
True for _ in fcst_iterator
), "fcst_iterator has more elements than ts_iterator"
if num_series is not None:
assert (
@@ -112,8 +114,8 @@ class Evaluator:
@staticmethod
def extract_pred_target(
time_series: Union[pd.Series, pd.DataFrame],
forecast: Forecast) -> np.ndarray:
time_series: Union[pd.Series, pd.DataFrame], forecast: Forecast
) -> np.ndarray:
"""
Parameters
@@ -126,18 +128,17 @@ class Evaluator:
Union[pandas.Series, pandas.DataFrame]
time series cut in the Forecast object dates
"""
assert forecast.index.intersection(time_series.index).equals(
forecast.index
), ("Cannot extract prediction target since the index of forecast is outside the index of target\n"
assert forecast.index.intersection(time_series.index).equals(forecast.index), (
"Cannot extract prediction target since the index of forecast is outside the index of target\n"
f"Index of forecast: {forecast.index}\n Index of target: {time_series.index}"
)
)
# cut the time series using the dates of the forecast object
return np.atleast_1d(
np.squeeze(time_series.loc[forecast.index].transpose()))
return np.atleast_1d(np.squeeze(time_series.loc[forecast.index].transpose()))
def seasonal_error(self, time_series: Union[pd.Series, pd.DataFrame],
forecast: Forecast) -> float:
def seasonal_error(
self, time_series: Union[pd.Series, pd.DataFrame], forecast: Forecast
) -> float:
r"""
.. math::
@@ -154,8 +155,9 @@ class Evaluator:
ts = time_series[:date_before_forecast]
# Check if the length of the time series is larger than the seasonal frequency
seasonality = (self.seasonality
if self.seasonality else get_seasonality(forecast.freq))
seasonality = (
self.seasonality if self.seasonality else get_seasonality(forecast.freq)
)
if seasonality < len(ts):
forecast_freq = seasonality
else:
@@ -171,8 +173,8 @@ class Evaluator:
return seasonal_mae if seasonal_mae is not np.ma.masked else np.nan
def get_metrics_per_ts(
self, time_series: Union[pd.Series, pd.DataFrame],
forecast: Forecast) -> Dict[str, Union[float, str, None]]:
self, time_series: Union[pd.Series, pd.DataFrame], forecast: Forecast
) -> Dict[str, Union[float, str, None]]:
pred_target = np.array(self.extract_pred_target(time_series, forecast))
pred_target = np.ma.masked_invalid(pred_target)
@@ -183,33 +185,21 @@ class Evaluator:
median_fcst = forecast.quantile(0.5)
seasonal_error = self.seasonal_error(time_series, forecast)
# For MSIS: alpha/2 quantile may not exist. Find the closest.
lower_q = min(self.quantiles,
key=lambda q: abs(q.value - self.alpha / 2))
lower_q = min(self.quantiles, key=lambda q: abs(q.value - self.alpha / 2))
upper_q = min(
reversed(self.quantiles),
key=lambda q: abs(q.value - (1 - self.alpha / 2)),
reversed(self.quantiles), key=lambda q: abs(q.value - (1 - self.alpha / 2)),
)
metrics = {
"item_id":
forecast.item_id,
"MSE":
self.mse(pred_target, mean_fcst)
if mean_fcst is not None else None,
"abs_error":
self.abs_error(pred_target, median_fcst),
"abs_target_sum":
self.abs_target_sum(pred_target),
"abs_target_mean":
self.abs_target_mean(pred_target),
"seasonal_error":
seasonal_error,
"MASE":
self.mase(pred_target, median_fcst, seasonal_error),
"sMAPE":
self.smape(pred_target, median_fcst),
"MSIS":
self.msis(
"item_id": forecast.item_id,
"MSE": self.mse(pred_target, mean_fcst) if mean_fcst is not None else None,
"abs_error": self.abs_error(pred_target, median_fcst),
"abs_target_sum": self.abs_target_sum(pred_target),
"abs_target_mean": self.abs_target_mean(pred_target),
"seasonal_error": seasonal_error,
"MASE": self.mase(pred_target, median_fcst, seasonal_error),
"sMAPE": self.smape(pred_target, median_fcst),
"MSIS": self.msis(
pred_target,
forecast.quantile(lower_q.value),
forecast.quantile(upper_q.value),
@@ -222,9 +212,11 @@ class Evaluator:
forecast_quantile = forecast.quantile(quantile.value)
metrics[quantile.loss_name] = self.quantile_loss(
pred_target, forecast_quantile, quantile.value)
pred_target, forecast_quantile, quantile.value
)
metrics[quantile.coverage_name] = self.coverage(
pred_target, forecast_quantile)
pred_target, forecast_quantile
)
return metrics
@@ -245,39 +237,41 @@ class Evaluator:
agg_funs[quantile.loss_name] = "sum"
agg_funs[quantile.coverage_name] = "mean"
assert (set(metric_per_ts.columns) >= agg_funs.keys()
), "The some of the requested item metrics are missing."
assert (
set(metric_per_ts.columns) >= agg_funs.keys()
), "The some of the requested item metrics are missing."
totals = {
key: metric_per_ts[key].agg(agg)
for key, agg in agg_funs.items()
}
totals = {key: metric_per_ts[key].agg(agg) for key, agg in agg_funs.items()}
# derived metrics based on previous aggregate metrics
totals["RMSE"] = np.sqrt(totals["MSE"])
flag = totals["abs_target_mean"] == 0
totals["NRMSE"] = np.divide(totals["RMSE"] * (1 - flag),
totals["abs_target_mean"] + flag)
totals["NRMSE"] = np.divide(
totals["RMSE"] * (1 - flag), totals["abs_target_mean"] + flag
)
flag = totals["abs_target_sum"] == 0
totals["ND"] = np.divide(totals["abs_error"] * (1 - flag),
totals["abs_target_sum"] + flag)
totals["ND"] = np.divide(
totals["abs_error"] * (1 - flag), totals["abs_target_sum"] + flag
)
all_qLoss_names = [
quantile.weighted_loss_name for quantile in self.quantiles
]
all_qLoss_names = [quantile.weighted_loss_name for quantile in self.quantiles]
for quantile in self.quantiles:
totals[quantile.weighted_loss_name] = np.divide(
totals[quantile.loss_name], totals["abs_target_sum"])
totals[quantile.loss_name], totals["abs_target_sum"]
)
totals["mean_wQuantileLoss"] = np.array(
[totals[ql] for ql in all_qLoss_names]).mean()
[totals[ql] for ql in all_qLoss_names]
).mean()
totals["MAE_Coverage"] = np.mean([
np.abs(totals[q.coverage_name] - np.array([q.value]))
for q in self.quantiles
])
totals["MAE_Coverage"] = np.mean(
[
np.abs(totals[q.coverage_name] - np.array([q.value]))
for q in self.quantiles
]
)
return totals, metric_per_ts
@staticmethod
@@ -291,8 +285,8 @@ class Evaluator:
@staticmethod
def quantile_loss(target, quantile_forecast, q):
return 2.0 * np.sum(
np.abs((quantile_forecast - target) *
((target <= quantile_forecast) - q)))
np.abs((quantile_forecast - target) * ((target <= quantile_forecast) - q))
)
@staticmethod
def coverage(target, quantile_forecast):
@@ -308,8 +302,9 @@ class Evaluator:
https://www.m4.unic.ac.cy/wp-content/uploads/2018/03/M4-Competitors-Guide.pdf
"""
flag = seasonal_error == 0
return (np.mean(np.abs(target - forecast)) *
(1 - flag)) / (seasonal_error + flag)
return (np.mean(np.abs(target - forecast)) * (1 - flag)) / (
seasonal_error + flag
)
@staticmethod
def smape(target, forecast):
@@ -325,7 +320,8 @@ class Evaluator:
flag = denominator == 0
smape = 2 * np.mean(
(np.abs(target - forecast) * (1 - flag)) / (denominator + flag))
(np.abs(target - forecast) * (1 - flag)) / (denominator + flag)
)
return smape
@staticmethod
@@ -337,11 +333,12 @@ class Evaluator:
https://www.m4.unic.ac.cy/wp-content/uploads/2018/03/M4-Competitors-Guide.pdf
"""
numerator = np.mean(upper_quantile - lower_quantile + 2.0 / alpha *
(lower_quantile - target) *
(target < lower_quantile) + 2.0 / alpha *
(target - upper_quantile) *
(target > upper_quantile))
numerator = np.mean(
upper_quantile
- lower_quantile
+ 2.0 / alpha * (lower_quantile - target) * (target < lower_quantile)
+ 2.0 / alpha * (target - upper_quantile) * (target > upper_quantile)
)
flag = seasonal_error == 0
return (numerator * (1 - flag)) / (seasonal_error + flag)
@@ -383,6 +380,7 @@ class MultivariateEvaluator(Evaluator):
(if target_agg_funcs is set).
'm_sum_abs_error': 4.2}
"""
def __init__(
self,
quantiles: Iterable[Union[float, str]] = np.linspace(0.1, 0.9, 9),
@@ -412,33 +410,35 @@ class MultivariateEvaluator(Evaluator):
dimension axis. Useful to compute metrics over aggregated target
and forecast (typically sum or mean).
"""
super().__init__(quantiles=quantiles,
seasonality=seasonality,
alpha=alpha)
super().__init__(quantiles=quantiles, seasonality=seasonality, alpha=alpha)
self._eval_dims = eval_dims
self.target_agg_funcs = target_agg_funcs
@staticmethod
def extract_target_by_dim(it_iterator: Iterator[pd.DataFrame],
dim: int) -> Iterator[pd.DataFrame]:
def extract_target_by_dim(
it_iterator: Iterator[pd.DataFrame], dim: int
) -> Iterator[pd.DataFrame]:
for i in it_iterator:
yield (i[dim])
@staticmethod
def extract_forecast_by_dim(forecast_iterator: Iterator[Forecast],
dim: int) -> Iterator[Forecast]:
def extract_forecast_by_dim(
forecast_iterator: Iterator[Forecast], dim: int
) -> Iterator[Forecast]:
for forecast in forecast_iterator:
yield forecast.copy_dim(dim)
@staticmethod
def extract_aggregate_target(it_iterator: Iterator[pd.DataFrame],
agg_fun: Callable) -> Iterator[pd.DataFrame]:
def extract_aggregate_target(
it_iterator: Iterator[pd.DataFrame], agg_fun: Callable
) -> Iterator[pd.DataFrame]:
for i in it_iterator:
yield i.agg(agg_fun, axis=1)
@staticmethod
def extract_aggregate_forecast(forecast_iterator: Iterator[Forecast],
agg_fun: Callable) -> Iterator[Forecast]:
def extract_aggregate_forecast(
forecast_iterator: Iterator[Forecast], agg_fun: Callable
) -> Iterator[Forecast]:
for forecast in forecast_iterator:
yield forecast.copy_aggregate(agg_fun)
@@ -454,22 +454,27 @@ class MultivariateEvaluator(Evaluator):
assert target_dim > 1, (
f"the dimensionality of the forecast should be larger than 1, "
f"but got {target_dim}. "
f"Please use the Evaluator to evaluate 1D forecasts.")
f"Please use the Evaluator to evaluate 1D forecasts."
)
return target_dim
def get_eval_dims(self, target_dimensionality: int) -> List[int]:
eval_dims = (self._eval_dims if self._eval_dims is not None else list(
range(0, target_dimensionality)))
eval_dims = (
self._eval_dims
if self._eval_dims is not None
else list(range(0, target_dimensionality))
)
assert max(eval_dims) < target_dimensionality, (
f"eval dims should range from 0 to target_dimensionality - 1, "
f"but got max eval_dim {max(eval_dims)}")
f"but got max eval_dim {max(eval_dims)}"
)
return eval_dims
def calculate_aggregate_multivariate_metrics(
self,
ts_iterator: Iterator[pd.DataFrame],
forecast_iterator: Iterator[Forecast],
agg_fun: Callable,
self,
ts_iterator: Iterator[pd.DataFrame],
forecast_iterator: Iterator[Forecast],
agg_fun: Callable,
) -> Dict[str, float]:
"""
@@ -493,9 +498,7 @@ class MultivariateEvaluator(Evaluator):
return agg_metrics
def calculate_aggregate_vector_metrics(
self,
all_agg_metrics: Dict[str, float],
all_metrics_per_ts: pd.DataFrame,
self, all_agg_metrics: Dict[str, float], all_metrics_per_ts: pd.DataFrame,
) -> Dict[str, float]:
"""
@@ -513,17 +516,16 @@ class MultivariateEvaluator(Evaluator):
dictionary with aggregate metrics (of individual (evaluated)
dimensions and the entire vector)
"""
vector_aggregate_metrics, _ = self.get_aggregate_metrics(
all_metrics_per_ts)
vector_aggregate_metrics, _ = self.get_aggregate_metrics(all_metrics_per_ts)
for key, value in vector_aggregate_metrics.items():
all_agg_metrics[key] = value
return all_agg_metrics
def __call__(
self,
ts_iterator: Iterable[pd.DataFrame],
fcst_iterator: Iterable[Forecast],
num_series=None,
self,
ts_iterator: Iterable[pd.DataFrame],
fcst_iterator: Iterable[Forecast],
num_series=None,
) -> Tuple[Dict[str, float], pd.DataFrame]:
ts_iterator = iter(ts_iterator)
fcst_iterator = iter(fcst_iterator)
@@ -536,16 +538,17 @@ class MultivariateEvaluator(Evaluator):
eval_dims = self.get_eval_dims(target_dimensionality)
ts_iterator_set = tee(
ts_iterator, target_dimensionality + len(self.target_agg_funcs))
ts_iterator, target_dimensionality + len(self.target_agg_funcs)
)
fcst_iterator_set = tee(
fcst_iterator, target_dimensionality + len(self.target_agg_funcs))
fcst_iterator, target_dimensionality + len(self.target_agg_funcs)
)
for dim in eval_dims:
agg_metrics, metrics_per_ts = super(
MultivariateEvaluator, self).__call__(
self.extract_target_by_dim(ts_iterator_set[dim], dim),
self.extract_forecast_by_dim(fcst_iterator_set[dim], dim),
)
agg_metrics, metrics_per_ts = super(MultivariateEvaluator, self).__call__(
self.extract_target_by_dim(ts_iterator_set[dim], dim),
self.extract_forecast_by_dim(fcst_iterator_set[dim], dim),
)
all_metrics_per_ts.append(metrics_per_ts)
@@ -554,7 +557,8 @@ class MultivariateEvaluator(Evaluator):
all_metrics_per_ts = pd.concat(all_metrics_per_ts)
all_agg_metrics = self.calculate_aggregate_vector_metrics(
all_agg_metrics, all_metrics_per_ts)
all_agg_metrics, all_metrics_per_ts
)
if self.target_agg_funcs:
multivariate_metrics = {
@@ -563,9 +567,9 @@ class MultivariateEvaluator(Evaluator):
fcst_iterator_set[-(index + 1)],
agg_fun,
)
for index, (
agg_fun_name,
agg_fun) in enumerate(self.target_agg_funcs.items())
for index, (agg_fun_name, agg_fun) in enumerate(
self.target_agg_funcs.items()
)
}
for key, metric_dict in multivariate_metrics.items():
+1 -1
View File
@@ -11,4 +11,4 @@ from .time_feature import (
time_features_from_frequency_str,
)
from .utils import get_granularity, get_seasonality
from .utils import get_granularity, get_seasonality
+1
View File
@@ -22,6 +22,7 @@ def get_granularity(freq_str: str) -> Tuple[int, str]:
granularity = groups[2]
return multiple, granularity
@lru_cache()
def get_seasonality(freq: str) -> int:
"""
+1 -1
View File
@@ -2,4 +2,4 @@ from .estimator import Estimator, PTSEstimator
from .forecast import Forecast, SampleForecast, QuantileForecast, DistributionForecast
from .predictor import Predictor, PTSPredictor
from .quantile import Quantile
from .utils import get_module_forward_input_names, copy_parameters
from .utils import get_module_forward_input_names, copy_parameters
+1 -1
View File
@@ -1,2 +1,2 @@
from .deepar_estimator import DeepAREstimator
from .deepar_network import DeepARNetwork, DeepARTrainingNetwork
from .deepar_network import DeepARNetwork, DeepARTrainingNetwork
+71 -51
View File
@@ -9,7 +9,8 @@ from pts import Trainer
from pts.feature import (
TimeFeature,
get_lags_for_frequency,
time_features_from_frequency_str)
time_features_from_frequency_str,
)
from pts.transform import (
Transformation,
Chain,
@@ -29,35 +30,37 @@ from pts.modules import DistributionOutput, StudentTOutput
from .deepar_network import DeepARTrainingNetwork, DeepARPredictionNetwork
class DeepAREstimator(PTSEstimator):
def __init__(
self,
freq: str,
prediction_length: int,
input_size: int,
trainer: Trainer = Trainer(),
context_length: Optional[int] = None,
num_layers: int = 2,
num_cells: int = 40,
cell_type: str = "LSTM",
dropout_rate: float = 0.1,
use_feat_dynamic_real: bool = False,
use_feat_static_cat: bool = False,
use_feat_static_real: bool = False,
cardinality: Optional[List[int]] = None,
embedding_dimension: Optional[List[int]] = None,
distr_output: DistributionOutput = StudentTOutput(),
scaling: bool = True,
lags_seq: Optional[List[int]] = None,
time_features: Optional[List[TimeFeature]] = None,
num_parallel_samples: int = 100,
dtype: np.dtype = np.float32,
self,
freq: str,
prediction_length: int,
input_size: int,
trainer: Trainer = Trainer(),
context_length: Optional[int] = None,
num_layers: int = 2,
num_cells: int = 40,
cell_type: str = "LSTM",
dropout_rate: float = 0.1,
use_feat_dynamic_real: bool = False,
use_feat_static_cat: bool = False,
use_feat_static_real: bool = False,
cardinality: Optional[List[int]] = None,
embedding_dimension: Optional[List[int]] = None,
distr_output: DistributionOutput = StudentTOutput(),
scaling: bool = True,
lags_seq: Optional[List[int]] = None,
time_features: Optional[List[TimeFeature]] = None,
num_parallel_samples: int = 100,
dtype: np.dtype = np.float32,
) -> None:
super().__init__(trainer=trainer)
self.freq = freq
self.context_length = (context_length if context_length is not None
else prediction_length)
self.context_length = (
context_length if context_length is not None else prediction_length
)
self.prediction_length = prediction_length
self.distr_output = distr_output
self.distr_output.dtype = dtype
@@ -71,13 +74,19 @@ class DeepAREstimator(PTSEstimator):
self.use_feat_static_real = use_feat_static_real
self.cardinality = cardinality if cardinality and use_feat_static_cat else [1]
self.embedding_dimension = (
embedding_dimension if embedding_dimension is not None else
[min(50, (cat + 1) // 2) for cat in self.cardinality])
embedding_dimension
if embedding_dimension is not None
else [min(50, (cat + 1) // 2) for cat in self.cardinality]
)
self.scaling = scaling
self.lags_seq = (lags_seq if lags_seq is not None else
get_lags_for_frequency(freq_str=freq))
self.time_features = (time_features if time_features is not None else
time_features_from_frequency_str(self.freq))
self.lags_seq = (
lags_seq if lags_seq is not None else get_lags_for_frequency(freq_str=freq)
)
self.time_features = (
time_features
if time_features is not None
else time_features_from_frequency_str(self.freq)
)
self.history_length = self.context_length + max(self.lags_seq)
@@ -91,21 +100,23 @@ class DeepAREstimator(PTSEstimator):
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
return Chain(
[RemoveFields(field_names=remove_field_names)] +
([SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0]
)] if not self.use_feat_static_cat else []) +
([SetField(output_field=FieldName.FEAT_STATIC_REAL, value=[0.0]
)] if not self.use_feat_static_real else []) +
[
[RemoveFields(field_names=remove_field_names)]
+ (
[SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0])]
if not self.use_feat_static_cat
else []
)
+ (
[SetField(output_field=FieldName.FEAT_STATIC_REAL, value=[0.0])]
if not self.use_feat_static_real
else []
)
+ [
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=np.long,
field=FieldName.FEAT_STATIC_CAT, expected_ndim=1, dtype=np.long,
),
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
dtype=self.dtype,
field=FieldName.FEAT_STATIC_REAL, expected_ndim=1, dtype=self.dtype,
),
AsNumpyArray(
field=FieldName.TARGET,
@@ -134,9 +145,12 @@ class DeepAREstimator(PTSEstimator):
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE] +
([FieldName.FEAT_DYNAMIC_REAL] if self.
use_feat_dynamic_real else []),
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if self.use_feat_dynamic_real
else []
),
),
InstanceSplitter(
target_field=FieldName.TARGET,
@@ -151,7 +165,8 @@ class DeepAREstimator(PTSEstimator):
FieldName.OBSERVED_VALUES,
],
),
])
]
)
def create_training_network(self, device: torch.device) -> DeepARTrainingNetwork:
return DeepARTrainingNetwork(
@@ -168,10 +183,14 @@ class DeepAREstimator(PTSEstimator):
embedding_dimension=self.embedding_dimension,
lags_seq=self.lags_seq,
scaling=self.scaling,
dtype=self.dtype).to(device)
dtype=self.dtype,
).to(device)
def create_predictor(
self, transformation: Transformation, trained_network: nn.Module, device: torch.device
self,
transformation: Transformation,
trained_network: nn.Module,
device: torch.device,
) -> Predictor:
prediction_network = DeepARPredictionNetwork(
num_parallel_samples=self.num_parallel_samples,
@@ -188,7 +207,8 @@ class DeepAREstimator(PTSEstimator):
embedding_dimension=self.embedding_dimension,
lags_seq=self.lags_seq,
scaling=self.scaling,
dtype=self.dtype).to(device)
dtype=self.dtype,
).to(device)
copy_parameters(trained_network, prediction_network)
@@ -199,5 +219,5 @@ class DeepAREstimator(PTSEstimator):
freq=self.freq,
prediction_length=self.prediction_length,
device=device,
dtype=self.dtype
)
dtype=self.dtype,
)
+114 -83
View File
@@ -8,29 +8,31 @@ import numpy as np
from pts.modules import DistributionOutput, MeanScaler, NOPScaler, FeatureEmbedder
def prod(xs):
p = 1
for x in xs:
p *= x
return p
class DeepARNetwork(nn.Module):
def __init__(
self,
input_size: int,
num_layers: int,
num_cells: int,
cell_type: str,
history_length: int,
context_length: int,
prediction_length: int,
distr_output: DistributionOutput,
dropout_rate: float,
cardinality: List[int],
embedding_dimension: List[int],
lags_seq: List[int],
scaling: bool = True,
dtype: np.dtype = np.float32,
self,
input_size: int,
num_layers: int,
num_cells: int,
cell_type: str,
history_length: int,
context_length: int,
prediction_length: int,
distr_output: DistributionOutput,
dropout_rate: float,
cardinality: List[int],
embedding_dimension: List[int],
lags_seq: List[int],
scaling: bool = True,
dtype: np.dtype = np.float32,
) -> None:
super().__init__()
self.input_size = input_size
@@ -51,18 +53,21 @@ class DeepARNetwork(nn.Module):
self.distr_output = distr_output
rnn = {"LSTM": nn.LSTM, "GRU": nn.GRU}[self.cell_type]
self.rnn = rnn(input_size=input_size,
hidden_size=num_cells,
num_layers=num_layers,
dropout=dropout_rate,
batch_first=True)
self.rnn = rnn(
input_size=input_size,
hidden_size=num_cells,
num_layers=num_layers,
dropout=dropout_rate,
batch_first=True,
)
self.target_shape = distr_output.event_shape
self.proj_distr_args = distr_output.get_args_proj(num_cells)
self.embedder = FeatureEmbedder(cardinalities=cardinality,
embedding_dims=embedding_dimension)
self.embedder = FeatureEmbedder(
cardinalities=cardinality, embedding_dims=embedding_dimension
)
if scaling:
self.scaler = MeanScaler(keepdim=True)
@@ -71,10 +76,11 @@ class DeepARNetwork(nn.Module):
@staticmethod
def get_lagged_subsequences(
sequence: torch.Tensor,
sequence_length: int,
indices: List[int],
subsequences_length: int = 1) -> torch.Tensor:
sequence: torch.Tensor,
sequence_length: int,
indices: List[int],
subsequences_length: int = 1,
) -> torch.Tensor:
"""
Returns lagged subsequences of a given sequence.
Parameters
@@ -97,7 +103,8 @@ class DeepARNetwork(nn.Module):
"""
assert max(indices) + subsequences_length <= sequence_length, (
f"lags cannot go further than history length, found lag {max(indices)} "
f"while history length is only {sequence_length}")
f"while history length is only {sequence_length}"
)
assert all(lag_index >= 0 for lag_index in indices)
lagged_values = []
@@ -108,9 +115,9 @@ class DeepARNetwork(nn.Module):
return torch.stack(lagged_values, dim=-1)
@staticmethod
def weighted_average(tensor: torch.Tensor,
weights: Optional[torch.Tensor] = None,
dim=None):
def weighted_average(
tensor: torch.Tensor, weights: Optional[torch.Tensor] = None, dim=None
):
if weights is not None:
weighted_tensor = tensor * weights
if dim is not None:
@@ -121,7 +128,7 @@ class DeepARNetwork(nn.Module):
sum_weighted_tensor = weighted_tensor.sum()
sum_weights = torch.max(torch.ones_like(sum_weights), sum_weights)
return sum_weighted_tensor / sum_weights
else:
if dim is not None:
@@ -130,45 +137,51 @@ class DeepARNetwork(nn.Module):
return tensor.mean()
def unroll_encoder(
self,
feat_static_cat: torch.Tensor, # (batch_size, num_features)
feat_static_real: torch.Tensor, # (batch_size, num_features)
past_time_feat: torch.Tensor, # (batch_size, history_length, num_features)
past_target: torch.Tensor, # (batch_size, history_length, *target_shape)
past_observed_values: torch.Tensor, # (batch_size, history_length, *target_shape)
future_time_feat: Optional[
torch.Tensor]=None, # (batch_size, prediction_length, num_features)
future_target: Optional[
torch.Tensor]=None, # (batch_size, prediction_length, *target_shape)
self,
feat_static_cat: torch.Tensor, # (batch_size, num_features)
feat_static_real: torch.Tensor, # (batch_size, num_features)
past_time_feat: torch.Tensor, # (batch_size, history_length, num_features)
past_target: torch.Tensor, # (batch_size, history_length, *target_shape)
past_observed_values: torch.Tensor, # (batch_size, history_length, *target_shape)
future_time_feat: Optional[
torch.Tensor
] = None, # (batch_size, prediction_length, num_features)
future_target: Optional[
torch.Tensor
] = None, # (batch_size, prediction_length, *target_shape)
) -> Tuple[torch.Tensor, Union[torch.Tensor, List], torch.Tensor, torch.Tensor]:
if future_time_feat is None or future_target is None:
time_feat = past_time_feat[:,self.history_length - self.context_length:,...]
time_feat = past_time_feat[
:, self.history_length - self.context_length :, ...
]
sequence = past_target
sequence_length = self.history_length
subsequences_length = self.context_length
else:
time_feat = torch.cat(
(
past_time_feat[:,self.history_length - self.context_length:,...],
past_time_feat[:, self.history_length - self.context_length :, ...],
future_time_feat,
),
dim=1)
dim=1,
)
sequence = torch.cat((past_target, future_target), dim=1)
sequence_length = self.history_length + self.prediction_length
subsequences_length = self.context_length + self.prediction_length
lags = self.get_lagged_subsequences(
sequence=sequence,
sequence_length=sequence_length,
indices=self.lags_seq,
subsequences_length=subsequences_length)
subsequences_length=subsequences_length,
)
# scale is computed on the context length last units of the past target
# scale shape is (batch_size, 1, *target_shape)
_, scale = self.scaler(
past_target[:,self.context_length:,...],
past_observed_values[:,self.context_length:,...]
past_target[:, self.context_length :, ...],
past_observed_values[:, self.context_length :, ...],
)
# (batch_size, num_features)
@@ -177,23 +190,28 @@ class DeepARNetwork(nn.Module):
# in addition to embedding features, use the log scale as it can help
# prediction too
# (batch_size, num_features + prod(target_shape))
static_feat = torch.cat((
embedded_cat,
feat_static_real,
scale.log()
if len(self.target_shape) == 0
else scale.squeeze(1).log()
), dim=1)
static_feat = torch.cat(
(
embedded_cat,
feat_static_real,
scale.log() if len(self.target_shape) == 0 else scale.squeeze(1).log(),
),
dim=1,
)
# (batch_size, subsequences_length, num_features + 1)
repeated_static_feat = static_feat.unsqueeze(1).expand(-1, subsequences_length, -1)
repeated_static_feat = static_feat.unsqueeze(1).expand(
-1, subsequences_length, -1
)
# (batch_size, sub_seq_len, *target_shape, num_lags)
lags_scaled = lags / scale.unsqueeze(-1)
# from (batch_size, sub_seq_len, *target_shape, num_lags)
# to (batch_size, sub_seq_len, prod(target_shape) * num_lags)
input_lags = lags_scaled.reshape((-1, subsequences_length, len(self.lags_seq) * prod(self.target_shape)))
input_lags = lags_scaled.reshape(
(-1, subsequences_length, len(self.lags_seq) * prod(self.target_shape))
)
# (batch_size, sub_seq_len, input_dim)
inputs = torch.cat((input_lags, time_feat, repeated_static_feat), dim=-1)
@@ -207,6 +225,7 @@ class DeepARNetwork(nn.Module):
# static_feat: (batch_size, num_features + prod(target_shape))
return outputs, state, scale, static_feat
class DeepARTrainingNetwork(DeepARNetwork):
def distribution(
self,
@@ -217,7 +236,7 @@ class DeepARTrainingNetwork(DeepARNetwork):
past_observed_values: torch.Tensor,
future_time_feat: torch.Tensor,
future_target: torch.Tensor,
future_observed_values: torch.Tensor
future_observed_values: torch.Tensor,
) -> Distribution:
rnn_outputs, _, scale, _ = self.unroll_encoder(
feat_static_cat=feat_static_cat,
@@ -233,7 +252,8 @@ class DeepARTrainingNetwork(DeepARNetwork):
return self.distr_output.distribution(distr_args, scale=scale)
def forward(self,
def forward(
self,
feat_static_cat: torch.Tensor,
feat_static_real: torch.Tensor,
past_time_feat: torch.Tensor,
@@ -241,7 +261,7 @@ class DeepARTrainingNetwork(DeepARNetwork):
past_observed_values: torch.Tensor,
future_time_feat: torch.Tensor,
future_target: torch.Tensor,
future_observed_values: torch.Tensor
future_observed_values: torch.Tensor,
) -> torch.Tensor:
distr = self.distribution(
feat_static_cat=feat_static_cat,
@@ -256,19 +276,27 @@ class DeepARTrainingNetwork(DeepARNetwork):
# put together target sequence
# (batch_size, seq_len, *target_shape)
target = torch.cat((
past_target[:,self.history_length - self.context_length:,...],
future_target
), dim=1)
target = torch.cat(
(
past_target[:, self.history_length - self.context_length :, ...],
future_target,
),
dim=1,
)
# (batch_size, seq_len)
loss = -distr.log_prob(target)
# (batch_size, seq_len, *target_shape)
observed_values = torch.cat((
past_observed_values[:,self.history_length - self.context_length:,...],
future_observed_values
), dim=1)
observed_values = torch.cat(
(
past_observed_values[
:, self.history_length - self.context_length :, ...
],
future_observed_values,
),
dim=1,
)
# mask the loss at one time step iff one or more observations is missing in the target dimensions
# (batch_size, seq_len)
@@ -282,6 +310,7 @@ class DeepARTrainingNetwork(DeepARNetwork):
return weighted_loss, loss
class DeepARPredictionNetwork(DeepARNetwork):
def __init__(self, num_parallel_samples: int = 100, **kwargs) -> None:
super().__init__(**kwargs)
@@ -331,18 +360,20 @@ class DeepARPredictionNetwork(DeepARNetwork):
repeats=self.num_parallel_samples, dim=0
)
repeated_static_feat = static_feat.repeat_interleave(
repeats=self.num_parallel_samples, dim=0).unsqueeze(1)
repeats=self.num_parallel_samples, dim=0
).unsqueeze(1)
repeated_scale = scale.repeat_interleave(
repeats=self.num_parallel_samples, dim=0
)
if self.cell_type == 'LSTM':
if self.cell_type == "LSTM":
repeated_states = [
s.repeat_interleave(repeats=self.num_parallel_samples, dim=1)
for s in begin_states
]
else:
repeated_states = begin_states.repeat_interleave(
repeats=self.num_parallel_samples, dim=1)
repeats=self.num_parallel_samples, dim=1
)
future_samples = []
@@ -361,14 +392,14 @@ class DeepARPredictionNetwork(DeepARNetwork):
# from (batch_size * num_samples, 1, *target_shape, num_lags)
# to (batch_size * num_samples, 1, prod(target_shape) * num_lags)
input_lags = lags_scaled.reshape((-1, 1, prod(self.target_shape) * len(self.lags_seq)))
input_lags = lags_scaled.reshape(
(-1, 1, prod(self.target_shape) * len(self.lags_seq))
)
# (batch_size * num_samples, 1, prod(target_shape) * num_lags + num_time_features + num_static_features)
decoder_input = torch.cat((
input_lags,
repeated_time_feat[:,k:k+1,:],
repeated_static_feat),
dim=-1
decoder_input = torch.cat(
(input_lags, repeated_time_feat[:, k : k + 1, :], repeated_static_feat),
dim=-1,
)
# output shape: (batch_size * num_samples, 1, num_cells)
@@ -378,9 +409,7 @@ class DeepARPredictionNetwork(DeepARNetwork):
distr_args = self.proj_distr_args(rnn_outputs)
# compute likelihood of target given the predicted parameters
distr = self.distr_output.distribution(
distr_args, scale=repeated_scale
)
distr = self.distr_output.distribution(distr_args, scale=repeated_scale)
# (batch_size * num_samples, 1, *target_shape)
new_samples = distr.sample()
@@ -393,10 +422,12 @@ class DeepARPredictionNetwork(DeepARNetwork):
samples = torch.cat(future_samples, dim=1)
# (batch_size, num_samples, prediction_length, *target_shape)
return samples.reshape((
return samples.reshape(
(
(-1, self.num_parallel_samples)
+ (self.prediction_length,)
+ self.target_shape)
+ self.target_shape
)
)
# noinspection PyMethodOverriding,PyPep8Naming
+8 -2
View File
@@ -81,7 +81,11 @@ class PTSEstimator(Estimator):
@abstractmethod
def create_predictor(
self, transformation: Transformation, trained_network: nn.Module, device: torch.device) -> Predictor:
self,
transformation: Transformation,
trained_network: nn.Module,
device: torch.device,
) -> Predictor:
"""
Create and return a predictor object.
@@ -118,7 +122,9 @@ class PTSEstimator(Estimator):
return TrainOutput(
transformation=transformation,
trained_net=trained_net,
predictor=self.create_predictor(transformation, trained_net, self.trainer.device),
predictor=self.create_predictor(
transformation, trained_net, self.trainer.device
),
)
def train(self, training_data: Dataset) -> Predictor:
+50 -44
View File
@@ -103,13 +103,12 @@ class Forecast(ABC):
assert 0.0 <= c <= 100.0
ps = [50.0] + [
50.0 + f * c / 2.0 for c in prediction_intervals
for f in [-1.0, +1.0]
50.0 + f * c / 2.0 for c in prediction_intervals for f in [-1.0, +1.0]
]
percentiles_sorted = sorted(set(ps))
def alpha_for_percentile(p):
return (p / 100.0)**0.3
return (p / 100.0) ** 0.3
ps_data = [self.quantile(p / 100.0) for p in percentiles_sorted]
i_p50 = len(percentiles_sorted) // 2
@@ -121,11 +120,7 @@ class Forecast(ABC):
if show_mean:
mean_data = np.mean(self._sorted_samples, axis=0)
pd.Series(data=mean_data, index=self.index).plot(
color=color,
ls=":",
label=f"{label_prefix}mean",
*args,
**kwargs,
color=color, ls=":", label=f"{label_prefix}mean", *args, **kwargs,
)
for i in range(len(percentiles_sorted) // 2):
@@ -157,9 +152,9 @@ class Forecast(ABC):
@property
def index(self) -> pd.DatetimeIndex:
if self._index is None:
self._index = pd.date_range(self.start_date,
periods=self.prediction_length,
freq=self.freq)
self._index = pd.date_range(
self.start_date, periods=self.prediction_length, freq=self.freq
)
return self._index
def as_json_dict(self, config: "Config") -> dict:
@@ -199,6 +194,7 @@ class SampleForecast(Forecast):
additional information that the forecaster may provide e.g. estimated
parameters, number of iterations ran etc.
"""
def __init__(
self,
samples: Union[torch.Tensor, np.ndarray],
@@ -208,14 +204,16 @@ class SampleForecast(Forecast):
info: Optional[Dict] = None,
):
assert isinstance(
samples,
(np.ndarray, torch.Tensor
)), "samples should be either a numpy array or an torch tensor"
samples, (np.ndarray, torch.Tensor)
), "samples should be either a numpy array or an torch tensor"
assert (
len(np.shape(samples)) == 2 or len(np.shape(samples)) == 3
), "samples should be a 2-dimensional or 3-dimensional array. Dimensions found: {}".format(
len(np.shape(samples)))
self.samples = samples if (isinstance(samples, np.ndarray)) else samples.cpu().numpy()
len(np.shape(samples))
)
self.samples = (
samples if (isinstance(samples, np.ndarray)) else samples.cpu().numpy()
)
self._sorted_samples_value = None
self._mean = None
self._dim = None
@@ -223,8 +221,8 @@ class SampleForecast(Forecast):
self.info = info
assert isinstance(
start_date,
pd.Timestamp), "start_date should be a pandas Timestamp object"
start_date, pd.Timestamp
), "start_date should be a pandas Timestamp object"
self.start_date = start_date
assert isinstance(freq, str), "freq should be a string"
@@ -287,7 +285,8 @@ class SampleForecast(Forecast):
target_dim = self.samples.shape[2]
assert dim < target_dim, (
f"must set 0 <= dim < target_dim, but got dim={dim},"
f" target_dim={target_dim}")
f" target_dim={target_dim}"
)
samples = self.samples[:, :, dim]
return SampleForecast(
@@ -347,13 +346,15 @@ class SampleForecast(Forecast):
return result
def __repr__(self):
return ", ".join([
f"SampleForecast({self.samples!r})",
f"{self.start_date!r}",
f"{self.freq!r}",
f"item_id={self.item_id!r}",
f"info={self.info!r})",
])
return ", ".join(
[
f"SampleForecast({self.samples!r})",
f"{self.start_date!r}",
f"{self.freq!r}",
f"item_id={self.item_id!r}",
f"info={self.info!r})",
]
)
class QuantileForecast(Forecast):
@@ -376,6 +377,7 @@ class QuantileForecast(Forecast):
additional information that the forecaster may provide e.g. estimated
parameters, number of iterations ran etc.
"""
def __init__(
self,
forecast_arrays: np.ndarray,
@@ -401,11 +403,11 @@ class QuantileForecast(Forecast):
shape = self.forecast_array.shape
assert shape[0] == len(self.forecast_keys), (
f"The forecast_array (shape={shape} should have the same "
f"length as the forecast_keys (len={len(self.forecast_keys)}).")
f"length as the forecast_keys (len={len(self.forecast_keys)})."
)
self.prediction_length = shape[-1]
self._forecast_dict = {
k: self.forecast_array[i]
for i, k in enumerate(self.forecast_keys)
k: self.forecast_array[i] for i, k in enumerate(self.forecast_keys)
}
self._nan_out = np.array([np.nan] * self.prediction_length)
@@ -429,22 +431,26 @@ class QuantileForecast(Forecast):
if self._dim is not None:
return self._dim
else:
if (len(self.forecast_array.shape) == 2
): # 1D target. shape: (num_samples, prediction_length)
if (
len(self.forecast_array.shape) == 2
): # 1D target. shape: (num_samples, prediction_length)
return 1
else:
return self.forecast_array.shape[
1] # 2D target. shape: (num_samples, target_dim, prediction_length)
1
] # 2D target. shape: (num_samples, target_dim, prediction_length)
def __repr__(self):
return ", ".join([
f"QuantileForecast({self.forecast_array!r})",
f"start_date={self.start_date!r}",
f"freq={self.freq!r}",
f"forecast_keys={self.forecast_keys!r}",
f"item_id={self.item_id!r}",
f"info={self.info!r})",
])
return ", ".join(
[
f"QuantileForecast({self.forecast_array!r})",
f"start_date={self.start_date!r}",
f"freq={self.freq!r}",
f"forecast_keys={self.forecast_keys!r}",
f"item_id={self.item_id!r}",
f"info={self.info!r})",
]
)
class DistributionForecast(Forecast):
@@ -472,6 +478,7 @@ class DistributionForecast(Forecast):
additional information that the forecaster may provide e.g. estimated
parameters, number of iterations ran etc.
"""
def __init__(
self,
distribution: Distribution,
@@ -481,15 +488,14 @@ class DistributionForecast(Forecast):
info: Optional[Dict] = None,
):
self.distribution = distribution
self.shape = (self.distribution.batch_shape +
self.distribution.event_shape)
self.shape = self.distribution.batch_shape + self.distribution.event_shape
self.prediction_length = self.shape[0]
self.item_id = item_id
self.info = info
assert isinstance(
start_date,
pd.Timestamp), "start_date should be a pandas Timestamp object"
start_date, pd.Timestamp
), "start_date should be a pandas Timestamp object"
self.start_date = start_date
assert isinstance(freq, str), "freq should be a string"
+49 -24
View File
@@ -44,11 +44,18 @@ class ForecastGenerator(ABC):
"""
Classes used to bring the output of a network into a class.
"""
@abstractmethod
def __call__(self, inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module, input_names: List[str], freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int], **kwargs) -> Iterator[Forecast]:
def __call__(
self,
inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module,
input_names: List[str],
freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int],
**kwargs
) -> Iterator[Forecast]:
pass
@@ -56,11 +63,16 @@ class DistributionForecastGenerator(ForecastGenerator):
def __init__(self, distr_output: DistributionOutput) -> None:
self.distr_output = distr_output
def __call__(self, inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module, input_names: List[str], freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int],
**kwargs) -> Iterator[DistributionForecast]:
def __call__(
self,
inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module,
input_names: List[str],
freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int],
**kwargs
) -> Iterator[DistributionForecast]:
for batch in inference_data_loader:
inputs = [batch[k] for k in input_names]
outputs = prediction_net(*inputs)
@@ -68,8 +80,7 @@ class DistributionForecastGenerator(ForecastGenerator):
outputs = output_transform(batch, outputs)
distributions = [
self.distr_output.distribution(*u)
for u in _extract_instances(outputs)
self.distr_output.distribution(*u) for u in _extract_instances(outputs)
]
i = -1
@@ -79,7 +90,8 @@ class DistributionForecastGenerator(ForecastGenerator):
start_date=batch["forecast_start"][i],
freq=freq,
item_id=batch[FieldName.ITEM_ID][i]
if FieldName.ITEM_ID in batch else None,
if FieldName.ITEM_ID in batch
else None,
info=batch["info"][i] if "info" in batch else None,
)
assert i + 1 == len(batch["forecast_start"])
@@ -89,10 +101,16 @@ class QuantileForecastGenerator(ForecastGenerator):
def __init__(self, quantiles: List[str]) -> None:
self.quantiles = quantiles
def __call__(self, inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module, input_names: List[str], freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int], **kwargs) -> Iterator[Forecast]:
def __call__(
self,
inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module,
input_names: List[str],
freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int],
**kwargs
) -> Iterator[Forecast]:
for batch in inference_data_loader:
inputs = [batch[k] for k in input_names]
outputs = prediction_net(*inputs).cpu().numpy()
@@ -106,7 +124,8 @@ class QuantileForecastGenerator(ForecastGenerator):
start_date=batch["forecast_start"][i],
freq=freq,
item_id=batch[FieldName.ITEM_ID][i]
if FieldName.ITEM_ID in batch else None,
if FieldName.ITEM_ID in batch
else None,
info=batch["info"][i] if "info" in batch else None,
forecast_keys=self.quantiles,
)
@@ -114,10 +133,16 @@ class QuantileForecastGenerator(ForecastGenerator):
class SampleForecastGenerator(ForecastGenerator):
def __call__(self, inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module, input_names: List[str], freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int], **kwargs) -> Iterator[Forecast]:
def __call__(
self,
inference_data_loader: InferenceDataLoader,
prediction_net: nn.Module,
input_names: List[str],
freq: str,
output_transform: Optional[OutputTransform],
num_samples: Optional[int],
**kwargs
) -> Iterator[Forecast]:
for batch in inference_data_loader:
inputs = [batch[k] for k in input_names]
outputs = prediction_net(*inputs).cpu().numpy()
@@ -133,8 +158,7 @@ class SampleForecastGenerator(ForecastGenerator):
collected_samples.append(outputs)
num_collected_samples += outputs[0].shape[0]
outputs = [
np.concatenate(s)[:num_samples]
for s in zip(*collected_samples)
np.concatenate(s)[:num_samples] for s in zip(*collected_samples)
]
assert len(outputs[0]) == num_samples
i = -1
@@ -144,7 +168,8 @@ class SampleForecastGenerator(ForecastGenerator):
start_date=batch["forecast_start"][i],
freq=freq,
item_id=batch[FieldName.ITEM_ID][i]
if FieldName.ITEM_ID in batch else None,
if FieldName.ITEM_ID in batch
else None,
info=batch["info"][i] if "info" in batch else None,
)
assert i + 1 == len(batch["forecast_start"])
+3 -3
View File
@@ -49,9 +49,9 @@ class PTSPredictor(Predictor):
self.device = device
self.dtype = dtype
def predict(self,
dataset: Dataset,
num_samples: Optional[int] = None) -> Iterator[Forecast]:
def predict(
self, dataset: Dataset, num_samples: Optional[int] = None
) -> Iterator[Forecast]:
inference_data_loader = InferenceDataLoader(
dataset,
self.input_transform,
+9 -2
View File
@@ -1,4 +1,11 @@
from .distribution_output import ArgProj, Output, DistributionOutput, StudentTOutput, BetaOutput, NegativeBinomialOutput
from .distribution_output import (
ArgProj,
Output,
DistributionOutput,
StudentTOutput,
BetaOutput,
NegativeBinomialOutput,
)
from .lambda_layer import LambdaLayer
from .feature import FeatureEmbedder, FeatureAssembler
from .scaler import MeanScaler, NOPScaler
from .scaler import MeanScaler, NOPScaler
+2 -2
View File
@@ -91,7 +91,7 @@ class BetaOutput(DistributionOutput):
concentration1 = F.softplus(concentration1) + 1e-8
concentration0 = F.softplus(concentration0) + 1e-8
return concentration1.squeeze(-1), concentration0.squeeze(-1)
@property
def event_shape(self) -> Tuple:
return ()
@@ -120,7 +120,7 @@ class NegativeBinomialOutput(DistributionOutput):
p = mu * alpha / (1.0 + mu * alpha)
return NegativeBinomial(total_count=n, probs=p)
@property
def event_shape(self) -> Tuple:
return ()
+33 -34
View File
@@ -1,15 +1,11 @@
from typing import Callable, List, Optional
from typing import Callable, List, Optional
import torch
import torch.nn as nn
class FeatureEmbedder(nn.Module):
def __init__(
self,
cardinalities: List[int],
embedding_dims: List[int],
) -> None:
def __init__(self, cardinalities: List[int], embedding_dims: List[int],) -> None:
super().__init__()
self.__num_features = len(cardinalities)
@@ -18,46 +14,49 @@ class FeatureEmbedder(nn.Module):
embedding = nn.Embedding(c, d)
return embedding
self.__embedders = nn.ModuleList([
create_embedding(c, d)
for c, d in zip(cardinalities, embedding_dims)
])
self.__embedders = nn.ModuleList(
[create_embedding(c, d) for c, d in zip(cardinalities, embedding_dims)]
)
def forward(self, features: torch.Tensor) -> torch.Tensor:
if self.__num_features > 1:
# we slice the last dimension, giving an array of length
# self.__num_features with shape (N,T) or (N)
cat_feature_slices = torch.chunk(features,
self.__num_features,
dim=-1)
cat_feature_slices = torch.chunk(features, self.__num_features, dim=-1)
else:
cat_feature_slices = [features]
return torch.cat([
embed(cat_feature_slice.squeeze(-1)) for embed, cat_feature_slice
in zip(self.__embedders, cat_feature_slices)
], dim=-1)
return torch.cat(
[
embed(cat_feature_slice.squeeze(-1))
for embed, cat_feature_slice in zip(
self.__embedders, cat_feature_slices
)
],
dim=-1,
)
class FeatureAssembler(nn.Module):
def __init__(self,
T: int,
embed_static: Optional[FeatureEmbedder] = None,
embed_dynamic: Optional[FeatureEmbedder] = None) -> None:
def __init__(
self,
T: int,
embed_static: Optional[FeatureEmbedder] = None,
embed_dynamic: Optional[FeatureEmbedder] = None,
) -> None:
super().__init__()
self.T = T
self.embeddings = nn.ModuleDict({
'embed_static': embed_static,
'embed_dynamic': embed_dynamic
})
self.embeddings = nn.ModuleDict(
{"embed_static": embed_static, "embed_dynamic": embed_dynamic}
)
def forward(
self,
feat_static_cat: torch.Tensor,
feat_static_real: torch.Tensor,
feat_dynamic_cat: torch.Tensor,
feat_dynamic_real: torch.Tensor,
self,
feat_static_cat: torch.Tensor,
feat_static_real: torch.Tensor,
feat_dynamic_cat: torch.Tensor,
feat_dynamic_real: torch.Tensor,
) -> torch.Tensor:
processed_features = [
self.process_static_cat(feat_static_cat),
@@ -69,15 +68,15 @@ class FeatureAssembler(nn.Module):
return torch.cat(processed_features, dim=-1)
def process_static_cat(self, feature: torch.Tensor) -> torch.Tensor:
if self.embeddings['embed_static'] is not None:
feature = self.embeddings['embed_static'](feature)
if self.embeddings["embed_static"] is not None:
feature = self.embeddings["embed_static"](feature)
return feature.unsqueeze(1).expand(-1, self.T, -1).float()
def process_dynamic_cat(self, feature: torch.Tensor) -> torch.Tensor:
if self.embeddings['embed_dynamic'] is None:
if self.embeddings["embed_dynamic"] is None:
return feature.float()
else:
return self.embeddings['embed_dynamic'](feature)
return self.embeddings["embed_dynamic"](feature)
def process_static_real(self, feature: torch.Tensor) -> torch.Tensor:
return feature.unsqueeze(1).expand(-1, self.T, -1)
+1 -2
View File
@@ -64,7 +64,7 @@ class MeanScaler(Scaler):
def __init__(self, minimum_scale: float = 1e-10, *args, **kwargs):
super().__init__(*args, **kwargs)
self.register_buffer('minimum_scale', torch.tensor(minimum_scale))
self.register_buffer("minimum_scale", torch.tensor(minimum_scale))
def compute_scale(
self, data: torch.Tensor, observed_indicator: torch.Tensor
@@ -106,4 +106,3 @@ class NOPScaler(Scaler):
self, data: torch.Tensor, observed_indicator: torch.Tensor
) -> torch.Tensor:
return torch.ones_like(data).mean(dim=1)
+3 -3
View File
@@ -46,12 +46,12 @@ class Trainer:
loss = output[0]
else:
loss = output
avg_epoch_loss += loss.item()
it.set_postfix(
ordered_dict={
"avg_epoch_loss": avg_epoch_loss/batch_no,
'epoch': epoch_no,
"avg_epoch_loss": avg_epoch_loss / batch_no,
"epoch": epoch_no,
},
refresh=False,
)