mirror of
https://github.com/wassname/pytorch-ts.git
synced 2026-06-27 17:49:41 +08:00
268 lines
8.6 KiB
Python
268 lines
8.6 KiB
Python
from typing import List
|
|
|
|
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.distributions import Normal, Uniform, Bernoulli
|
|
from torch.nn.utils import clip_grad_norm_
|
|
from torch.optim import SGD
|
|
from torch.utils.data import TensorDataset, DataLoader
|
|
|
|
from gluonts.dataset.repository.datasets import get_dataset
|
|
from gluonts.evaluation import Evaluator
|
|
from gluonts.evaluation.backtest import make_evaluation_predictions
|
|
from gluonts.torch.modules.distribution_output import DistributionOutput
|
|
from pts import Trainer
|
|
from pts.model.deepar import DeepAREstimator
|
|
from pts.modules import ImplicitQuantileOutput
|
|
|
|
NUM_SAMPLES = 2000
|
|
BATCH_SIZE = 32
|
|
TOL = 0.3
|
|
START_TOL_MULTIPLE = 1
|
|
|
|
|
|
def inv_softplus(y: np.ndarray) -> np.ndarray:
|
|
return np.log(np.exp(y) - 1)
|
|
|
|
|
|
def learn_distribution(
|
|
distr_output: DistributionOutput,
|
|
samples: torch.Tensor,
|
|
init_biases: List[np.ndarray] = None,
|
|
num_epochs: int = 5,
|
|
learning_rate: float = 1e-2,
|
|
):
|
|
arg_proj = distr_output.get_args_proj(in_features=1)
|
|
|
|
if init_biases is not None:
|
|
for param, bias in zip(arg_proj.proj, init_biases):
|
|
nn.init.constant_(param.bias, bias)
|
|
|
|
dummy_data = torch.ones((len(samples), 1, 1))
|
|
|
|
dataset = TensorDataset(dummy_data, samples)
|
|
train_data = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
|
|
|
|
optimizer = SGD(arg_proj.parameters(), lr=learning_rate)
|
|
|
|
for e in range(num_epochs):
|
|
cumulative_loss = 0
|
|
num_batches = 0
|
|
|
|
for i, (data, sample_label) in enumerate(train_data):
|
|
optimizer.zero_grad()
|
|
distr_args = arg_proj(data)
|
|
distr = distr_output.distribution(distr_args)
|
|
loss = -distr.log_prob(sample_label).mean()
|
|
loss.backward()
|
|
#clip_grad_norm_(arg_proj.parameters(), 10.0)
|
|
optimizer.step()
|
|
|
|
num_batches += 1
|
|
cumulative_loss += loss.item()
|
|
print("Epoch %s, loss: %s" % (e, cumulative_loss / num_batches))
|
|
|
|
sampling_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
|
|
i, (data, sample_label) = next(enumerate(sampling_dataloader))
|
|
distr_args = arg_proj(data)
|
|
distr = distr_output.distribution(distr_args)
|
|
samples = distr.sample((NUM_SAMPLES,))
|
|
|
|
with torch.no_grad():
|
|
percentile_90 = distr.quantile_function(
|
|
torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.9
|
|
)
|
|
percentile_10 = distr.quantile_function(
|
|
torch.ones((1, 1, 1)), torch.ones((1, 1)) * 0.1
|
|
)
|
|
|
|
return samples.mean(), samples.std(), percentile_10.squeeze(), percentile_90.squeeze()
|
|
|
|
|
|
def test_independent_implicit_quantile() -> None:
|
|
num_samples = NUM_SAMPLES
|
|
|
|
# # Normal distrib
|
|
distr_mean = torch.Tensor([10.0])
|
|
distr_std = torch.Tensor([4.0])
|
|
distr_pp10 = distr_mean - 1.282 * distr_std
|
|
distr_pp90 = distr_mean + 1.282 * distr_std
|
|
distr = Normal(loc=distr_mean, scale=distr_std)
|
|
|
|
samples = distr.sample((num_samples,))
|
|
learned_mean, learned_std, learned_pp10, learned_pp90 = learn_distribution(
|
|
ImplicitQuantileOutput(output_domain="Real"),
|
|
samples=samples,
|
|
num_epochs=50,
|
|
learning_rate=1e-2,
|
|
)
|
|
|
|
torch.testing.assert_allclose(
|
|
learned_mean, distr_mean.squeeze(), rtol=0.1, atol=0.1 * 10
|
|
)
|
|
torch.testing.assert_allclose(
|
|
learned_std, distr_std.squeeze(), rtol=0.1, atol=0.1 * 4
|
|
)
|
|
torch.testing.assert_allclose(
|
|
learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 4
|
|
)
|
|
torch.testing.assert_allclose(
|
|
learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=0.1 * 4
|
|
)
|
|
|
|
# Uniform distrib
|
|
a = torch.Tensor([0.0])
|
|
b = torch.Tensor([20.0])
|
|
distr_mean = 0.5 * (a + b)
|
|
distr_std = (1.0 / 12.0 * (b - a) ** 2) ** 0.5
|
|
distr_pp10 = 0.1 * (a + b)
|
|
distr_pp90 = 0.9 * (a + b)
|
|
distr = Uniform(low=a, high=b)
|
|
|
|
samples = distr.sample((num_samples,))
|
|
learned_mean, learned_std, learned_pp10, learned_pp90 = learn_distribution(
|
|
ImplicitQuantileOutput(output_domain="Positive"),
|
|
samples=samples,
|
|
num_epochs=50,
|
|
learning_rate=1e-2,
|
|
)
|
|
|
|
torch.testing.assert_allclose(
|
|
learned_mean, distr_mean.squeeze(), atol=1.0, rtol=0.1
|
|
)
|
|
torch.testing.assert_allclose(learned_std, distr_std.squeeze(), atol=0.5, rtol=0.1)
|
|
torch.testing.assert_allclose(
|
|
learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 18
|
|
)
|
|
torch.testing.assert_allclose(
|
|
learned_pp10, distr_pp10.squeeze(), rtol=0.2, atol=0.2 * 2
|
|
)
|
|
|
|
# Bernoulli distrib
|
|
distr_mean = torch.Tensor([0.2])
|
|
distr_std = distr_mean * (1 - distr_mean)
|
|
distr_pp10 = torch.Tensor([0.0])
|
|
distr_pp90 = torch.Tensor([1.0])
|
|
distr = Bernoulli(probs=distr_mean)
|
|
|
|
samples = distr.sample((num_samples,))
|
|
learned_mean, learned_std, learned_pp10, learned_pp90 = learn_distribution(
|
|
ImplicitQuantileOutput(output_domain="Positive"),
|
|
samples=samples,
|
|
num_epochs=50,
|
|
learning_rate=1e-2,
|
|
)
|
|
|
|
torch.testing.assert_allclose(
|
|
learned_mean, distr_mean.squeeze(), atol=1.0, rtol=0.1
|
|
)
|
|
torch.testing.assert_allclose(learned_std, distr_std.squeeze(), atol=0.5, rtol=0.1)
|
|
torch.testing.assert_allclose(
|
|
learned_pp90, distr_pp90.squeeze(), rtol=0.1, atol=0.1 * 18
|
|
)
|
|
torch.testing.assert_allclose(
|
|
learned_pp10, distr_pp10.squeeze(), rtol=0.1, atol=0.1 * 2
|
|
)
|
|
|
|
|
|
def test_training_with_implicit_quantile_output():
|
|
dataset = get_dataset("constant")
|
|
metadata = dataset.metadata
|
|
|
|
deepar_estimator = DeepAREstimator(
|
|
distr_output=ImplicitQuantileOutput(output_domain="Real"),
|
|
freq=metadata.freq,
|
|
prediction_length=metadata.prediction_length,
|
|
trainer=Trainer(
|
|
device="cpu",
|
|
epochs=5,
|
|
learning_rate=1e-3,
|
|
num_batches_per_epoch=3,
|
|
batch_size=256,
|
|
),
|
|
input_size=15,
|
|
)
|
|
deepar_predictor = deepar_estimator.train(dataset.train, num_workers=1)
|
|
forecast_it, ts_it = make_evaluation_predictions(
|
|
dataset=dataset.test, # test dataset
|
|
predictor=deepar_predictor, # predictor
|
|
num_samples=100, # number of sample paths we want for evaluation
|
|
)
|
|
forecasts = list(forecast_it)
|
|
tss = list(ts_it)
|
|
evaluator = Evaluator(num_workers=0)
|
|
agg_metrics, item_metrics = evaluator(
|
|
iter(tss), iter(forecasts), num_series=len(dataset.test)
|
|
)
|
|
|
|
assert agg_metrics["MSE"] > 0
|
|
|
|
|
|
def test_instanciation_of_args_proj():
|
|
class MockedImplicitQuantileOutput(ImplicitQuantileOutput):
|
|
method_calls = 0
|
|
|
|
@classmethod
|
|
def set_args_proj(cls):
|
|
super().set_args_proj()
|
|
cls.method_calls += 1
|
|
|
|
dataset = get_dataset("constant")
|
|
metadata = dataset.metadata
|
|
|
|
distr_output = MockedImplicitQuantileOutput(output_domain="Real")
|
|
|
|
deepar_estimator = DeepAREstimator(
|
|
distr_output=distr_output,
|
|
freq=metadata.freq,
|
|
prediction_length=metadata.prediction_length,
|
|
trainer=Trainer(
|
|
device="cpu",
|
|
epochs=3,
|
|
learning_rate=1e-3,
|
|
num_batches_per_epoch=1,
|
|
batch_size=256,
|
|
),
|
|
input_size=15,
|
|
)
|
|
assert distr_output.method_calls == 1
|
|
deepar_predictor = deepar_estimator.train(dataset.train, num_workers=1)
|
|
|
|
# Method should be called when the MockedImplicitQuantileOutput is instanciated,
|
|
# and one more time because in_features is different from 1
|
|
assert distr_output.method_calls == 2
|
|
|
|
forecast_it, ts_it = make_evaluation_predictions(
|
|
dataset=dataset.test, # test dataset
|
|
predictor=deepar_predictor, # predictor
|
|
num_samples=100, # number of sample paths we want for evaluation
|
|
)
|
|
forecasts = list(forecast_it)
|
|
tss = list(ts_it)
|
|
evaluator = Evaluator(num_workers=0)
|
|
agg_metrics, item_metrics = evaluator(
|
|
iter(tss), iter(forecasts), num_series=len(dataset.test)
|
|
)
|
|
assert distr_output.method_calls == 2
|
|
|
|
# Test that the implicit output module is proper reset
|
|
new_estimator = DeepAREstimator(
|
|
distr_output=MockedImplicitQuantileOutput(output_domain="Real"),
|
|
freq=metadata.freq,
|
|
prediction_length=metadata.prediction_length,
|
|
trainer=Trainer(
|
|
device="cpu",
|
|
epochs=3,
|
|
learning_rate=1e-3,
|
|
num_batches_per_epoch=1,
|
|
batch_size=256,
|
|
),
|
|
input_size=15,
|
|
)
|
|
assert distr_output.method_calls == 3
|
|
new_estimator.train(dataset.train, num_workers=1)
|
|
assert (
|
|
distr_output.method_calls == 3
|
|
) # Since in_feature is the same as before, there should be no additional call
|