训练你的第一个决策Transformer

发布于 2022 年 9 月 8 日
在 GitHub 上更新

上一篇文章中,我们宣布在 transformers 库中推出决策Transformer。这种**使用 Transformer 作为决策模型**的新技术越来越受欢迎。

所以今天,**你将学习如何从零开始训练你的第一个离线决策Transformer模型,让半猎豹奔跑起来。**我们将在 Google Colab 上直接训练它,你可以在这里找到它 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb

*一个“专家”决策Transformer模型,使用离线强化学习在 Gym HalfCheetah 环境中学习。*

听起来很刺激吧?我们开始吧!

什么是决策Transformer?

决策Transformer模型由**Chen L. 等人撰写的《决策Transformer:通过序列建模进行强化学习》**引入。它将强化学习抽象为**条件序列建模问题**。

主要思想是,我们不使用强化学习方法训练策略,例如拟合一个价值函数来告诉我们采取什么行动来最大化回报(累积奖励),而是**使用序列建模算法(Transformer)**,该算法在给定期望回报、过去状态和行动的情况下,将生成未来行动以实现此期望回报。它是一个自回归模型,以期望回报、过去状态和行动为条件,生成实现期望回报的未来行动。

**这是强化学习范式的彻底转变**,因为我们使用生成轨迹建模(建模状态、动作和奖励序列的联合分布)来取代传统的强化学习算法。这意味着在决策Transformer中,我们不最大化回报,而是生成一系列未来动作以实现期望回报。

过程如下:

  1. 我们将**最后 K 个时间步**与三个输入一起送入决策Transformer:
    • 剩余回报(Return-to-go)
    • 状态
    • 行动
  2. 如果状态是向量,**令牌将通过线性层嵌入**;如果是帧,则通过 CNN 编码器嵌入。
  3. **输入由 GPT-2 模型处理**,该模型通过自回归建模预测未来行动。

https://huggingface.co/blog/assets/58_decision-transformers/dt-architecture.gif

决策Transformer架构。状态、动作和回报被送入特定模态的线性嵌入层,并添加了位置情景时间步编码。令牌被送入 GPT 架构,该架构使用因果自注意力掩码自回归地预测动作。图表来自 [1]。

有不同类型的决策Transformer,但今天,我们将训练一个离线决策Transformer,这意味着我们只使用从其他代理或人类演示中收集的数据。**代理不与环境交互**。如果你想了解更多关于离线和在线强化学习之间的区别,请查看这篇文章

现在我们已经理解了离线决策Transformer背后的理论,**让我们看看如何在实践中训练一个。**

训练决策Transformer

在上一篇文章中,我们演示了如何使用 transformers 决策Transformer模型并从 🤗 hub 加载预训练权重。

在这一部分,我们将使用 🤗 Trainer 和自定义数据整理器从头开始训练决策Transformer模型,使用托管在 🤗 hub 上的离线强化学习数据集。你可以在此 Colab notebook中找到本教程的代码。

我们将执行离线强化学习,以学习 mujoco halfcheetah 环境中的以下行为。

*一个“专家”决策Transformer模型,使用离线强化学习在 Gym HalfCheetah 环境中学习。*

加载数据集和构建自定义数据整理器

我们在 hub 上托管了许多离线强化学习数据集。今天我们将使用 hub 上托管的 halfcheetah “专家”数据集进行训练。

首先,我们需要从 🤗 datasets 包中导入 `load_dataset` 函数,并将数据集下载到我们的机器上。

from datasets import load_dataset
dataset = load_dataset("edbeeching/decision_transformer_gym_replay", "halfcheetah-expert-v2")

虽然 hub 上的大多数数据集都可以直接使用,但有时我们希望对数据集进行一些额外的处理或修改。在这种情况下,我们希望与作者的实现相匹配,即我们需要:

  • 通过减去均值并除以标准差来归一化每个特征。
  • 为每个轨迹预计算折扣回报。
  • 将奖励和回报按 1000 的系数进行缩放。
  • 扩充数据集采样分布,使其考虑专家代理轨迹的长度。

为了执行此数据集预处理,我们将使用自定义 🤗 Data Collator

现在,让我们开始为离线强化学习构建自定义数据整理器。

@dataclass
class DecisionTransformerGymDataCollator:
    return_tensors: str = "pt"
    max_len: int = 20 #subsets of the episode we use for training
    state_dim: int = 17  # size of state space
    act_dim: int = 6  # size of action space
    max_ep_len: int = 1000 # max episode length in the dataset
    scale: float = 1000.0  # normalization of rewards/returns
    state_mean: np.array = None  # to store state means
    state_std: np.array = None  # to store state stds
    p_sample: np.array = None  # a distribution to take account trajectory lengths
    n_traj: int = 0 # to store the number of trajectories in the dataset

    def __init__(self, dataset) -> None:
        self.act_dim = len(dataset[0]["actions"][0])
        self.state_dim = len(dataset[0]["observations"][0])
        self.dataset = dataset
        # calculate dataset stats for normalization of states
        states = []
        traj_lens = []
        for obs in dataset["observations"]:
            states.extend(obs)
            traj_lens.append(len(obs))
        self.n_traj = len(traj_lens)
        states = np.vstack(states)
        self.state_mean, self.state_std = np.mean(states, axis=0), np.std(states, axis=0) + 1e-6
        
        traj_lens = np.array(traj_lens)
        self.p_sample = traj_lens / sum(traj_lens)

    def _discount_cumsum(self, x, gamma):
        discount_cumsum = np.zeros_like(x)
        discount_cumsum[-1] = x[-1]
        for t in reversed(range(x.shape[0] - 1)):
            discount_cumsum[t] = x[t] + gamma * discount_cumsum[t + 1]
        return discount_cumsum

    def __call__(self, features):
        batch_size = len(features)
        # this is a bit of a hack to be able to sample of a non-uniform distribution
        batch_inds = np.random.choice(
            np.arange(self.n_traj),
            size=batch_size,
            replace=True,
            p=self.p_sample,  # reweights so we sample according to timesteps
        )
        # a batch of dataset features
        s, a, r, d, rtg, timesteps, mask = [], [], [], [], [], [], []
        
        for ind in batch_inds:
            # for feature in features:
            feature = self.dataset[int(ind)]
            si = random.randint(0, len(feature["rewards"]) - 1)

            # get sequences from dataset
            s.append(np.array(feature["observations"][si : si + self.max_len]).reshape(1, -1, self.state_dim))
            a.append(np.array(feature["actions"][si : si + self.max_len]).reshape(1, -1, self.act_dim))
            r.append(np.array(feature["rewards"][si : si + self.max_len]).reshape(1, -1, 1))

            d.append(np.array(feature["dones"][si : si + self.max_len]).reshape(1, -1))
            timesteps.append(np.arange(si, si + s[-1].shape[1]).reshape(1, -1))
            timesteps[-1][timesteps[-1] >= self.max_ep_len] = self.max_ep_len - 1  # padding cutoff
            rtg.append(
                self._discount_cumsum(np.array(feature["rewards"][si:]), gamma=1.0)[
                    : s[-1].shape[1]   # TODO check the +1 removed here
                ].reshape(1, -1, 1)
            )
            if rtg[-1].shape[1] < s[-1].shape[1]:
                print("if true")
                rtg[-1] = np.concatenate([rtg[-1], np.zeros((1, 1, 1))], axis=1)

            # padding and state + reward normalization
            tlen = s[-1].shape[1]
            s[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, self.state_dim)), s[-1]], axis=1)
            s[-1] = (s[-1] - self.state_mean) / self.state_std
            a[-1] = np.concatenate(
                [np.ones((1, self.max_len - tlen, self.act_dim)) * -10.0, a[-1]],
                axis=1,
            )
            r[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), r[-1]], axis=1)
            d[-1] = np.concatenate([np.ones((1, self.max_len - tlen)) * 2, d[-1]], axis=1)
            rtg[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), rtg[-1]], axis=1) / self.scale
            timesteps[-1] = np.concatenate([np.zeros((1, self.max_len - tlen)), timesteps[-1]], axis=1)
            mask.append(np.concatenate([np.zeros((1, self.max_len - tlen)), np.ones((1, tlen))], axis=1))

        s = torch.from_numpy(np.concatenate(s, axis=0)).float()
        a = torch.from_numpy(np.concatenate(a, axis=0)).float()
        r = torch.from_numpy(np.concatenate(r, axis=0)).float()
        d = torch.from_numpy(np.concatenate(d, axis=0))
        rtg = torch.from_numpy(np.concatenate(rtg, axis=0)).float()
        timesteps = torch.from_numpy(np.concatenate(timesteps, axis=0)).long()
        mask = torch.from_numpy(np.concatenate(mask, axis=0)).float()

        return {
            "states": s,
            "actions": a,
            "rewards": r,
            "returns_to_go": rtg,
            "timesteps": timesteps,
            "attention_mask": mask,
        }

代码很多,简单来说,我们定义了一个类,它接收我们的数据集,执行所需的预处理,并返回给我们批量的**状态**、**动作**、**奖励**、**回报**、**时间步**和**掩码**。这些批量可以直接用于使用 🤗 transformers Trainer 训练决策Transformer模型。

使用 🤗 transformers Trainer 训练决策Transformer模型。

为了使用 🤗 Trainer 类训练模型,我们首先需要确保它返回的字典包含损失,在本例中是模型动作预测和目标之间的 L-2 范数。我们通过创建一个继承自决策Transformer模型的 TrainableDT 类来实现这一点。

class TrainableDT(DecisionTransformerModel):
    def __init__(self, config):
        super().__init__(config)

    def forward(self, **kwargs):
        output = super().forward(**kwargs)
        # add the DT loss
        action_preds = output[1]
        action_targets = kwargs["actions"]
        attention_mask = kwargs["attention_mask"]
        act_dim = action_preds.shape[2]
        action_preds = action_preds.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        action_targets = action_targets.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        
        loss = torch.mean((action_preds - action_targets) ** 2)

        return {"loss": loss}

    def original_forward(self, **kwargs):
        return super().forward(**kwargs)

transformers Trainer 类需要许多参数,这些参数在 TrainingArguments 类中定义。我们使用与作者原始实现相同的超参数,但训练迭代次数较少。这在 Colab notebook 中训练大约需要 40 分钟,所以你可以泡杯咖啡或者阅读 🤗 Annotated Diffusion 博客文章,等待期间。作者训练了大约 3 小时,所以我们这里得到的结果不会像他们的那么好。

training_args = TrainingArguments(
    output_dir="output/",
    remove_unused_columns=False,
    num_train_epochs=120,
    per_device_train_batch_size=64,
    learning_rate=1e-4,
    weight_decay=1e-4,
    warmup_ratio=0.1,
    optim="adamw_torch",
    max_grad_norm=0.25,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    data_collator=collator,
)

trainer.train()

现在我们已经解释了决策Transformer的理论,Trainer,以及如何训练它。**你已经准备好从零开始训练你的第一个离线决策Transformer模型,让半猎豹奔跑起来** 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb Colab 中包含了训练模型的可视化,以及如何将模型保存到 🤗 hub。

结论

这篇文章展示了如何在托管于 🤗 数据集上的离线强化学习数据集上训练决策Transformer。我们使用了 🤗 transformers Trainer 和自定义数据整理器。

除了决策Transformer,**我们还希望支持深度强化学习社区的更多用例和工具**。因此,我们非常期待听到您对决策Transformer模型的反馈,以及更普遍地,我们可以与您一起构建的任何对强化学习有用的工具。请随时**与我们联系**。

接下来是什么?

在接下来的几周和几个月里,**我们计划支持生态系统中的其他工具**

  • 扩展我们的决策Transformer模型库,包括在线设置中训练或微调的模型 [2]
  • 集成 sample-factory 版本 2.0

保持联系的最佳方式是**加入我们的 discord 服务器**,与我们和社区交流。

参考文献

[1] Chen, Lili, et al. "决策Transformer:通过序列建模进行强化学习。" 神经信息处理系统进展 34 (2021)。

[2] Zheng, Qinqing 和 Zhang, Amy 和 Grover, Aditya “在线决策Transformer” (arXiv 预印本, 2022)

社区

几条评论/问题

  • `print("if true")` 语句可能是遗留代码
  • 从整理器返回的许多数据未使用,为什么保留它?

有什么办法可以将此环境迁移到mujoco v4吗?我在让gym/gymnasium与mujoco或mujoco_py良好配合方面遇到困难。即使我正确安装了它,它也一直说模块未找到。

注册登录发表评论