mirror of
https://github.com/wassname/seq2seq-time.git
synced 2026-06-27 17:31:58 +08:00
148 lines
4.9 KiB
Python
148 lines
4.9 KiB
Python
# from https://mohcinemadkour.github.io/posts/2019/10/Machine%20Learning,%20timeseriesAI,%20Time%20Series%20Classification,%20fastai_timeseries,%20TSC%20bechmark/
|
|
|
|
|
|
# This is an unofficial PyTorch implementation by Ignacio Oguiza - oguiza@gmail.com based on:
|
|
|
|
# Fawaz, H. I., Lucas, B., Forestier, G., Pelletier, C., Schmidt, D. F., Weber, J., ... & Petitjean, F. (2019). InceptionTime: Finding AlexNet for Time Series Classification. arXiv preprint arXiv:1909.04939.
|
|
# Official InceptionTime tensorflow implementation: https://github.com/hfawaz/InceptionTime
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.nn import functional as F
|
|
|
|
|
|
def noop(x):
    """Identity function: hand back ``x`` exactly as it was received."""
    return x
|
|
|
|
|
|
def shortcut(c_in, c_out):
    """Build a kernel-size-1 ``Conv1d`` followed by ``BatchNorm1d``.

    Args:
        c_in: number of input channels of the convolution.
        c_out: number of output channels (also the batch-norm width).

    Returns:
        An ``nn.Sequential`` holding the convolution and the batch norm.
    """
    pointwise = nn.Conv1d(c_in, c_out, kernel_size=1)
    norm = nn.BatchNorm1d(c_out)
    return nn.Sequential(pointwise, norm)
|
|
|
|
|
|
class InceptionLayer(nn.Module):
    """One Inception module for inputs laid out as (batch, channels, length).

    The input goes through an optional kernel-size-1 bottleneck convolution
    and then through three parallel convolutions whose kernel sizes are
    ``kernel_size``, ``kernel_size // 2`` and ``kernel_size // 4`` (each
    lowered to the nearest odd value, never below 1). A fourth branch
    max-pools the raw input and applies a kernel-size-1 convolution. The four
    branch outputs are concatenated on the channel axis, batch-normalised and
    passed through ReLU, so the module emits ``nb_filters * 4`` channels and
    keeps the sequence length unchanged.

    Args:
        c_in: number of input channels.
        bottleneck: output channels of the bottleneck convolution; a falsy
            value disables it. It is also skipped when ``c_in`` is 1.
        kernel_size: largest branch kernel size.
        nb_filters: output channels of each of the four branches.
    """

    def __init__(self, c_in, bottleneck=32, kernel_size=40, nb_filters=32):
        super().__init__()
        use_bottleneck = bool(bottleneck) and c_in > 1
        self.bottleneck = (
            nn.Conv1d(c_in, bottleneck, 1) if use_bottleneck else nn.Identity()
        )
        # The branch convolutions must match what the bottleneck actually
        # emits: when the bottleneck is skipped (including the c_in == 1
        # case) they see the raw c_in channels.
        mts_feat = bottleneck if use_bottleneck else c_in

        kss = [kernel_size // (2 ** i) for i in range(3)]
        # Odd kernels with padding=k // 2 preserve the sequence length; the
        # clamp keeps very small kernel_size values from producing a kernel
        # size of 0 or -1.
        kss = [max(ks if ks % 2 else ks - 1, 1) for ks in kss]
        self.conv_layers = nn.ModuleList(
            [
                nn.Conv1d(mts_feat, nb_filters, kernel_size=ks, padding=ks // 2)
                for ks in kss
            ]
        )
        self.maxpool = nn.MaxPool1d(3, stride=1, padding=1)
        self.conv = nn.Conv1d(c_in, nb_filters, kernel_size=1)
        self.bn = nn.BatchNorm1d(nb_filters * 4)
        self.act = nn.ReLU()

    def forward(self, x):
        """Return the activated, normalised concatenation of all branches."""
        squeezed = self.bottleneck(x)
        branches = [conv(squeezed) for conv in self.conv_layers]
        # The pooling branch reads the raw input, not the bottleneck output.
        branches.append(self.conv(self.maxpool(x)))
        return self.act(self.bn(torch.cat(branches, 1)))
|
|
|
|
|
|
class InceptionBlock(nn.Module):
    """Stack of ``InceptionLayer`` modules with a shortcut every third layer.

    The first layer maps ``c_in`` channels to ``nb_filters * 4`` with its
    bottleneck disabled; every later layer maps ``nb_filters * 4`` to
    ``nb_filters * 4``. When ``residual`` is true, after each layer whose
    index ``d`` satisfies ``d % 3 == 2`` a ``shortcut`` projection of the
    running residual is added to the layer output and ReLU is applied.

    Args:
        c_in: number of input channels.
        bottleneck: bottleneck width handed to every layer except the first.
        kernel_size: largest kernel size handed to every layer.
        nb_filters: per-branch filter count of every layer.
        residual: whether to add the shortcut connections.
        num_layers: number of stacked ``InceptionLayer`` modules.
    """

    def __init__(
        self, c_in, bottleneck=32, kernel_size=40, nb_filters=32, residual=True, num_layers=6
    ):
        super().__init__()

        self.residual = residual
        self.num_layers = num_layers

        width = nb_filters * 4
        inc_mods = []
        # One slot per layer (None where a layer has no shortcut) so that
        # ``res_layers[d]`` lines up with ``inc_mods[d]``.
        res_layers = []
        for d in range(num_layers):
            inc_mods.append(
                InceptionLayer(
                    c_in if d == 0 else width,
                    bottleneck=bottleneck if d > 0 else 0,
                    kernel_size=kernel_size,
                    nb_filters=nb_filters,
                )
            )
            if self.residual and d % 3 == 2:
                # The first shortcut (d == 2) projects the raw block input;
                # later ones project an already ``width``-channel residual.
                res_layers.append(shortcut(c_in if d == 2 else width, width))
            else:
                res_layers.append(None)
        self.inc_mods = nn.ModuleList(inc_mods)
        self.res_layers = nn.ModuleList(res_layers)
        self.act = nn.ReLU()

    def forward(self, x):
        """Run the stack, adding the projected residual after every third layer."""
        res = x
        for d, layer in enumerate(self.inc_mods):
            x = layer(x)
            if self.residual and d % 3 == 2:
                # Out-of-place add: ``x`` is the ReLU output of the layer
                # above, and modifying it in place would invalidate the
                # tensor autograd keeps for that ReLU's backward pass.
                x = x + self.res_layers[d](res)
                # NOTE(review): the next shortcut starts from this
                # pre-activation sum; confirm that is intended before
                # changing it, since doing so alters trained models' outputs.
                res = x
                x = self.act(x)
        return x
|
|
|
|
|
|
|
|
class InceptionTimeSeq(nn.Module):
    """Sequence model that wraps ``InceptionBlock`` and emits a Normal distribution.

    Past inputs and targets are concatenated with future inputs plus a
    placeholder target (the last past target repeated), passed through the
    inception block, and two linear heads produce the mean and the scale for
    the steps that follow the past window.

    Args:
        x_dim: size of the last axis of ``past_x`` / ``future_x``.
        y_dim: size of the last axis of ``past_y`` and of the outputs.
        hidden_size: ``nb_filters`` of the inception block; the block emits
            ``hidden_size * 4`` features.
        layers: number of inception layers.
        kernel_size: largest kernel size of the inception layers.
        bottleneck: bottleneck width of the inception layers.
        residual: whether the block uses shortcut connections.
    """

    def __init__(
        self,
        x_dim,
        y_dim,
        hidden_size=32,
        layers=6,
        kernel_size=40,
        bottleneck=16,
        residual=True
    ):
        super().__init__()
        n_features = hidden_size * 4
        self.inc_block = InceptionBlock(
            x_dim + y_dim,
            bottleneck=bottleneck,
            kernel_size=kernel_size,
            nb_filters=hidden_size,
            residual=residual,
            num_layers=layers,
        )
        # Lower bound added to the predicted scale in forward().
        self._min_std = 0.01
        self.mean = nn.Linear(n_features, y_dim)
        self.std = nn.Linear(n_features, y_dim)

    def forward(self, past_x, past_y, future_x, future_y=None):
        """Predict a Normal distribution for each future step.

        ``future_y`` is accepted but never read. Returns a tuple of the
        ``torch.distributions.Normal`` and an empty dict.
        """
        model_device = next(self.parameters()).device
        _, horizon, _ = future_x.shape

        # Placeholder future targets: the last past target held constant.
        last_y = past_y[:, -1:, :]
        placeholder_y = last_y.repeat(1, horizon, 1).to(model_device)

        past = torch.cat([past_x, past_y], -1)
        future = torch.cat([future_x, placeholder_y], -1)
        sequence = torch.cat([past, future], 1).detach()

        # The block works on (batch, channels, steps); swap axes in and out.
        channels_first = sequence.permute(0, 2, 1)
        features = self.inc_block(channels_first).permute(0, 2, 1)

        # Original remark, kept verbatim (NOTE(review): its referent is unclear):
        # Seems to help a little, especially with extrapolating out of bounds
        n_past = past_y.shape[1]
        mean = self.mean(features)[:, n_past:, :]
        raw_scale = self.std(features)[:, n_past:, :]

        # softplus keeps the scale positive; _min_std keeps it away from zero.
        sigma = self._min_std + (1 - self._min_std) * F.softplus(raw_scale)
        return torch.distributions.Normal(mean, sigma), {}
|