使用 🤗 Transformers 进行概率时间序列预测

发布于 2022 年 12 月 1 日
在 GitHub 上更新
Open In Colab

简介

时间序列预测是一个重要的科学和商业问题,因此最近也出现了很多创新,除了经典方法外,还使用了基于深度学习的模型。像 ARIMA 这样的经典方法和新颖的深度学习方法之间有一个重要的区别。

概率预测

通常,经典方法是针对数据集中的每个时间序列单独进行拟合的。这些方法通常被称为“单一”或“局部”方法。然而,当某些应用需要处理大量时间序列时,训练一个对所有可用时间序列都适用的“全局”模型会更有益,这能使模型从许多不同来源中学习潜在表示。

一些经典方法是点值预测(也就是说,它们只为每个时间步输出一个单一值),模型通过最小化相对于真实数据的 L2 或 L1 类型的损失来进行训练。然而,由于预测通常用于某些现实世界的决策流程中,即使有人工参与,提供预测的不确定性也更有益。这也被称为“概率预测”,与“点预测”相对。这需要对一个概率分布进行建模,然后可以从中进行采样。

简而言之,我们希望训练的是**全局概率**模型,而不是局部点预测模型。深度学习非常适合这个任务,因为神经网络可以从多个相关的时间序列中学习表示,并对数据的不确定性进行建模。

在概率设定中,通常会学习某个选定的参数分布(如高斯分布或学生 t 分布)的未来参数,或者学习条件分位数函数,或者使用适用于时间序列设定的保形预测框架。方法的选择不影响建模方面,因此通常可以被视为另一个超参数。人们总是可以通过取经验均值或中位数将概率模型转换为点预测模型。

Time Series Transformer

在建模具有序列性质的时间序列数据方面,可以想象,研究人员已经提出了使用循环神经网络(RNN)如 LSTM 或 GRU、卷积网络(CNN)的模型,以及最近基于 Transformer 的方法,这些方法很自然地适用于时间序列预测的设定。

在这篇博客文章中,我们将利用原生的 Transformer 模型 (Vaswani et al., 2017)来完成**单变量**概率预测任务(即,单独预测每个时间序列的一维分布)。编码器-解码器 Transformer 是预测任务的自然选择,因为它很好地融入了多种归纳偏置。

首先,在推理时,使用编码器-解码器架构非常有帮助,因为我们通常希望根据一些已记录的数据来预测未来的几个步骤。这可以被看作是类似于文本生成任务,在给定一些上下文的情况下,我们采样下一个词元并将其传回解码器(也称为“自回归生成”)。类似地,在这里我们也可以根据给定的分布类型进行采样,以提供直到我们期望的预测范围的预测。这被称为祖先采样(Ancestral Sampling)。这里有一篇关于语言模型上下文中采样的优秀博客文章。

其次,Transformer 帮助我们训练可能包含数千个时间点的时间序列数据。由于注意力机制的时间和内存限制,一次性将时间序列的*所有*历史输入到模型中可能不可行。因此,可以考虑一个合适的上下文窗口,并在构建用于随机梯度下降(SGD)的批次时,从训练数据中采样这个窗口以及随后的预测长度大小的窗口。上下文大小的窗口可以传递给编码器,预测窗口可以传递给一个带有*因果掩码*的解码器。这意味着解码器在学习下一个值时只能看到之前的时间步。这等同于训练一个用于机器翻译的原生 Transformer 的方式,被称为“教师强制”(teacher forcing)。

与其它架构相比,Transformer 的另一个好处是我们可以将缺失值(在时间序列设定中很常见)作为额外的掩码加入到编码器或解码器中,并且仍然可以在不依赖填充或插值的情况下进行训练。这等同于像 BERT 和 GPT-2 这样的模型在 Transformers 库中的 `attention_mask`,用于在计算注意力矩阵时不包括填充词元。

Transformer 架构的一个缺点是上下文和预测窗口大小受限于原生 Transformer 的二次计算和内存需求,参见 Tay et al., 2020。此外,由于 Transformer 是一个强大的架构,它可能比其他方法更容易过拟合或学习到虚假的关联。

🤗 Transformers 库带有一个原生的概率时间序列 Transformer 模型,简称为 Time Series Transformer。在下面的章节中,我们将展示如何在一个自定义数据集上训练这样一个模型。

设置环境

首先,我们来安装必要的库:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate 和 GluonTS

正如我们将要展示的,GluonTS 将用于转换数据以创建特征,以及创建适当的训练、验证和测试批次。

!pip install -q transformers

!pip install -q datasets

!pip install -q evaluate

!pip install -q accelerate

!pip install -q gluonts ujson

加载数据集

在这篇博客文章中,我们将使用 `tourism_monthly` 数据集,该数据集可在 Hugging Face Hub 上找到。这个数据集包含了澳大利亚 366 个地区每月的旅游量。

该数据集是 Monash 时间序列预测 存储库的一部分,该存储库收集了来自多个领域的时间序列数据集。它可以被视为时间序列预测的 GLUE 基准。

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "tourism_monthly")

可以看出,数据集包含 3 个划分:训练集、验证集和测试集。

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
    })

每个样本都包含几个键,其中 `start` 和 `target` 是最重要的。让我们看看数据集中的第一个时间序列。

train_example = dataset['train'][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

`start` 仅表示时间序列的开始时间(以日期时间格式),而 `target` 包含时间序列的实际值。

`start` 将有助于为时间序列值添加与时间相关的特征,作为模型的额外输入(例如“一年中的月份”)。由于我们知道数据的频率是 `monthly`(每月),我们知道例如第二个值的时间戳是 `1979-02-01`,等等。

print(train_example['start'])
print(train_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5772.876953125]

验证集包含与训练集相同的数据,只是时间上延长了一个 `prediction_length` 的长度。这使我们能够将模型的预测与真实值进行验证。

测试集的数据比验证集又长一个 `prediction_length`(或者比训练集长若干个 `prediction_length`,用于在多个滚动窗口上进行测试)。

validation_example = dataset['validation'][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

初始值与相应的训练样本完全相同。

print(validation_example['start'])
print(validation_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5985.830078125]

然而,这个样本比训练样本多了 `prediction_length=24` 个额外的值。我们来验证一下。

freq = "1M"
prediction_length = 24

assert len(train_example["target"]) + prediction_length == len(
    validation_example["target"]
)

我们来可视化一下。

import matplotlib.pyplot as plt

figure, axes = plt.subplots()
axes.plot(train_example["target"], color="blue")
axes.plot(validation_example["target"], color="red", alpha=0.5)

plt.show()

png

让我们拆分数据

train_dataset = dataset["train"]
test_dataset = dataset["test"]

更新 `start` 为 `pd.Period`

我们要做的第一件事是使用数据的 `freq` 将每个时间序列的 `start` 特征转换为 pandas 的 `Period` 索引。

from functools import lru_cache

import pandas as pd
import numpy as np

@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)

def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

我们现在使用 `datasets` 的 `set_transform` 功能来实时地、原地完成这个操作。

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

定义模型

接下来,让我们实例化一个模型。这个模型将从头开始训练,因此我们不会使用 `from_pretrained` 方法,而是从一个 `config` 文件中随机初始化模型。

我们为模型指定了几个额外的参数。

  • `prediction_length`(在我们的例子中是 `24` 个月):这是 Transformer 的解码器将要学习预测的时间范围;
  • `context_length`:如果没有指定 `context_length`,模型会将 `context_length`(编码器的输入)设置为等于 `prediction_length`;
  • 给定频率的 `lags`:这些指定了我们要“回看”多远,作为额外的特征。例如,对于 `Daily`(每日)频率,我们可能会考虑回看 `[1, 2, 7, 30, ...]` 天,换句话说,就是回看 1 天、2 天……;而对于 `Minute`(分钟)数据,我们可能会考虑 `[1, 30, 60, 60*24, ...]` 等等;
  • 时间特征的数量:在我们的例子中,这将是 `2`,因为我们将添加 `MonthOfYear`(年中的月份)和 `Age`(年龄)特征;
  • 静态类别特征的数量:在我们的例子中,这将只是 `1`,因为我们将添加一个单一的“时间序列ID”特征;
  • 基数:每个静态类别特征的值的数量,以列表形式表示。在我们的例子中,这将是 `[366]`,因为我们有 366 个不同的时间序列。
  • 嵌入维度:每个静态类别特征的嵌入维度,以列表形式表示。例如,`[3]` 意味着模型将为 `366` 个时间序列(地区)中的每一个学习一个大小为 `3` 的嵌入向量。

让我们使用 GluonTS 为给定频率(“monthly”)提供的默认滞后值。

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 23, 24, 25, 35, 36, 37]

这意味着我们将在每个时间步回看最多 37 个月,作为额外的特征。

我们再看看 GluonTS 提供给我们的默认时间特征。

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function month_of_year at 0x7fa496d0ca70>]

在这种情况下,只有一个特征,即“一年中的月份”。这意味着对于每个时间步,我们会将月份作为一个标量值添加进去(例如,如果时间戳是“一月”,值为 `1`;如果是“二月”,值为 `2`,以此类推)。

现在我们拥有定义模型所需的一切。

from transformers import TimeSeriesTransformerConfig, TimeSeriesTransformerForPrediction

config = TimeSeriesTransformerConfig(
    prediction_length=prediction_length,
    # context length:
    context_length=prediction_length * 2,
    # lags coming from helper given the freq:
    lags_sequence=lags_sequence,
    # we'll add 2 time features ("month of year" and "age", see further):
    num_time_features=len(time_features) + 1,
    # we have a single static categorical feature, namely time series ID:
    num_static_categorical_features=1,
    # it has 366 possible values:
    cardinality=[len(train_dataset)],
    # the model will learn an embedding of size 2 for each of the 366 possible values:
    embedding_dimension=[2],
    
    # transformer params:
    encoder_layers=4,
    decoder_layers=4,
    d_model=32,
)

model = TimeSeriesTransformerForPrediction(config)

请注意,与 🤗 Transformers 库中的其他模型类似,`TimeSeriesTransformerModel` 对应的是没有顶部任何头的编码器-解码器 Transformer,而 `TimeSeriesTransformerForPrediction` 对应的是在 `TimeSeriesTransformerModel` 之上加了一个**分布头**。默认情况下,模型使用学生 t 分布(但这是可配置的)。

model.config.distribution_output

>>> student_t

这与用于自然语言处理(NLP)的 Transformer 有一个重要区别,后者的头通常由一个实现为 `nn.Linear` 层的固定类别分布组成。

定义转换

接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用特征)的转换。

同样,我们将使用 GluonTS 库来完成此操作。我们定义一个转换 `Chain`(这有点类似于图像处理中的 `torchvision.transforms.Compose`)。它允许我们将多个转换组合成一个单一的流水线。

from gluonts.time_feature import (
    time_features_from_frequency_str,
    TimeFeature,
    get_lags_for_frequency,
)
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的转换都附有注释,以解释它们的作用。总的来说,我们将遍历数据集中的各个时间序列,并添加/删除字段或特征。

from transformers import PretrainedConfig

def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    # a bit like torchvision.transforms.Compose
    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # month of year in the case when freq="M"
            # these serve as positional encodings
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in its life the value of the time series is,
            # sort of a running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定义 `InstanceSplitter`

为了进行训练/验证/测试,我们接下来创建一个 `InstanceSplitter`,它用于从数据集中采样窗口(因为,请记住,由于时间和内存限制,我们不能将整个历史值传递给 Transformer)。

实例分割器从数据中随机采样 `context_length` 大小和随后的 `prediction_length` 大小的窗口,并为相应窗口中 `time_series_fields` 内的任何时间键添加 `past_` 或 `future_` 前缀。实例分割器可以配置为三种不同的模式。

  1. mode="train":在这种模式下,我们从给定数据集(训练数据集)中随机采样上下文和预测长度窗口。
  2. mode="validation":在这种模式下,我们从给定数据集(用于回溯测试或验证似然计算)中采样最后一个上下文长度窗口和预测窗口。
  3. mode="test":在这种模式下,我们仅采样最后一个上下文长度窗口(用于预测用例)。
from gluonts.transform.sampler import InstanceSampler
from typing import Optional

def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

创建数据加载器

接下来,是时候创建 DataLoaders 了,它让我们能够获得成批的(输入,输出)对——或者换句话说,(`past_values`,`future_values`)。

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # we initialize a Training instance
    instance_splitter = create_instance_splitter(config, "train")

    # the instance splitter will sample a window of
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(stream)
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )
def create_backtest_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data)

    # we create a Validation Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_sampler = create_instance_splitter(config, "validation")

    # we apply the transformations in train mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=True)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

为了完整性,我们提供了一个测试数据加载器的辅助函数,尽管我们在这里不会使用它。这在生产环境中很有用,当我们想要从给定时间序列的末尾开始预测时。因此,测试数据加载器将从提供的数据集中采样最后一个上下文窗口,并将其传递给模型。

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # We create a test Instance splitter to sample the very last
    # context window from the dataset provided.
    instance_sampler = create_instance_splitter(config, "test")

    # We apply the transformations in test mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )
train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
)

test_dataloader = create_backtest_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

我们来看看第一个批次的数据。

batch = next(iter(train_dataloader))
for k, v in batch.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 85, 2]) torch.FloatTensor
    past_values torch.Size([256, 85]) torch.FloatTensor
    past_observed_mask torch.Size([256, 85]) torch.FloatTensor
    future_time_features torch.Size([256, 24, 2]) torch.FloatTensor
    static_categorical_features torch.Size([256, 1]) torch.LongTensor
    future_values torch.Size([256, 24]) torch.FloatTensor
    future_observed_mask torch.Size([256, 24]) torch.FloatTensor

可以看到,我们没有像 NLP 模型那样向编码器提供 `input_ids` 和 `attention_mask`,而是提供了 `past_values`,以及 `past_observed_mask`、`past_time_features` 和 `static_categorical_features`。

解码器的输入包括 `future_values`、`future_observed_mask` 和 `future_time_features`。`future_values` 可以看作是 NLP 中 `decoder_input_ids` 的等价物。

有关每个参数的详细解释,请参阅文档

前向传播

让我们用刚刚创建的批次进行一次前向传播。

# perform forward pass
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=batch["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True,
)
print("Loss:", outputs.loss.item())

>>> Loss: 9.069628715515137

请注意,模型正在返回一个损失值。这是可能的,因为解码器会自动将 `future_values` 向右移动一个位置,以获得标签。这使得可以计算预测值和标签之间的损失。

另外,请注意解码器使用因果掩码来防止看到未来,因为它需要预测的值在 `future_values` 张量中。

训练模型

是时候训练模型了!我们将使用标准的 PyTorch 训练循环。

我们将在这里使用 🤗 Accelerate 库,它会自动将模型、优化器和数据加载器放置在适当的 `device` 上。

from accelerate import Accelerator
from torch.optim import AdamW

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizer, train_dataloader = accelerator.prepare(
    model,
    optimizer,
    train_dataloader,
)

model.train()
for epoch in range(40):
    for idx, batch in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()

        if idx % 100 == 0:
            print(loss.item())

推理

在推理时,建议使用 `generate()` 方法进行自回归生成,类似于 NLP 模型。

预测过程包括从测试实例采样器获取数据,该采样器将从数据集中每个时间序列的最后一个 `context_length` 大小的窗口中采样值,并将其传递给模型。请注意,我们将 `future_time_features`(这些是预先知道的)传递给解码器。

模型将从预测的分布中自回归地采样一定数量的值,并将它们传回解码器以返回预测输出。

model.eval()

forecasts = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts.append(outputs.sequences.cpu().numpy())

模型输出一个形状为(`batch_size`,`number of samples`,`prediction length`)的张量。

在这种情况下,我们为接下来的 `24` 个月获得了 `100` 个可能的值(对于批次中大小为 `64` 的每个样本)。

forecasts[0].shape

>>> (64, 100, 24)

我们将它们垂直堆叠,以获得测试数据集中所有时间序列的预测。

forecasts = np.vstack(forecasts)
print(forecasts.shape)

>>> (366, 100, 24)

我们可以根据测试集中存在的样本外真实值来评估最终的预测结果。我们将使用 MASEsMAPE 指标,并为数据集中的每个时间序列计算这些指标。

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1)

mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])
    
    smape = smape_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
    )
    smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.2564196892177717

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.1609541520852549

我们还可以绘制数据集中每个时间序列的单个指标,并观察到少数几个时间序列对最终测试指标的贡献很大。

plt.scatter(mase_metrics, smape_metrics, alpha=0.3)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

png

为了绘制任何时间序列相对于真实测试数据的预测图,我们定义了以下辅助函数。

import matplotlib.dates as mdates

def plot(ts_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=test_dataset[ts_index][FieldName.START],
        periods=len(test_dataset[ts_index][FieldName.TARGET]),
        freq=freq,
    ).to_timestamp()

    # Major ticks every half year, minor ticks every month,
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=(1, 7)))
    ax.xaxis.set_minor_locator(mdates.MonthLocator())

    ax.plot(
        index[-2*prediction_length:], 
        test_dataset[ts_index]["target"][-2*prediction_length:],
        label="actual",
    )

    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    
    plt.fill_between(
        index[-prediction_length:],
        forecasts[ts_index].mean(0) - forecasts[ts_index].std(axis=0), 
        forecasts[ts_index].mean(0) + forecasts[ts_index].std(axis=0), 
        alpha=0.3, 
        interpolate=True,
        label="+/- 1-std",
    )
    plt.legend()
    plt.show()

例如:

plot(334)

png

我们如何与其他模型进行比较?Monash 时间序列存储库有一个测试集 MASE 指标的比较表,我们可以补充我们的结果。

数据集 SES Theta TBATS ETS (DHR-)ARIMA PR CatBoost FFNN DeepAR N-BEATS WaveNet **Transformer** (我们的)
Tourism Monthly 3.306 1.649 1.751 1.526 1.589 1.678 1.699 1.582 1.409 1.574 1.482 1.256

请注意,使用我们的模型,我们击败了所有已报告的其他模型(另见相应论文中的表 2),而且我们没有进行任何超参数调整。我们只是将 Transformer 训练了 40 个周期。

当然,我们必须小心,不能仅仅宣称用神经网络在时间序列上取得了最先进的结果,因为似乎“通常你只需要 XGBoost”。我们只是非常好奇,想看看神经网络能带我们走多远,以及 Transformer 在这个领域是否会有用。这个特定的数据集似乎表明,这绝对值得探索。

后续步骤

我们鼓励读者使用 Hub 上的其他时间序列数据集来尝试这个 notebook,并替换适当的频率和预测长度参数。对于您自己的数据集,您需要将其转换为 GluonTS 使用的约定,其文档这里有很好的解释。我们还准备了一个示例 notebook,向您展示如何将您的数据集转换为 🤗 datasets 格式,点此查看

时间序列研究者们会知道,将基于 Transformer 的模型应用于时间序列问题已经引起了广泛的兴趣。原生 Transformer 只是众多基于注意力的模型之一,因此有必要向库中添加更多的模型。

目前,没有什么能阻止我们对多变量时间序列进行建模,但是为此需要用一个多变量分布头来实例化模型。目前支持对角独立的分布,其他多变量分布将会被添加。敬请期待未来包含教程的博客文章。

路线图上的另一件事是时间序列分类。这需要在库中添加一个带有分类头的时间序列模型,例如用于异常检测任务。

当前模型假设时间序列值伴随着日期时间信息,但这对于现实世界中的每个时间序列可能并非如此。例如,可以看看像 WOODS 的神经科学数据集。因此,需要泛化当前模型,使整个流水线中的某些输入成为可选的。

最后,NLP/视觉领域从大型预训练模型中获益匪浅,而据我们所知,时间序列领域并非如此。基于 Transformer 的模型似乎是追求这一研究方向的明显选择,我们迫不及待地想看看研究人员和实践者会拿出什么样的成果!

社区

注册登录 发表评论