是的,Transformer 模型对于时间序列预测是有效的 (+ Autoformer)
引言
几个月前,我们介绍了 Informer 模型(Zhou, Haoyi, et al., 2021),它是一个时间序列 Transformer 模型,并获得了 AAAI 2021 最佳论文奖。我们还提供了使用 Informer 进行多元概率预测的示例。在这篇文章中,我们将讨论一个问题:Transformer 模型对时间序列预测是否有效?(AAAI 2023)。正如我们将看到的,它们是有效的。
首先,我们将提供经验证据表明 **Transformer 模型确实对时间序列预测有效**。我们的比较表明,简单的线性模型 DLinear 并不像所声称的那样优于 Transformer 模型。当与相同设置下相同大小的线性模型进行比较时,基于 Transformer 的模型在我们考虑的测试集指标上表现更好。之后,我们将介绍 Autoformer 模型(Wu, Haixu, et al., 2021),该模型于 Informer 模型之后,在 NeurIPS 2021 上发表。Autoformer 模型现已在 🤗 Transformers 中可用。最后,我们将讨论 DLinear 模型,它是一个简单的前馈网络,使用了 Autoformer 的分解层。DLinear 模型首次在《Transformer 模型对时间序列预测是否有效?》中介绍,并声称在时间序列预测中优于基于 Transformer 的模型。
冲呀!
基准测试 - Transformer 模型 vs. DLinear
在最近发表在 AAAI 2023 的论文 《Transformer 模型对时间序列预测是否有效?》中,作者声称 Transformer 模型对时间序列预测无效。他们将基于 Transformer 的模型与他们称为 DLinear 的简单线性模型进行比较。DLinear 模型使用了 Autoformer 模型中的分解层,我们将在本文后面介绍。作者声称 DLinear 模型在时间序列预测中优于基于 Transformer 的模型。是这样吗?让我们来一探究竟。
数据集 | Autoformer (univariate) MASE | DLinear MASE |
---|---|---|
交通 |
0.910 | 0.965 |
汇率 |
1.087 | 1.690 |
电力 |
0.751 | 0.831 |
上表显示了 Autoformer 和 DLinear 模型在论文中使用的三个数据集上的比较结果。
结果表明,Autoformer 模型在所有三个数据集上都优于 DLinear 模型。
接下来,我们将介绍新的 Autoformer 模型以及 DLinear 模型。我们将展示如何将它们与上表中的交通数据集进行比较,并解释我们获得的结果。
总结: 尽管简单的线性模型在某些情况下具有优势,但与 Transformer 等更复杂的模型相比,在单变量设置下,它无法融合协变量。
Autoformer - 内部解析
Autoformer 建立在将时间序列分解为季节性和趋势-周期分量的传统方法之上。这通过引入一个*分解层*来实现,该层增强了模型准确捕获这些分量的能力。此外,Autoformer 引入了一种创新的自相关机制,取代了香草 Transformer 中使用的标准自注意力。这种机制使模型能够利用基于周期的依赖性进行注意力,从而提高整体性能。
在接下来的部分中,我们将深入探讨 Autoformer 的两个关键贡献:*分解层*和*注意力(自相关)机制*。我们还将提供代码示例,以说明这些组件在 Autoformer 架构中的功能。
分解层
分解长期以来一直是时间序列分析中流行的方法,但在 Autoformer 论文问世之前,它尚未广泛地融入深度学习模型。在对概念进行简要解释之后,我们将使用 PyTorch 代码演示如何在 Autoformer 中应用这一思想。
时间序列分解
在时间序列分析中,分解是一种将时间序列分解为三个系统性成分的方法:趋势-周期、季节性变化和随机波动。趋势成分表示时间序列的长期方向,它可以随时间增加、减少或保持稳定。季节性成分表示时间序列中出现的周期性模式,例如年度或季度周期。最后,随机(有时称为“不规则”)成分表示数据中无法通过趋势或季节性成分解释的随机噪声。
分解主要有两种类型:加法分解和乘法分解,它们在优秀的 statsmodels 库中实现。通过将时间序列分解为这些成分,我们可以更好地理解和建模数据中的潜在模式。
但是,我们如何将分解纳入 Transformer 架构呢?让我们看看 Autoformer 是如何做到的。
Autoformer 中的分解
![]() |
---|
来自论文的 Autoformer 架构 |
Autoformer 将分解块作为模型的内部操作,如 Autoformer 架构图中所示。可以看出,编码器和解码器都使用分解块来聚合趋势-周期部分并从序列中逐步提取季节性部分。自 Autoformer 发表以来,内部分解的概念已证明其有用性。随后,它已被其他几篇时间序列论文采用,例如 FEDformer(Zhou, Tian, et al., ICML 2022)和 DLinear (Zeng, Ailing, et al., AAAI 2023),这凸显了它在时间序列建模中的重要性。
现在,我们来正式定义分解层
对于长度为 的输入序列 ,定义为 ,分解层返回
以及 PyTorch 中的实现
import torch
from torch import nn
class DecompositionLayer(nn.Module):
"""
Returns the trend and the seasonal parts of the time series.
"""
def __init__(self, kernel_size):
super().__init__()
self.kernel_size = kernel_size
self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0) # moving average
def forward(self, x):
"""Input shape: Batch x Time x EMBED_DIM"""
# padding on the both ends of time series
num_of_pads = (self.kernel_size - 1) // 2
front = x[:, 0:1, :].repeat(1, num_of_pads, 1)
end = x[:, -1:, :].repeat(1, num_of_pads, 1)
x_padded = torch.cat([front, x, end], dim=1)
# calculate the trend and seasonal part of the series
x_trend = self.avg(x_padded.permute(0, 2, 1)).permute(0, 2, 1)
x_seasonal = x - x_trend
return x_seasonal, x_trend
如您所见,这个实现非常简单,并且可以用于其他模型,正如我们将在 DLinear 中看到的那样。现在,让我们解释第二个贡献——*注意力(自相关)机制*。
注意力(自相关)机制
![]() |
---|
香草自注意力机制 vs 自相关机制,来自论文 |
除了分解层,Autoformer 还采用了一种新颖的自相关机制,它无缝地取代了自注意力。在香草时间序列 Transformer中,注意力权重在时域中计算并逐点聚合。另一方面,如上图所示,Autoformer 在频域中(使用快速傅里叶变换)计算它们,并通过时间延迟进行聚合。
在接下来的部分中,我们将详细深入探讨这些主题,并提供代码示例进行解释。
频域注意力
![]() |
---|
利用 FFT 在频域中计算注意力权重,来自论文 |
理论上,给定时间滞后 ,单个离散变量 的*自相关*用于衡量变量当前值在时间 与其过去值在时间 之间的“关系”(皮尔逊相关性)
Autoformer 利用自相关性从查询和键中提取基于频率的依赖关系,而不是它们之间的标准点积。您可以将其视为自注意力中 项的替代。
实际上,查询和键的自相关性会通过 FFT 一次性计算出**所有滞后**的自相关性。通过这样做,自相关机制实现了 的时间复杂度(其中 是输入时间长度),类似于 Informer 的 ProbSparse 注意力。请注意,使用 FFT 计算自相关性的理论基础是维纳-辛钦定理,这超出了本博客文章的范围。
现在,我们准备好查看 PyTorch 代码
import torch
def autocorrelation(query_states, key_states):
"""
Computes autocorrelation(Q,K) using `torch.fft`.
Think about it as a replacement for the QK^T in the self-attention.
Assumption: states are resized to same shape of [batch_size, time_length, embedding_dim].
"""
query_states_fft = torch.fft.rfft(query_states, dim=1)
key_states_fft = torch.fft.rfft(key_states, dim=1)
attn_weights = query_states_fft * torch.conj(key_states_fft)
attn_weights = torch.fft.irfft(attn_weights, dim=1)
return attn_weights
很简单!😎 请注意,这只是 autocorrelation(Q,K)
的部分实现,完整实现可以在 🤗 Transformers 中找到。
接下来,我们将了解如何通过时间延迟将 attn_weights
与值进行聚合,这一过程称为*时间延迟聚合*。
时间延迟聚合
![]() |
---|
按时间延迟聚合,来自Autoformer 论文 |
我们把自相关(称为 attn_weights
)看作 。问题来了:我们如何将这些 与 进行聚合?在标准的自注意力机制中,这种聚合是通过点积完成的。然而,在 Autoformer 中,我们采用了一种不同的方法。首先,我们通过计算 在每个时间延迟 的值来对其进行对齐,这也被称为*滚动*。随后,我们对对齐后的 和自相关进行逐元素相乘。在所提供的图中,您可以看到左侧展示了按时间延迟滚动的 ,而右侧则展示了与自相关的逐元素相乘。
可以用以下公式总结
就是这样!请注意, 由超参数 autocorrelation_factor
控制(类似于 Informer 中的 sampling_factor
),并在乘法之前将 softmax 应用于自相关。
现在,我们准备好查看最终代码
import torch
import math
def time_delay_aggregation(attn_weights, value_states, autocorrelation_factor=2):
"""
Computes aggregation as value_states.roll(delay) * top_k_autocorrelations(delay).
The final result is the autocorrelation-attention output.
Think about it as a replacement of the dot-product between attn_weights and value states.
The autocorrelation_factor is used to find top k autocorrelations delays.
Assumption: value_states and attn_weights shape: [batch_size, time_length, embedding_dim]
"""
bsz, num_heads, tgt_len, channel = ...
time_length = value_states.size(1)
autocorrelations = attn_weights.view(bsz, num_heads, tgt_len, channel)
# find top k autocorrelations delays
top_k = int(autocorrelation_factor * math.log(time_length))
autocorrelations_mean = torch.mean(autocorrelations, dim=(1, -1)) # bsz x tgt_len
top_k_autocorrelations, top_k_delays = torch.topk(autocorrelations_mean, top_k, dim=1)
# apply softmax on the channel dim
top_k_autocorrelations = torch.softmax(top_k_autocorrelations, dim=-1) # bsz x top_k
# compute aggregation: value_states.roll(delay) * top_k_autocorrelations(delay)
delays_agg = torch.zeros_like(value_states).float() # bsz x time_length x channel
for i in range(top_k):
value_states_roll_delay = value_states.roll(shifts=-int(top_k_delays[i]), dims=1)
top_k_at_delay = top_k_autocorrelations[:, i]
# aggregation
top_k_resized = top_k_at_delay.view(-1, 1, 1).repeat(num_heads, tgt_len, channel)
delays_agg += value_states_roll_delay * top_k_resized
attn_output = delays_agg.contiguous()
return attn_output
我们成功了!Autoformer 模型现已在 🤗 Transformers 库中可用,并简称为 AutoformerModel
。
我们使用此模型的策略是,展示单变量 Transformer 模型与 DLinear 模型(本质上是单变量模型,将在接下来展示)的性能比较。我们还将展示在相同数据上训练的**两个**多变量 Transformer 模型的评估结果。
DLinear - 内部解析
实际上,DLinear 的概念很简单:它只是一个带有 Autoformer 的 DecompositionLayer
的全连接网络。它使用上述 DecompositionLayer
将输入时间序列分解为残差(季节性)和趋势部分。在前向传播中,每个部分都会通过其自己的线性层,将信号投影到适当的 prediction_length
大小输出。最终输出是点预测模型中两个相应输出的总和
def forward(self, context):
seasonal, trend = self.decomposition(context)
seasonal_output = self.linear_seasonal(seasonal)
trend_output = self.linear_trend(trend)
return seasonal_output + trend_output
在概率设置中,可以将上下文长度数组通过 linear_seasonal
和 linear_trend
层投影到 prediction-length * hidden
维度。然后将得到的输出相加并重塑为 (prediction_length, hidden)
。最后,一个概率头部将大小为 hidden
的潜在表示映射到某个分布的参数。
在我们的基准测试中,我们使用来自 GluonTS 的 DLinear 实现。
示例:交通数据集
我们希望通过在 traffic
数据集上进行基准测试来实证展示库中基于 Transformer 的模型的性能,该数据集包含 862 个时间序列。我们将在每个独立时间序列(即单变量设置)上训练一个共享模型。每个时间序列表示一个传感器的占用值,范围在 [0, 1] 之间。我们将为所有模型固定以下超参数:
# Traffic prediction_length is 24. Reference:
# https://github.com/awslabs/gluonts/blob/6605ab1278b6bf92d5e47343efcf0d22bc50b2ec/src/gluonts/dataset/repository/_lstnet.py#L105
prediction_length = 24
context_length = prediction_length*2
batch_size = 128
num_batches_per_epoch = 100
epochs = 50
scaling = "std"
Transformer 模型都相对较小,具有
encoder_layers=2
decoder_layers=2
d_model=16
我们不再展示如何使用 Autoformer
训练模型,而是可以直接用新的 Autoformer
模型替换之前的两篇博客文章(TimeSeriesTransformer 和 Informer)中的模型,并在 traffic
数据集上进行训练。为了避免重复,我们已经训练了模型并将其推送到 HuggingFace Hub。我们将使用这些模型进行评估。
加载数据集
我们首先安装必要的库
!pip install -q transformers datasets evaluate accelerate "gluonts[torch]" ujson tqdm
由 Lai 等人 (2017) 使用的 traffic
数据集包含旧金山交通数据。它包含 862 个小时时间序列,显示 2015 年至 2016 年旧金山湾区高速公路上的道路占用率,范围在 之间。
from gluonts.dataset.repository.datasets import get_dataset
dataset = get_dataset("traffic")
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length
我们来可视化数据集中的一个时间序列,并绘制训练/测试分割。
import matplotlib.pyplot as plt
train_example = next(iter(dataset.train))
test_example = next(iter(dataset.test))
num_of_samples = 4*prediction_length
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
test_example["target"][-num_of_samples - prediction_length :],
color="red",
alpha=0.5,
)
plt.show()
我们来定义训练/测试分割
train_dataset = dataset.train
test_dataset = dataset.test
定义转换
接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用特征)的转换。
我们定义了一个 GluonTS 的 Chain
转换(有点类似于图像的 torchvision.transforms.Compose
)。它允许我们将多个转换组合成一个单一的管道。
下面的转换都带有注释,以解释它们的功能。从高层次来看,我们将遍历数据集中的各个时间序列并添加/删除字段或特征。
from transformers import PretrainedConfig
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
AddAgeFeature,
AddObservedValuesIndicator,
AddTimeFeatures,
AsNumpyArray,
Chain,
ExpectedNumInstanceSampler,
RemoveFields,
SelectFields,
SetField,
TestSplitSampler,
Transformation,
ValidationSplitSampler,
VstackFeatures,
RenameFields,
)
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
# create a list of fields to remove later
remove_field_names = []
if config.num_static_real_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if config.num_dynamic_real_features == 0:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
if config.num_static_categorical_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_CAT)
return Chain(
# step 1: remove static/dynamic fields if not specified
[RemoveFields(field_names=remove_field_names)]
# step 2: convert the data to NumPy (potentially not needed)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=int,
)
]
if config.num_static_categorical_features > 0
else []
)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
)
]
if config.num_static_real_features > 0
else []
)
+ [
AsNumpyArray(
field=FieldName.TARGET,
# we expect an extra dim for the multivariate case:
expected_ndim=1 if config.input_size == 1 else 2,
),
# step 3: handle the NaN's by filling in the target with zero
# and return the mask (which is in the observed values)
# true for observed values, false for nan's
# the decoder uses this mask (no loss is incurred for unobserved values)
# see loss_weights inside the xxxForPrediction model
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# step 4: add temporal features based on freq of the dataset
# these serve as positional encodings
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(freq),
pred_length=config.prediction_length,
),
# step 5: add another temporal feature (just a single number)
# tells the model where in the life the value of the time series is
# sort of running counter
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=config.prediction_length,
log_scale=True,
),
# step 6: vertically stack all the temporal features into the key FEAT_TIME
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if config.num_dynamic_real_features > 0
else []
),
),
# step 7: rename to match HuggingFace names
RenameFields(
mapping={
FieldName.FEAT_STATIC_CAT: "static_categorical_features",
FieldName.FEAT_STATIC_REAL: "static_real_features",
FieldName.FEAT_TIME: "time_features",
FieldName.TARGET: "values",
FieldName.OBSERVED_VALUES: "observed_mask",
}
),
]
)
定义 InstanceSplitter
为了进行训练/验证/测试,我们接下来创建一个 InstanceSplitter
,它用于从数据集中采样窗口(因为,请记住,由于时间和内存限制,我们无法将整个历史值传递给模型)。
实例分割器从数据中采样随机的 context_length
大小和随后的 prediction_length
大小的窗口,并为相应窗口中的任何时间序列键在 time_series_fields
中附加 past_
或 future_
前缀。实例分割器可以配置为三种不同的模式:
mode="train"
:在这种模式下,我们从给定数据集(训练数据集)中随机采样上下文和预测长度窗口。mode="validation"
:在这种模式下,我们从给定数据集(用于回溯测试或验证似然计算)中采样最后一个上下文长度窗口和预测窗口。mode="test"
:在这种模式下,我们仅采样最后一个上下文长度窗口(用于预测用例)。
from gluonts.transform import InstanceSplitter
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
config: PretrainedConfig,
mode: str,
train_sampler: Optional[InstanceSampler] = None,
validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
assert mode in ["train", "validation", "test"]
instance_sampler = {
"train": train_sampler
or ExpectedNumInstanceSampler(
num_instances=1.0, min_future=config.prediction_length
),
"validation": validation_sampler
or ValidationSplitSampler(min_future=config.prediction_length),
"test": TestSplitSampler(),
}[mode]
return InstanceSplitter(
target_field="values",
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=instance_sampler,
past_length=config.context_length + max(config.lags_sequence),
future_length=config.prediction_length,
time_series_fields=["time_features", "observed_mask"],
)
创建 PyTorch DataLoaders
接下来,是时候创建 PyTorch DataLoaders 了,它们允许我们拥有批量的(输入,输出)对——换句话说,(past_values
,future_values
)。
from typing import Iterable
import torch
from gluonts.itertools import Cyclic, Cached
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
"future_values",
"future_observed_mask",
]
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)
# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train")
# the instance splitter will sample a window of
# context length + lags + prediction length (from the 366 possible transformed time series)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(stream)
return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
def create_backtest_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data)
# we create a Validation Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "validation")
# we apply the transformations in train mode
testing_instances = instance_sampler.apply(transformed_data, is_train=True)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
def create_test_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=False)
# We create a test Instance splitter to sample the very last
# context window from the dataset provided.
instance_sampler = create_instance_splitter(config, "test")
# We apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
在 Autoformer 上评估
我们已经在这个数据集上预训练了一个 Autoformer 模型,所以我们只需获取模型并在测试集上对其进行评估
from transformers import AutoformerConfig, AutoformerForPrediction
config = AutoformerConfig.from_pretrained("kashif/autoformer-traffic-hourly")
model = AutoformerForPrediction.from_pretrained("kashif/autoformer-traffic-hourly")
test_dataloader = create_backtest_dataloader(
config=config,
freq=freq,
data=test_dataset,
batch_size=64,
)
在推理时,我们将使用模型的 generate()
方法,从训练集中每个时间序列的最后一个上下文窗口预测未来 prediction_length
步。
from accelerate import Accelerator
accelerator = Accelerator()
device = accelerator.device
model.to(device)
model.eval()
forecasts_ = []
for batch in test_dataloader:
outputs = model.generate(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
)
forecasts_.append(outputs.sequences.cpu().numpy())
模型输出的张量形状为 (batch_size
, 样本数
, 预测长度
, 输入大小
)。
在这种情况下,我们为测试数据加载器批次中的每个时间序列(您还记得上面是 64
)的未来 24
小时获得 100
个可能的值。
forecasts_[0].shape
>>> (64, 100, 24)
我们将它们垂直堆叠,以获取测试数据集中所有时间序列的预测结果:测试集中有 7
个滚动窗口,这就是为什么我们最终得到总计 7 * 862 = 6034
个预测。
import numpy as np
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (6034, 100, 24)
我们可以根据测试集中存在的样本外真实值评估由此产生的预测。为此,我们将使用 🤗 Evaluate 库,其中包括 MASE 指标。
我们计算数据集中每个时间序列的指标并返回平均值
from tqdm.autonotebook import tqdm
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
forecast_median = np.median(forecasts, 1)
mase_metrics = []
for item_id, ts in enumerate(tqdm(test_dataset)):
training_data = ts["target"][:-prediction_length]
ground_truth = ts["target"][-prediction_length:]
mase = mase_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
training=np.array(training_data),
periodicity=get_seasonality(freq))
mase_metrics.append(mase["mase"])
因此,Autoformer 模型的结果是
print(f"Autoformer univariate MASE: {np.mean(mase_metrics):.3f}")
>>> Autoformer univariate MASE: 0.910
为了绘制任何时间序列相对于真实测试数据的预测图,我们定义了以下辅助函数
import matplotlib.dates as mdates
import pandas as pd
test_ds = list(test_dataset)
def plot(ts_index):
fig, ax = plt.subplots()
index = pd.period_range(
start=test_ds[ts_index][FieldName.START],
periods=len(test_ds[ts_index][FieldName.TARGET]),
freq=test_ds[ts_index][FieldName.START].freq,
).to_timestamp()
ax.plot(
index[-5*prediction_length:],
test_ds[ts_index]["target"][-5*prediction_length:],
label="actual",
)
plt.plot(
index[-prediction_length:],
np.median(forecasts[ts_index], axis=0),
label="median",
)
plt.gcf().autofmt_xdate()
plt.legend(loc="best")
plt.show()
例如,对于测试集中索引为 4
的时间序列
plot(4)
在 DLinear 上评估
概率 DLinear 在 gluonts
中实现,因此我们可以在这里相对快速地对其进行训练和评估
from gluonts.torch.model.d_linear.estimator import DLinearEstimator
# Define the DLinear model with the same parameters as the Autoformer model
estimator = DLinearEstimator(
prediction_length=dataset.metadata.prediction_length,
context_length=dataset.metadata.prediction_length*2,
scaling=scaling,
hidden_dimension=2,
batch_size=batch_size,
num_batches_per_epoch=num_batches_per_epoch,
trainer_kwargs=dict(max_epochs=epochs)
)
训练模型
predictor = estimator.train(
training_data=train_dataset,
cache_data=True,
shuffle_buffer_length=1024
)
>>> INFO:pytorch_lightning.callbacks.model_summary:
| Name | Type | Params
---------------------------------------
0 | model | DLinearModel | 4.7 K
---------------------------------------
4.7 K Trainable params
0 Non-trainable params
4.7 K Total params
0.019 Total estimated model params size (MB)
Training: 0it [00:00, ?it/s]
...
INFO:pytorch_lightning.utilities.rank_zero:Epoch 49, global step 5000: 'train_loss' was not in top 1
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.
并在测试集上对其进行评估
from gluonts.evaluation import make_evaluation_predictions, Evaluator
forecast_it, ts_it = make_evaluation_predictions(
dataset=dataset.test,
predictor=predictor,
)
d_linear_forecasts = list(forecast_it)
d_linear_tss = list(ts_it)
evaluator = Evaluator()
agg_metrics, _ = evaluator(iter(d_linear_tss), iter(d_linear_forecasts))
因此,DLinear 模型的结果是
dlinear_mase = agg_metrics["MASE"]
print(f"DLinear MASE: {dlinear_mase:.3f}")
>>> DLinear MASE: 0.965
和以前一样,我们通过此辅助函数绘制了我们训练过的 DLinear 模型的预测图
def plot_gluonts(index):
plt.plot(d_linear_tss[index][-4 * dataset.metadata.prediction_length:].to_timestamp(), label="target")
d_linear_forecasts[index].plot(show_label=True, color='g')
plt.legend()
plt.gcf().autofmt_xdate()
plt.show()
plot_gluonts(4)
traffic
数据集在工作日和周末之间的传感器模式存在分布偏移。那么这里发生了什么?由于 DLinear 模型无法整合协变量,特别是任何日期时间特征,我们给它的上下文窗口没有足够的信息来判断预测是针对周末还是工作日。因此,模型将预测更常见的模式,即工作日,从而导致周末性能较差。当然,通过提供更大的上下文窗口,线性模型将能够找出每周模式,但数据中可能存在每月或每季度模式,这将需要越来越大的上下文。
结论
基于 Transformer 的模型与上述线性基线相比如何?我们不同模型的测试集 MASE 指标如下:
数据集 | Transformer(单变量) | Transformer(多变量) | Informer(单变量) | Informer(多变量) | Autoformer(单变量) | DLinear |
---|---|---|---|---|---|---|
交通 |
0.876 | 1.046 | 0.924 | 1.131 | 0.910 | 0.965 |
正如我们所观察到的,我们去年推出的普通Transformer在这里获得了最佳结果。其次,多变量模型通常差于单变量模型,原因在于难以估计跨序列相关性/关系。估计值带来的额外方差通常会损害最终的预测结果,或者模型会学习到虚假相关性。最近的论文,如CrossFormer(ICLR 23)和CARD,试图解决Transformer模型中的这一问题。多变量模型通常在大量数据上训练时表现良好。然而,与单变量模型相比,特别是在较小的开放数据集上,单变量模型往往能提供更好的指标。通过将线性模型与同等大小的单变量Transformer或其他任何神经网络单变量模型进行比较,通常会获得更好的性能。
总而言之,Transformer 模型在时间序列预测方面绝对远未过时!然而,大规模数据集的可用性对于最大限度地发挥其潜力至关重要。与计算机视觉和自然语言处理领域不同,时间序列领域缺乏可公开访问的大规模数据集。大多数现有时间序列预训练模型都在像 UCR 和 UEA 这样的档案中进行小样本训练,这些档案只包含几千甚至几百个样本。尽管这些基准数据集在时间序列社区的进步中发挥了重要作用,但其有限的样本量和缺乏通用性对深度学习模型的预训练构成了挑战。
因此,开发大规模、通用时间序列数据集(如计算机视觉中的 ImageNet)至关重要。创建此类数据集将极大地促进专门为时间序列分析设计的预训练模型的进一步研究,并提高预训练模型在时间序列预测中的适用性。
致谢
我们感谢 Lysandre Debut 和 Pedro Cuenca 在本项目中提供的富有洞察力的评论和帮助 ❤️。