是的,Transformer 模型对于时间序列预测是有效的 (+ Autoformer)

发布日期:2023年6月16日
在 GitHub 上更新
Open In Colab

引言

几个月前,我们介绍了 Informer 模型(Zhou, Haoyi, et al., 2021),它是一个时间序列 Transformer 模型,并获得了 AAAI 2021 最佳论文奖。我们还提供了使用 Informer 进行多元概率预测的示例。在这篇文章中,我们将讨论一个问题:Transformer 模型对时间序列预测是否有效?(AAAI 2023)。正如我们将看到的,它们是有效的。

首先,我们将提供经验证据表明 **Transformer 模型确实对时间序列预测有效**。我们的比较表明,简单的线性模型 DLinear 并不像所声称的那样优于 Transformer 模型。当与相同设置下相同大小的线性模型进行比较时,基于 Transformer 的模型在我们考虑的测试集指标上表现更好。之后,我们将介绍 Autoformer 模型(Wu, Haixu, et al., 2021),该模型于 Informer 模型之后,在 NeurIPS 2021 上发表。Autoformer 模型现已在 🤗 Transformers 中可用。最后,我们将讨论 DLinear 模型,它是一个简单的前馈网络,使用了 Autoformer 的分解层。DLinear 模型首次在《Transformer 模型对时间序列预测是否有效?》中介绍,并声称在时间序列预测中优于基于 Transformer 的模型。

冲呀!

基准测试 - Transformer 模型 vs. DLinear

在最近发表在 AAAI 2023 的论文 《Transformer 模型对时间序列预测是否有效?》中,作者声称 Transformer 模型对时间序列预测无效。他们将基于 Transformer 的模型与他们称为 DLinear 的简单线性模型进行比较。DLinear 模型使用了 Autoformer 模型中的分解层,我们将在本文后面介绍。作者声称 DLinear 模型在时间序列预测中优于基于 Transformer 的模型。是这样吗?让我们来一探究竟。

数据集 Autoformer (univariate) MASE DLinear MASE
交通 0.910 0.965
汇率 1.087 1.690
电力 0.751 0.831

上表显示了 Autoformer 和 DLinear 模型在论文中使用的三个数据集上的比较结果。
结果表明,Autoformer 模型在所有三个数据集上都优于 DLinear 模型。

接下来,我们将介绍新的 Autoformer 模型以及 DLinear 模型。我们将展示如何将它们与上表中的交通数据集进行比较,并解释我们获得的结果。

总结: 尽管简单的线性模型在某些情况下具有优势,但与 Transformer 等更复杂的模型相比,在单变量设置下,它无法融合协变量。

Autoformer - 内部解析

Autoformer 建立在将时间序列分解为季节性和趋势-周期分量的传统方法之上。这通过引入一个*分解层*来实现,该层增强了模型准确捕获这些分量的能力。此外,Autoformer 引入了一种创新的自相关机制,取代了香草 Transformer 中使用的标准自注意力。这种机制使模型能够利用基于周期的依赖性进行注意力,从而提高整体性能。

在接下来的部分中,我们将深入探讨 Autoformer 的两个关键贡献:*分解层*和*注意力(自相关)机制*。我们还将提供代码示例,以说明这些组件在 Autoformer 架构中的功能。

分解层

分解长期以来一直是时间序列分析中流行的方法,但在 Autoformer 论文问世之前,它尚未广泛地融入深度学习模型。在对概念进行简要解释之后,我们将使用 PyTorch 代码演示如何在 Autoformer 中应用这一思想。

时间序列分解

在时间序列分析中,分解是一种将时间序列分解为三个系统性成分的方法:趋势-周期、季节性变化和随机波动。趋势成分表示时间序列的长期方向,它可以随时间增加、减少或保持稳定。季节性成分表示时间序列中出现的周期性模式,例如年度或季度周期。最后,随机(有时称为“不规则”)成分表示数据中无法通过趋势或季节性成分解释的随机噪声。

分解主要有两种类型:加法分解和乘法分解,它们在优秀的 statsmodels 库中实现。通过将时间序列分解为这些成分,我们可以更好地理解和建模数据中的潜在模式。

但是,我们如何将分解纳入 Transformer 架构呢?让我们看看 Autoformer 是如何做到的。

Autoformer 中的分解

autoformer_architecture
来自论文的 Autoformer 架构

Autoformer 将分解块作为模型的内部操作,如 Autoformer 架构图中所示。可以看出,编码器和解码器都使用分解块来聚合趋势-周期部分并从序列中逐步提取季节性部分。自 Autoformer 发表以来,内部分解的概念已证明其有用性。随后,它已被其他几篇时间序列论文采用,例如 FEDformer(Zhou, Tian, et al., ICML 2022)和 DLinear (Zeng, Ailing, et al., AAAI 2023),这凸显了它在时间序列建模中的重要性。

现在,我们来正式定义分解层

对于长度为 LL 的输入序列 XRL×d\mathcal{X} \in \mathbb{R}^{L \times d},分解层返回 Xtrend,Xseasonal\mathcal{X}_\textrm{trend}, \mathcal{X}_\textrm{seasonal},定义为

Xtrend=AvgPool(Padding(X))Xseasonal=XXtrend \mathcal{X}_\textrm{trend} = \textrm{AvgPool(Padding(} \mathcal{X} \textrm{))} \\ \mathcal{X}_\textrm{seasonal} = \mathcal{X} - \mathcal{X}_\textrm{trend}

以及 PyTorch 中的实现

import torch
from torch import nn

class DecompositionLayer(nn.Module):
    """
    Returns the trend and the seasonal parts of the time series.
    """

    def __init__(self, kernel_size):
        super().__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0) # moving average 

    def forward(self, x):
        """Input shape: Batch x Time x EMBED_DIM"""
        # padding on the both ends of time series
        num_of_pads = (self.kernel_size - 1) // 2
        front = x[:, 0:1, :].repeat(1, num_of_pads, 1)
        end = x[:, -1:, :].repeat(1, num_of_pads, 1)
        x_padded = torch.cat([front, x, end], dim=1)

        # calculate the trend and seasonal part of the series
        x_trend = self.avg(x_padded.permute(0, 2, 1)).permute(0, 2, 1)
        x_seasonal = x - x_trend
        return x_seasonal, x_trend

如您所见,这个实现非常简单,并且可以用于其他模型,正如我们将在 DLinear 中看到的那样。现在,让我们解释第二个贡献——*注意力(自相关)机制*。

注意力(自相关)机制

autoformer_autocorrelation_vs_full_attention
香草自注意力机制 vs 自相关机制,来自论文

除了分解层,Autoformer 还采用了一种新颖的自相关机制,它无缝地取代了自注意力。在香草时间序列 Transformer中,注意力权重在时域中计算并逐点聚合。另一方面,如上图所示,Autoformer 在频域中(使用快速傅里叶变换)计算它们,并通过时间延迟进行聚合。

在接下来的部分中,我们将详细深入探讨这些主题,并提供代码示例进行解释。

频域注意力

autoformer_autocorrelation_only_attention
利用 FFT 在频域中计算注意力权重,来自论文

理论上,给定时间滞后 τ\tau,单个离散变量 yy 的*自相关*用于衡量变量当前值在时间 tt 与其过去值在时间 tτt-\tau 之间的“关系”(皮尔逊相关性)

Autocorrelation(τ)=Corr(yt,ytτ) \textrm{Autocorrelation}(\tau) = \textrm{Corr}(y_t, y_{t-\tau})

Autoformer 利用自相关性从查询和键中提取基于频率的依赖关系,而不是它们之间的标准点积。您可以将其视为自注意力中 QKTQK^T 项的替代。

实际上,查询和键的自相关性会通过 FFT 一次性计算出**所有滞后**的自相关性。通过这样做,自相关机制实现了 O(LlogL)O(L \log L) 的时间复杂度(其中 LL 是输入时间长度),类似于 Informer 的 ProbSparse 注意力。请注意,使用 FFT 计算自相关性的理论基础是维纳-辛钦定理,这超出了本博客文章的范围。

现在,我们准备好查看 PyTorch 代码

import torch 

def autocorrelation(query_states, key_states):
    """
    Computes autocorrelation(Q,K) using `torch.fft`. 
    Think about it as a replacement for the QK^T in the self-attention.
    
    Assumption: states are resized to same shape of [batch_size, time_length, embedding_dim].
    """
    query_states_fft = torch.fft.rfft(query_states, dim=1)
    key_states_fft = torch.fft.rfft(key_states, dim=1)
    attn_weights = query_states_fft * torch.conj(key_states_fft)
    attn_weights = torch.fft.irfft(attn_weights, dim=1)  
    
    return attn_weights

很简单!😎 请注意,这只是 autocorrelation(Q,K) 的部分实现,完整实现可以在 🤗 Transformers 中找到。

接下来,我们将了解如何通过时间延迟将 attn_weights 与值进行聚合,这一过程称为*时间延迟聚合*。

时间延迟聚合

autoformer_autocorrelation_only_aggregation
按时间延迟聚合,来自Autoformer 论文

我们把自相关(称为 attn_weights)看作 RQ,K\mathcal{R_{Q,K}}。问题来了:我们如何将这些 RQ,K(τ1),RQ,K(τ2),...,RQ,K(τk)\mathcal{R_{Q,K}}(\tau_1), \mathcal{R_{Q,K}}(\tau_2), ..., \mathcal{R_{Q,K}}(\tau_k)V\mathcal{V} 进行聚合?在标准的自注意力机制中,这种聚合是通过点积完成的。然而,在 Autoformer 中,我们采用了一种不同的方法。首先,我们通过计算 V\mathcal{V} 在每个时间延迟 τ1,τ2,...τk\tau_1, \tau_2, ... \tau_k 的值来对其进行对齐,这也被称为*滚动*。随后,我们对对齐后的 V\mathcal{V} 和自相关进行逐元素相乘。在所提供的图中,您可以看到左侧展示了按时间延迟滚动的 V\mathcal{V},而右侧则展示了与自相关的逐元素相乘。

可以用以下公式总结

τ1,τ2,...τk=arg Top-k(RQ,K(τ))R^Q,K(τ1),R^Q,K(τ2),...,R^Q,K)=Softmax(RQ,K(τ1),RQ,K(τ2),...,RQ,K(τk))Autocorrelation-Attention=i=1kRoll(V,τi)R^Q,K(τi) \tau_1, \tau_2, ... \tau_k = \textrm{arg Top-k}(\mathcal{R_{Q,K}}(\tau)) \\ \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _1), \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _2), ..., \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _k) = \textrm{Softmax}(\mathcal{R_{Q,K}}(\tau _1), \mathcal{R_{Q,K}}(\tau_2), ..., \mathcal{R_{Q,K}}(\tau_k)) \\ \textrm{Autocorrelation-Attention} = \sum_{i=1}^k \textrm{Roll}(\mathcal{V}, \tau_i) \cdot \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _i)

就是这样!请注意,kk 由超参数 autocorrelation_factor 控制(类似于 Informer 中的 sampling_factor),并在乘法之前将 softmax 应用于自相关。

现在,我们准备好查看最终代码

import torch
import math

def time_delay_aggregation(attn_weights, value_states, autocorrelation_factor=2):
    """
    Computes aggregation as value_states.roll(delay) * top_k_autocorrelations(delay).
    The final result is the autocorrelation-attention output.
    Think about it as a replacement of the dot-product between attn_weights and value states.
    
    The autocorrelation_factor is used to find top k autocorrelations delays.
    Assumption: value_states and attn_weights shape: [batch_size, time_length, embedding_dim]
    """
    bsz, num_heads, tgt_len, channel = ...
    time_length = value_states.size(1)
    autocorrelations = attn_weights.view(bsz, num_heads, tgt_len, channel)

    # find top k autocorrelations delays
    top_k = int(autocorrelation_factor * math.log(time_length))
    autocorrelations_mean = torch.mean(autocorrelations, dim=(1, -1))  # bsz x tgt_len
    top_k_autocorrelations, top_k_delays = torch.topk(autocorrelations_mean, top_k, dim=1)

    # apply softmax on the channel dim
    top_k_autocorrelations = torch.softmax(top_k_autocorrelations, dim=-1)  # bsz x top_k

    # compute aggregation: value_states.roll(delay) * top_k_autocorrelations(delay)
    delays_agg = torch.zeros_like(value_states).float()  # bsz x time_length x channel
    for i in range(top_k):
        value_states_roll_delay = value_states.roll(shifts=-int(top_k_delays[i]), dims=1)
        top_k_at_delay = top_k_autocorrelations[:, i]
        # aggregation
        top_k_resized = top_k_at_delay.view(-1, 1, 1).repeat(num_heads, tgt_len, channel)
        delays_agg += value_states_roll_delay * top_k_resized

    attn_output = delays_agg.contiguous()
    return attn_output

我们成功了!Autoformer 模型现已在 🤗 Transformers 库中可用,并简称为 AutoformerModel

我们使用此模型的策略是,展示单变量 Transformer 模型与 DLinear 模型(本质上是单变量模型,将在接下来展示)的性能比较。我们还将展示在相同数据上训练的**两个**多变量 Transformer 模型的评估结果。

DLinear - 内部解析

实际上,DLinear 的概念很简单:它只是一个带有 Autoformer 的 DecompositionLayer 的全连接网络。它使用上述 DecompositionLayer 将输入时间序列分解为残差(季节性)和趋势部分。在前向传播中,每个部分都会通过其自己的线性层,将信号投影到适当的 prediction_length 大小输出。最终输出是点预测模型中两个相应输出的总和

def forward(self, context):
    seasonal, trend = self.decomposition(context)
    seasonal_output = self.linear_seasonal(seasonal)
    trend_output = self.linear_trend(trend)
    return seasonal_output + trend_output

在概率设置中,可以将上下文长度数组通过 linear_seasonallinear_trend 层投影到 prediction-length * hidden 维度。然后将得到的输出相加并重塑为 (prediction_length, hidden)。最后,一个概率头部将大小为 hidden 的潜在表示映射到某个分布的参数。

在我们的基准测试中,我们使用来自 GluonTS 的 DLinear 实现。

示例:交通数据集

我们希望通过在 traffic 数据集上进行基准测试来实证展示库中基于 Transformer 的模型的性能,该数据集包含 862 个时间序列。我们将在每个独立时间序列(即单变量设置)上训练一个共享模型。每个时间序列表示一个传感器的占用值,范围在 [0, 1] 之间。我们将为所有模型固定以下超参数:

# Traffic prediction_length is 24. Reference:
# https://github.com/awslabs/gluonts/blob/6605ab1278b6bf92d5e47343efcf0d22bc50b2ec/src/gluonts/dataset/repository/_lstnet.py#L105

prediction_length = 24
context_length = prediction_length*2
batch_size = 128
num_batches_per_epoch = 100
epochs = 50
scaling = "std"

Transformer 模型都相对较小,具有

encoder_layers=2
decoder_layers=2
d_model=16

我们不再展示如何使用 Autoformer 训练模型,而是可以直接用新的 Autoformer 模型替换之前的两篇博客文章(TimeSeriesTransformerInformer)中的模型,并在 traffic 数据集上进行训练。为了避免重复,我们已经训练了模型并将其推送到 HuggingFace Hub。我们将使用这些模型进行评估。

加载数据集

我们首先安装必要的库

!pip install -q transformers datasets evaluate accelerate "gluonts[torch]" ujson tqdm

Lai 等人 (2017) 使用的 traffic 数据集包含旧金山交通数据。它包含 862 个小时时间序列,显示 2015 年至 2016 年旧金山湾区高速公路上的道路占用率,范围在 [0,1][0, 1] 之间。

from gluonts.dataset.repository.datasets import get_dataset

dataset = get_dataset("traffic")
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length

我们来可视化数据集中的一个时间序列,并绘制训练/测试分割。

import matplotlib.pyplot as plt

train_example = next(iter(dataset.train))
test_example = next(iter(dataset.test))

num_of_samples = 4*prediction_length

figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
    test_example["target"][-num_of_samples - prediction_length :],
    color="red",
    alpha=0.5,
)

plt.show()

png

我们来定义训练/测试分割

train_dataset = dataset.train
test_dataset = dataset.test

定义转换

接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用特征)的转换。

我们定义了一个 GluonTS 的 Chain 转换(有点类似于图像的 torchvision.transforms.Compose)。它允许我们将多个转换组合成一个单一的管道。

下面的转换都带有注释,以解释它们的功能。从高层次来看,我们将遍历数据集中的各个时间序列并添加/删除字段或特征。

from transformers import PretrainedConfig
from gluonts.time_feature import time_features_from_frequency_str

from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    # create a list of fields to remove later
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # these serve as positional encodings
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in the life the value of the time series is
            # sort of running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定义 InstanceSplitter

为了进行训练/验证/测试,我们接下来创建一个 InstanceSplitter,它用于从数据集中采样窗口(因为,请记住,由于时间和内存限制,我们无法将整个历史值传递给模型)。

实例分割器从数据中采样随机的 context_length 大小和随后的 prediction_length 大小的窗口,并为相应窗口中的任何时间序列键在 time_series_fields 中附加 past_future_ 前缀。实例分割器可以配置为三种不同的模式:

  1. mode="train":在这种模式下,我们从给定数据集(训练数据集)中随机采样上下文和预测长度窗口。
  2. mode="validation":在这种模式下,我们从给定数据集(用于回溯测试或验证似然计算)中采样最后一个上下文长度窗口和预测窗口。
  3. mode="test":在这种模式下,我们仅采样最后一个上下文长度窗口(用于预测用例)。
from gluonts.transform import InstanceSplitter
from gluonts.transform.sampler import InstanceSampler
from typing import Optional


def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

创建 PyTorch DataLoaders

接下来,是时候创建 PyTorch DataLoaders 了,它们允许我们拥有批量的(输入,输出)对——换句话说,(past_valuesfuture_values)。

from typing import Iterable

import torch
from gluonts.itertools import Cyclic, Cached
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # we initialize a Training instance
    instance_splitter = create_instance_splitter(config, "train")

    # the instance splitter will sample a window of
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(stream)

    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )

def create_backtest_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data)

    # we create a Validation Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_sampler = create_instance_splitter(config, "validation")

    # we apply the transformations in train mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=True)

    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # We create a test Instance splitter to sample the very last
    # context window from the dataset provided.
    instance_sampler = create_instance_splitter(config, "test")

    # We apply the transformations in test mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

在 Autoformer 上评估

我们已经在这个数据集上预训练了一个 Autoformer 模型,所以我们只需获取模型并在测试集上对其进行评估

from transformers import AutoformerConfig, AutoformerForPrediction

config = AutoformerConfig.from_pretrained("kashif/autoformer-traffic-hourly")
model = AutoformerForPrediction.from_pretrained("kashif/autoformer-traffic-hourly")

test_dataloader = create_backtest_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

在推理时,我们将使用模型的 generate() 方法,从训练集中每个时间序列的最后一个上下文窗口预测未来 prediction_length 步。

from accelerate import Accelerator

accelerator = Accelerator()
device = accelerator.device
model.to(device)
model.eval()

forecasts_ = []
for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts_.append(outputs.sequences.cpu().numpy())

模型输出的张量形状为 (batch_size, 样本数, 预测长度, 输入大小)。

在这种情况下,我们为测试数据加载器批次中的每个时间序列(您还记得上面是 64)的未来 24 小时获得 100 个可能的值。

forecasts_[0].shape

>>> (64, 100, 24)

我们将它们垂直堆叠,以获取测试数据集中所有时间序列的预测结果:测试集中有 7 个滚动窗口,这就是为什么我们最终得到总计 7 * 862 = 6034 个预测。

import numpy as np

forecasts = np.vstack(forecasts_)
print(forecasts.shape)

>>> (6034, 100, 24)

我们可以根据测试集中存在的样本外真实值评估由此产生的预测。为此,我们将使用 🤗 Evaluate 库,其中包括 MASE 指标。

我们计算数据集中每个时间序列的指标并返回平均值

from tqdm.autonotebook import tqdm
from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")

forecast_median = np.median(forecasts, 1)

mase_metrics = []
for item_id, ts in enumerate(tqdm(test_dataset)):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])

因此,Autoformer 模型的结果是

print(f"Autoformer univariate MASE: {np.mean(mase_metrics):.3f}")

>>> Autoformer univariate MASE: 0.910

为了绘制任何时间序列相对于真实测试数据的预测图,我们定义了以下辅助函数

import matplotlib.dates as mdates
import pandas as pd

test_ds = list(test_dataset)

def plot(ts_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=test_ds[ts_index][FieldName.START],
        periods=len(test_ds[ts_index][FieldName.TARGET]),
        freq=test_ds[ts_index][FieldName.START].freq,
    ).to_timestamp()

    ax.plot(
        index[-5*prediction_length:], 
        test_ds[ts_index]["target"][-5*prediction_length:],
        label="actual",
    )

    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    
    plt.gcf().autofmt_xdate()
    plt.legend(loc="best")
    plt.show()

例如,对于测试集中索引为 4 的时间序列

plot(4)

png

在 DLinear 上评估

概率 DLinear 在 gluonts 中实现,因此我们可以在这里相对快速地对其进行训练和评估

from gluonts.torch.model.d_linear.estimator import DLinearEstimator

# Define the DLinear model with the same parameters as the Autoformer model
estimator = DLinearEstimator(
    prediction_length=dataset.metadata.prediction_length,
    context_length=dataset.metadata.prediction_length*2,
    scaling=scaling,
    hidden_dimension=2,
    
    batch_size=batch_size,
    num_batches_per_epoch=num_batches_per_epoch,
    trainer_kwargs=dict(max_epochs=epochs)
)

训练模型

predictor = estimator.train(
    training_data=train_dataset, 
    cache_data=True, 
    shuffle_buffer_length=1024
)

>>> INFO:pytorch_lightning.callbacks.model_summary:
      | Name  | Type         | Params
    ---------------------------------------
    0 | model | DLinearModel | 4.7 K 
    ---------------------------------------
    4.7 K     Trainable params
    0         Non-trainable params
    4.7 K     Total params
    0.019     Total estimated model params size (MB)

    Training: 0it [00:00, ?it/s]
    ...
    INFO:pytorch_lightning.utilities.rank_zero:Epoch 49, global step 5000: 'train_loss' was not in top 1
    INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.

并在测试集上对其进行评估

from gluonts.evaluation import make_evaluation_predictions, Evaluator

forecast_it, ts_it = make_evaluation_predictions(
    dataset=dataset.test,
    predictor=predictor,
)

d_linear_forecasts = list(forecast_it)
d_linear_tss = list(ts_it)

evaluator = Evaluator()

agg_metrics, _ = evaluator(iter(d_linear_tss), iter(d_linear_forecasts))

因此,DLinear 模型的结果是

dlinear_mase = agg_metrics["MASE"]
print(f"DLinear MASE: {dlinear_mase:.3f}")

>>> DLinear MASE: 0.965

和以前一样,我们通过此辅助函数绘制了我们训练过的 DLinear 模型的预测图

def plot_gluonts(index):
    plt.plot(d_linear_tss[index][-4 * dataset.metadata.prediction_length:].to_timestamp(), label="target")
    d_linear_forecasts[index].plot(show_label=True,  color='g')
    plt.legend()
    plt.gcf().autofmt_xdate()
    plt.show()
plot_gluonts(4)

png

traffic 数据集在工作日和周末之间的传感器模式存在分布偏移。那么这里发生了什么?由于 DLinear 模型无法整合协变量,特别是任何日期时间特征,我们给它的上下文窗口没有足够的信息来判断预测是针对周末还是工作日。因此,模型将预测更常见的模式,即工作日,从而导致周末性能较差。当然,通过提供更大的上下文窗口,线性模型将能够找出每周模式,但数据中可能存在每月或每季度模式,这将需要越来越大的上下文。

结论

基于 Transformer 的模型与上述线性基线相比如何?我们不同模型的测试集 MASE 指标如下:

数据集 Transformer(单变量) Transformer(多变量) Informer(单变量) Informer(多变量) Autoformer(单变量) DLinear
交通 0.876 1.046 0.924 1.131 0.910 0.965

正如我们所观察到的,我们去年推出的普通Transformer在这里获得了最佳结果。其次,多变量模型通常差于单变量模型,原因在于难以估计跨序列相关性/关系。估计值带来的额外方差通常会损害最终的预测结果,或者模型会学习到虚假相关性。最近的论文,如CrossFormer(ICLR 23)和CARD,试图解决Transformer模型中的这一问题。多变量模型通常在大量数据上训练时表现良好。然而,与单变量模型相比,特别是在较小的开放数据集上,单变量模型往往能提供更好的指标。通过将线性模型与同等大小的单变量Transformer或其他任何神经网络单变量模型进行比较,通常会获得更好的性能。

总而言之,Transformer 模型在时间序列预测方面绝对远未过时!然而,大规模数据集的可用性对于最大限度地发挥其潜力至关重要。与计算机视觉和自然语言处理领域不同,时间序列领域缺乏可公开访问的大规模数据集。大多数现有时间序列预训练模型都在像 UCR 和 UEA 这样的档案中进行小样本训练,这些档案只包含几千甚至几百个样本。尽管这些基准数据集在时间序列社区的进步中发挥了重要作用,但其有限的样本量和缺乏通用性对深度学习模型的预训练构成了挑战。

因此,开发大规模、通用时间序列数据集(如计算机视觉中的 ImageNet)至关重要。创建此类数据集将极大地促进专门为时间序列分析设计的预训练模型的进一步研究,并提高预训练模型在时间序列预测中的适用性。

致谢

我们感谢 Lysandre DebutPedro Cuenca 在本项目中提供的富有洞察力的评论和帮助 ❤️。

社区

“一个简单的线性模型,虽然在某些情况下有优势,但与单变量设置中更复杂的模型(如 Transformer)相比,它无法整合协变量。”

我认为这个比较没有切中要害。这里您专门讨论的是日期相关特征。确实,我希望编码过去输入的神经网络架构(RNN、CNN、Transformer 等)能够学习针对当前预测任务优化的特定特征。正如您所说,DLinear 可用的协变量次优,它将受益于更大的窗口。

简而言之,Transformer 更擅长创建特征,但不一定更擅长整合协变量。
有没有一种公平的比较,其中 Transformer 和 DLinear 都使用具有适当特征工程的相同数据集?例如,您在基准测试中使用电力数据。DLinear 可以很容易地识别温度和太阳辐射等天气协变量之间的相关性。在这种情况下,Transformer 与 DLinear 等模型相比是否仍具有显著优势?

注册登录 发表评论