训练你的第一个决策Transformer
在上一篇文章中,我们宣布在 transformers 库中推出决策Transformer。这种**使用 Transformer 作为决策模型**的新技术越来越受欢迎。
所以今天,**你将学习如何从零开始训练你的第一个离线决策Transformer模型,让半猎豹奔跑起来。**我们将在 Google Colab 上直接训练它,你可以在这里找到它 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb
听起来很刺激吧?我们开始吧!
什么是决策Transformer?
决策Transformer模型由**Chen L. 等人撰写的《决策Transformer:通过序列建模进行强化学习》**引入。它将强化学习抽象为**条件序列建模问题**。
主要思想是,我们不使用强化学习方法训练策略,例如拟合一个价值函数来告诉我们采取什么行动来最大化回报(累积奖励),而是**使用序列建模算法(Transformer)**,该算法在给定期望回报、过去状态和行动的情况下,将生成未来行动以实现此期望回报。它是一个自回归模型,以期望回报、过去状态和行动为条件,生成实现期望回报的未来行动。
**这是强化学习范式的彻底转变**,因为我们使用生成轨迹建模(建模状态、动作和奖励序列的联合分布)来取代传统的强化学习算法。这意味着在决策Transformer中,我们不最大化回报,而是生成一系列未来动作以实现期望回报。
过程如下:
- 我们将**最后 K 个时间步**与三个输入一起送入决策Transformer:
- 剩余回报(Return-to-go)
- 状态
- 行动
- 如果状态是向量,**令牌将通过线性层嵌入**;如果是帧,则通过 CNN 编码器嵌入。
- **输入由 GPT-2 模型处理**,该模型通过自回归建模预测未来行动。
决策Transformer架构。状态、动作和回报被送入特定模态的线性嵌入层,并添加了位置情景时间步编码。令牌被送入 GPT 架构,该架构使用因果自注意力掩码自回归地预测动作。图表来自 [1]。
有不同类型的决策Transformer,但今天,我们将训练一个离线决策Transformer,这意味着我们只使用从其他代理或人类演示中收集的数据。**代理不与环境交互**。如果你想了解更多关于离线和在线强化学习之间的区别,请查看这篇文章。
现在我们已经理解了离线决策Transformer背后的理论,**让我们看看如何在实践中训练一个。**
训练决策Transformer
在上一篇文章中,我们演示了如何使用 transformers 决策Transformer模型并从 🤗 hub 加载预训练权重。
在这一部分,我们将使用 🤗 Trainer 和自定义数据整理器从头开始训练决策Transformer模型,使用托管在 🤗 hub 上的离线强化学习数据集。你可以在此 Colab notebook中找到本教程的代码。
我们将执行离线强化学习,以学习 mujoco halfcheetah 环境中的以下行为。
加载数据集和构建自定义数据整理器
我们在 hub 上托管了许多离线强化学习数据集。今天我们将使用 hub 上托管的 halfcheetah “专家”数据集进行训练。
首先,我们需要从 🤗 datasets 包中导入 `load_dataset` 函数,并将数据集下载到我们的机器上。
from datasets import load_dataset
dataset = load_dataset("edbeeching/decision_transformer_gym_replay", "halfcheetah-expert-v2")
虽然 hub 上的大多数数据集都可以直接使用,但有时我们希望对数据集进行一些额外的处理或修改。在这种情况下,我们希望与作者的实现相匹配,即我们需要:
- 通过减去均值并除以标准差来归一化每个特征。
- 为每个轨迹预计算折扣回报。
- 将奖励和回报按 1000 的系数进行缩放。
- 扩充数据集采样分布,使其考虑专家代理轨迹的长度。
为了执行此数据集预处理,我们将使用自定义 🤗 Data Collator。
现在,让我们开始为离线强化学习构建自定义数据整理器。
@dataclass
class DecisionTransformerGymDataCollator:
return_tensors: str = "pt"
max_len: int = 20 #subsets of the episode we use for training
state_dim: int = 17 # size of state space
act_dim: int = 6 # size of action space
max_ep_len: int = 1000 # max episode length in the dataset
scale: float = 1000.0 # normalization of rewards/returns
state_mean: np.array = None # to store state means
state_std: np.array = None # to store state stds
p_sample: np.array = None # a distribution to take account trajectory lengths
n_traj: int = 0 # to store the number of trajectories in the dataset
def __init__(self, dataset) -> None:
self.act_dim = len(dataset[0]["actions"][0])
self.state_dim = len(dataset[0]["observations"][0])
self.dataset = dataset
# calculate dataset stats for normalization of states
states = []
traj_lens = []
for obs in dataset["observations"]:
states.extend(obs)
traj_lens.append(len(obs))
self.n_traj = len(traj_lens)
states = np.vstack(states)
self.state_mean, self.state_std = np.mean(states, axis=0), np.std(states, axis=0) + 1e-6
traj_lens = np.array(traj_lens)
self.p_sample = traj_lens / sum(traj_lens)
def _discount_cumsum(self, x, gamma):
discount_cumsum = np.zeros_like(x)
discount_cumsum[-1] = x[-1]
for t in reversed(range(x.shape[0] - 1)):
discount_cumsum[t] = x[t] + gamma * discount_cumsum[t + 1]
return discount_cumsum
def __call__(self, features):
batch_size = len(features)
# this is a bit of a hack to be able to sample of a non-uniform distribution
batch_inds = np.random.choice(
np.arange(self.n_traj),
size=batch_size,
replace=True,
p=self.p_sample, # reweights so we sample according to timesteps
)
# a batch of dataset features
s, a, r, d, rtg, timesteps, mask = [], [], [], [], [], [], []
for ind in batch_inds:
# for feature in features:
feature = self.dataset[int(ind)]
si = random.randint(0, len(feature["rewards"]) - 1)
# get sequences from dataset
s.append(np.array(feature["observations"][si : si + self.max_len]).reshape(1, -1, self.state_dim))
a.append(np.array(feature["actions"][si : si + self.max_len]).reshape(1, -1, self.act_dim))
r.append(np.array(feature["rewards"][si : si + self.max_len]).reshape(1, -1, 1))
d.append(np.array(feature["dones"][si : si + self.max_len]).reshape(1, -1))
timesteps.append(np.arange(si, si + s[-1].shape[1]).reshape(1, -1))
timesteps[-1][timesteps[-1] >= self.max_ep_len] = self.max_ep_len - 1 # padding cutoff
rtg.append(
self._discount_cumsum(np.array(feature["rewards"][si:]), gamma=1.0)[
: s[-1].shape[1] # TODO check the +1 removed here
].reshape(1, -1, 1)
)
if rtg[-1].shape[1] < s[-1].shape[1]:
print("if true")
rtg[-1] = np.concatenate([rtg[-1], np.zeros((1, 1, 1))], axis=1)
# padding and state + reward normalization
tlen = s[-1].shape[1]
s[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, self.state_dim)), s[-1]], axis=1)
s[-1] = (s[-1] - self.state_mean) / self.state_std
a[-1] = np.concatenate(
[np.ones((1, self.max_len - tlen, self.act_dim)) * -10.0, a[-1]],
axis=1,
)
r[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), r[-1]], axis=1)
d[-1] = np.concatenate([np.ones((1, self.max_len - tlen)) * 2, d[-1]], axis=1)
rtg[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), rtg[-1]], axis=1) / self.scale
timesteps[-1] = np.concatenate([np.zeros((1, self.max_len - tlen)), timesteps[-1]], axis=1)
mask.append(np.concatenate([np.zeros((1, self.max_len - tlen)), np.ones((1, tlen))], axis=1))
s = torch.from_numpy(np.concatenate(s, axis=0)).float()
a = torch.from_numpy(np.concatenate(a, axis=0)).float()
r = torch.from_numpy(np.concatenate(r, axis=0)).float()
d = torch.from_numpy(np.concatenate(d, axis=0))
rtg = torch.from_numpy(np.concatenate(rtg, axis=0)).float()
timesteps = torch.from_numpy(np.concatenate(timesteps, axis=0)).long()
mask = torch.from_numpy(np.concatenate(mask, axis=0)).float()
return {
"states": s,
"actions": a,
"rewards": r,
"returns_to_go": rtg,
"timesteps": timesteps,
"attention_mask": mask,
}
代码很多,简单来说,我们定义了一个类,它接收我们的数据集,执行所需的预处理,并返回给我们批量的**状态**、**动作**、**奖励**、**回报**、**时间步**和**掩码**。这些批量可以直接用于使用 🤗 transformers Trainer 训练决策Transformer模型。
使用 🤗 transformers Trainer 训练决策Transformer模型。
为了使用 🤗 Trainer 类训练模型,我们首先需要确保它返回的字典包含损失,在本例中是模型动作预测和目标之间的 L-2 范数。我们通过创建一个继承自决策Transformer模型的 TrainableDT 类来实现这一点。
class TrainableDT(DecisionTransformerModel):
def __init__(self, config):
super().__init__(config)
def forward(self, **kwargs):
output = super().forward(**kwargs)
# add the DT loss
action_preds = output[1]
action_targets = kwargs["actions"]
attention_mask = kwargs["attention_mask"]
act_dim = action_preds.shape[2]
action_preds = action_preds.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
action_targets = action_targets.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
loss = torch.mean((action_preds - action_targets) ** 2)
return {"loss": loss}
def original_forward(self, **kwargs):
return super().forward(**kwargs)
transformers Trainer 类需要许多参数,这些参数在 TrainingArguments 类中定义。我们使用与作者原始实现相同的超参数,但训练迭代次数较少。这在 Colab notebook 中训练大约需要 40 分钟,所以你可以泡杯咖啡或者阅读 🤗 Annotated Diffusion 博客文章,等待期间。作者训练了大约 3 小时,所以我们这里得到的结果不会像他们的那么好。
training_args = TrainingArguments(
output_dir="output/",
remove_unused_columns=False,
num_train_epochs=120,
per_device_train_batch_size=64,
learning_rate=1e-4,
weight_decay=1e-4,
warmup_ratio=0.1,
optim="adamw_torch",
max_grad_norm=0.25,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset["train"],
data_collator=collator,
)
trainer.train()
现在我们已经解释了决策Transformer的理论,Trainer,以及如何训练它。**你已经准备好从零开始训练你的第一个离线决策Transformer模型,让半猎豹奔跑起来** 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb Colab 中包含了训练模型的可视化,以及如何将模型保存到 🤗 hub。
结论
这篇文章展示了如何在托管于 🤗 数据集上的离线强化学习数据集上训练决策Transformer。我们使用了 🤗 transformers Trainer 和自定义数据整理器。
除了决策Transformer,**我们还希望支持深度强化学习社区的更多用例和工具**。因此,我们非常期待听到您对决策Transformer模型的反馈,以及更普遍地,我们可以与您一起构建的任何对强化学习有用的工具。请随时**与我们联系**。
接下来是什么?
在接下来的几周和几个月里,**我们计划支持生态系统中的其他工具**
- 扩展我们的决策Transformer模型库,包括在线设置中训练或微调的模型 [2]
- 集成 sample-factory 版本 2.0
保持联系的最佳方式是**加入我们的 discord 服务器**,与我们和社区交流。
参考文献
[1] Chen, Lili, et al. "决策Transformer:通过序列建模进行强化学习。" 神经信息处理系统进展 34 (2021)。
[2] Zheng, Qinqing 和 Zhang, Amy 和 Grover, Aditya “在线决策Transformer” (arXiv 预印本, 2022)