Deep RL Course documentation

Hands-on


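Now that we've studied the theory behind Reinforce, you're ready to code your Reinforce agent with PyTorch. You'll test its robustness using CartPole-v1 and PixelCopter.

You'll then be able to iterate and improve this implementation for more advanced environments.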

Environments

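To validate this hands-on for the certification process, you need to push your trained models to the Hub and

  • Get a result of >= 350 for Cartpole-v1
  • Get a result of >= 5 for PixelCopter.

To find your result, go to the leaderboard and find your model: the result = mean_reward - std of reward. If you don't see your model on the leaderboard, go to the bottom of the leaderboard page and click the refresh button.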
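As a rough illustration of how the leaderboard result is derived, here is a minimal sketch. The reward values are made up, the helper name `leaderboard_result` is hypothetical, and it assumes the standard deviation is the population standard deviation (the default of `np.std`, which the evaluation function in this notebook uses).

```python
from statistics import mean, pstdev


def leaderboard_result(episode_rewards):
    """Return mean reward minus the standard deviation of the reward."""
    # pstdev is the population standard deviation, like np.std's default.
    return mean(episode_rewards) - pstdev(episode_rewards)


# Hypothetical evaluation rewards, for illustration only.
example_rewards = [500.0, 480.0, 500.0, 450.0]
result = leaderboard_result(example_rewards)
print(f"result = {result:.2f}, above the CartPole-v1 threshold: {result >= 350}")
```

A high mean with a large spread can still fall below the threshold, which is why a stable agent matters as much as a strong one.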

For more information about the certification process, check this section 👉 https://huggingface.co/deep-rl-course/en/unit0/introduction#certification-process

You can check your progress here 👉 https://huggingface.co/spaces/ThomasSimonini/Check-my-progress-Deep-RL-Course

To start the hands-on, click on the Open In Colab button 👇

Open In Colab

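We strongly recommend that students use Google Colab for the hands-on exercises instead of running them on their personal computers.

By using Google Colab, you can focus on learning and experimenting without worrying about the technical aspects of setting up your environments.

Unit 4: Code your first Deep Reinforcement Learning algorithm with PyTorch: Reinforce, and test its robustness 💪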

thumbnail

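In this notebook, you'll code your first Deep Reinforcement Learning algorithm from scratch: Reinforce (also called Monte Carlo Policy Gradient).

Reinforce is a policy-based method: a Deep Reinforcement Learning algorithm that tries to optimize the policy directly without using an action-value function.

More precisely, Reinforce is a policy-gradient method, a subclass of policy-based methods that aims to optimize the policy directly by estimating the weights of the optimal policy using gradient ascent.

To test its robustness, we're going to train it in 2 different simple environments: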

  • Cartpole-v1
  • PixelcopterEnv

⬇️ Here is an example of what you will achieve at the end of this notebook. ⬇️

Environments

🎮 Environments:

  • CartPole-v1
  • PixelCopter

📚 RL-Library:

  • Python
  • PyTorch

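We're constantly trying to improve our tutorials, so if you find any issues in this notebook, please open an issue on the GitHub Repo.

Objectives of this notebook 🏆

At the end of the notebook, you will

  • Be able to code a Reinforce algorithm from scratch using PyTorch.
  • Be able to test the robustness of your agent using simple environments.
  • Be able to push your trained agent to the Hub with a nice video replay and an evaluation score 🔥.

Prerequisites 🏗️

Before diving into the notebook, you need to

🔲 📚 Study policy gradients by reading Unit 4

Let's code the Reinforce algorithm from scratch 🔥

Some advice 💡

It's better to run this colab in a copy on your Google Drive, so that if it times out you still have the saved notebook on your Google Drive and don't need to fill everything in from scratch.

To do that, you can either press Ctrl + S or go to File > Save a copy in Google Drive.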

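Set the GPU 💪

  • To accelerate the agent's training, we'll use a GPU. To do that, go to Runtime > Change runtime type
GPU Step 1
  • Hardware Accelerator > GPU
GPU Step 2

Create a virtual display 🖥

During the notebook, we'll need to generate a replay video. To do so, with colab, we need a virtual screen to be able to render the environment (and thus record the frames).

The following cell will install the libraries and create and run a virtual screen 🖥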

%%capture
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

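Install the dependencies 🔽

The first step is to install the dependencies. We'll install multiple ones:

  • gym
  • gym-games: Extra gym environments made with PyGame.
  • huggingface_hub: The Hub works as a central place where anyone can share and explore models and datasets. It has versioning, metrics, visualizations, and other features that will allow you to easily collaborate with others.

You may be wondering why we install gym and not gymnasium, a more recent version of gym? Because the gym-games we are using are not updated yet with gymnasium.

The differences you'll encounter here:

  • In gym we don't have terminated and truncated but only done.
  • In gym, env.step() returns state, reward, done, info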
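To make the difference concrete, here is a small sketch of a helper that normalizes both step formats to the gym-style 4-tuple used in this notebook. The helper name `unpack_step` is hypothetical, and the tuples below are hand-written stand-ins for real `env.step()` outputs, so no environment is needed to run it.

```python
def unpack_step(step_result):
    """Normalize an env.step() result to (state, reward, done, info).

    Accepts the 4-tuple returned by gym or the 5-tuple returned by
    gymnasium, in which case done = terminated or truncated.
    """
    if len(step_result) == 4:
        state, reward, done, info = step_result
    elif len(step_result) == 5:
        state, reward, terminated, truncated, info = step_result
        done = terminated or truncated
    else:
        raise ValueError(f"Unexpected step result of length {len(step_result)}")
    return state, reward, done, info


# Hand-written tuples standing in for real env.step() outputs.
gym_style = ([0.0, 0.1], 1.0, False, {})
gymnasium_style = ([0.0, 0.1], 1.0, False, True, {})

print(unpack_step(gym_style)[2])        # done flag from the gym-style tuple
print(unpack_step(gymnasium_style)[2])  # done flag derived from terminated/truncated
```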

You can learn more about the differences between Gym and Gymnasium here 👉 https://gymnasium.org.cn/content/migration-guide/

You can see all the Reinforce models available here 👉 https://huggingface.co/models?other=reinforce

And you can find all the Deep Reinforcement Learning models here 👉 https://huggingface.co/models?pipeline_tag=reinforcement-learning

!pip install -r https://raw.githubusercontent.com/huggingface/deep-rl-class/main/notebooks/unit4/requirements-unit4.txt

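Import the packages 📦

In addition to importing the installed libraries, we also import:

  • imageio: A library that will help us to generate a replay video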
import numpy as np

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
import gym_pygame

# Hugging Face Hub
from huggingface_hub import notebook_login # To log to our Hugging Face account to be able to upload models to the Hub.
import imageio

Check if we have a GPU

  • Let's check if we have a GPU
  • If it's the case, you should see cuda:0 printed
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

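We're now ready to implement our Reinforce algorithm 🔥

First agent: Playing CartPole-v1 🤖

Create the CartPole environment and understand how it works

The environment 🎮

Why do we use a simple environment like CartPole-v1?

As explained in Reinforcement Learning Tips and Tricks, when you implement your agent from scratch, you need to be sure that it works correctly and find bugs with easy environments before going deeper, since finding bugs is much easier in simple environments.

Try to have some “sign of life” on toy problems

Validate the implementation by making it run on harder and harder environments (you can compare results against the RL zoo). You usually need to run hyperparameter optimization for that step.

The CartPole-v1 environment

A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The pendulum is placed upright on the cart and the goal is to balance the pole by applying forces in the left and right direction on the cart.

So, we start with CartPole-v1. The goal is to push the cart left or right so that the pole stays in equilibrium.

The episode ends if:

  • The pole angle goes beyond ±12°
  • The cart position goes beyond ±2.4
  • The episode length is greater than 500

We get a reward 💰 of +1 every timestep that the pole stays in equilibrium.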

env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id)

# Create the evaluation env
eval_env = gym.make(env_id)

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # Get a random observation
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action

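Let's build the Reinforce architecture

This implementation is based on three implementations.

Reinforce

So we want:

  • Two fully connected layers (fc1 and fc2).
  • To use ReLU as the activation function of fc1
  • To use Softmax to output a probability distribution over actions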
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Create two fully connected layers



    def forward(self, x):
        # Define the forward pass
        # state goes to fc1 then we apply ReLU activation function

        # fc1 outputs goes to fc2

        # We output the softmax

    def act(self, state):
        """
        Given a state, take action
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = np.argmax(m)
        return action.item(), m.log_prob(action)

Solution

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = np.argmax(m)
        return action.item(), m.log_prob(action)

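I made a mistake, can you guess where?

  • To find out, let's make a forward pass:
debug_policy = Policy(s_size, a_size, 64).to(device)
debug_policy.act(env.reset())
  • Here we see that the error says ValueError: The value argument to log_prob must be a Tensor

  • It means that action in m.log_prob(action) must be a Tensor but it's not.

  • Do you know why? Check the act function and try to see why it does not work.

Advice 💡: Something is wrong in this implementation. Remember that for the act function, we want to sample an action from the probability distribution over actions.

(Real) Solution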

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

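By using CartPole, it was easier to debug since we know that the bug comes from our integration and not from our simple environment.

  • Since we want to sample an action from the probability distribution over actions, we can't use action = np.argmax(m) since it will always output the action that has the highest probability.

  • We need to replace this with action = m.sample(), which will sample an action from the probability distribution P(.|s)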
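The difference between picking the most likely action and sampling can be seen in a small pure-Python sketch. It does not use PyTorch: `random.choices` stands in for `Categorical.sample()`, and the probabilities and helper names are hypothetical.

```python
import random
from collections import Counter

# Hypothetical action probabilities pi(.|s) for three actions.
probs = [0.1, 0.6, 0.3]


def greedy_action(probs):
    """Always return the index of the most likely action (argmax)."""
    return max(range(len(probs)), key=lambda a: probs[a])


def sampled_action(probs, rng):
    """Draw an action index in proportion to its probability."""
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


rng = random.Random(0)  # fixed seed so the run is repeatable
greedy_counts = Counter(greedy_action(probs) for _ in range(1000))
sampled_counts = Counter(sampled_action(probs, rng) for _ in range(1000))

print("greedy:", dict(greedy_counts))    # only one action is ever chosen
print("sampled:", dict(sampled_counts))  # every action is chosen some of the time
```

Sampling keeps the agent exploring, which Reinforce relies on: the policy gradient can only reinforce actions that were actually tried.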

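Let's build the Reinforce training algorithm

This is the Reinforce algorithm pseudocode:

Policy gradient pseudocode
  • When we calculate the return Gt (line 6), we see that we calculate the sum of discounted rewards starting at timestep t.

  • Why? Because our policy should only reinforce actions on the basis of their consequences: rewards obtained before taking an action are useless (since they were not caused by that action); only the rewards that come after the action matter.

  • Before coding this, you should read the section Don't Let the Past Distract You, which explains why we use the reward-to-go policy gradient.

We use an interesting technique coded by Chris1nexus to compute the return at each timestep efficiently. The comments explain the procedure. Don't hesitate to also check the PR explanation.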
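To get a feel for the backward computation of the returns before filling in the training loop, here is a minimal standalone sketch on a toy episode. The function name `discounted_returns` and the reward values are illustrative assumptions; it uses the recursion G_t = r + gamma * G_(t+1), walking from the last timestep to the first.

```python
from collections import deque


def discounted_returns(rewards, gamma):
    """Compute the discounted return at every timestep in O(N)."""
    returns = deque()
    # Walk backward so each return reuses the one computed just before it.
    for reward in reversed(rewards):
        next_return = returns[0] if returns else 0.0
        # appendleft keeps the results in chronological order.
        returns.appendleft(reward + gamma * next_return)
    return list(returns)


example = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
print(example)  # [1.75, 1.5, 1.0]
```

The last timestep only sees its own reward (1.0), the one before sees 1 + 0.5 * 1.0 = 1.5, and the first sees 1 + 0.5 * 1.5 = 1.75.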

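The second question you may ask is why do we minimize the loss? Didn't we talk about gradient ascent, not gradient descent, earlier?

  • We want to maximize our utility function $J(\theta)$, but in PyTorch and TensorFlow, it's better to minimize an objective function.
    • So let's say we want to reinforce action 3 at a certain timestep. Before training, the probability P of this action is 0.25.
    • So we want to modify $\theta$ such that $\pi_\theta(a_3|s; \theta) > 0.25$
    • Because all P must sum to 1, maximizing $\pi_\theta(a_3|s; \theta)$ will minimize the probability of the other actions.
    • So we should tell PyTorch to minimize $1 - \pi_\theta(a_3|s; \theta)$.
    • This loss function approaches 0 as $\pi_\theta(a_3|s; \theta)$ nears 1.
    • So we are encouraging the gradient to maximize $\pi_\theta(a_3|s; \theta)$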
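The argument above can be checked numerically with a small pure-Python sketch. It assumes a softmax policy over four actions with equal starting logits, a hypothetical learning rate of 1.0, and the illustrative loss 1 - pi(a_3|s) from the bullets (the training loop itself uses -log_prob * return instead). One gradient descent step on this loss should raise the probability of action 3 and lower the others.

```python
import math


def softmax(logits):
    """Turn logits into probabilities that sum to 1."""
    shift = max(logits)
    exps = [math.exp(z - shift) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [0.0, 0.0, 0.0, 0.0]  # four actions, each starts at probability 0.25
target = 2                     # index of "action 3"
lr = 1.0                       # hypothetical learning rate

probs = softmax(logits)

# loss = 1 - probs[target]
# d(loss)/d(logit_i) = -probs[target] * (1[i == target] - probs[i])
grad = [
    -probs[target] * ((1.0 if i == target else 0.0) - probs[i])
    for i in range(len(logits))
]

# One gradient descent step on the loss.
new_logits = [z - lr * g for z, g in zip(logits, grad)]
new_probs = softmax(new_logits)

print("before:", probs)
print("after: ", new_probs)
```

Minimizing the loss and maximizing the probability of the chosen action are the same update, only written with the opposite sign.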
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes+1):
        saved_log_probs = []
        rewards = []
        state = # TODO: reset the environment
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = # TODO get the action
            saved_log_probs.append(log_prob)
            state, reward, done, _ = # TODO: take an env step
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted returns at each timestep,
        # as the sum of the gamma-discounted return at time t (G_t) + the reward at time t

        # In O(N) time, where N is the number of time steps
        # (this definition of the discounted return G_t follows the definition of this quantity
        # shown at page 44 of Sutton&Barto 2017 2nd draft)
        # G_t = r_(t+1) + r_(t+2) + ...

        # Given this formulation, the returns at each timestep t can be computed
        # by re-using the computed future returns G_(t+1) to compute the current return G_t
        # G_t = r_(t+1) + gamma*G_(t+1)
        # G_(t-1) = r_t + gamma* G_t
        # (this follows a dynamic programming approach, with which we memorize solutions in order
        # to avoid computing them multiple times)

        # This is correct since the above is equivalent to (see also page 46 of Sutton&Barto 2017 2nd draft)
        # G_(t-1) = r_t + gamma*r_(t+1) + gamma*gamma*r_(t+2) + ...


        ## Given the above, we calculate the returns at timestep t as:
        #               gamma[t] * return[t] + reward[t]
        #
        ## We compute this starting from the last timestep to the first, in order
        ## to employ the formula presented above and avoid redundant computations that would be needed
        ## if we were to do it from first to last.

        ## Hence, the queue "returns" will hold the returns in chronological order, from t=0 to t=n_steps
        ## thanks to the appendleft() function which allows to append to the position 0 in constant time O(1)
        ## a normal python list would instead require O(N) to do this.
        for t in range(n_steps)[::-1]:
            disc_return_t = (returns[0] if len(returns)>0 else 0)
            returns.appendleft(    ) # TODO: complete here

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()

        ## eps is the float32 machine epsilon, which is
        # added to the standard deviation of the returns to avoid division by zero
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch prefers gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

    return scores

Solution

def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted returns at each timestep,
        # as
        #      the sum of the gamma-discounted return at time t (G_t) + the reward at time t
        #
        # In O(N) time, where N is the number of time steps
        # (this definition of the discounted return G_t follows the definition of this quantity
        # shown at page 44 of Sutton&Barto 2017 2nd draft)
        # G_t = r_(t+1) + r_(t+2) + ...

        # Given this formulation, the returns at each timestep t can be computed
        # by re-using the computed future returns G_(t+1) to compute the current return G_t
        # G_t = r_(t+1) + gamma*G_(t+1)
        # G_(t-1) = r_t + gamma* G_t
        # (this follows a dynamic programming approach, with which we memorize solutions in order
        # to avoid computing them multiple times)

        # This is correct since the above is equivalent to (see also page 46 of Sutton&Barto 2017 2nd draft)
        # G_(t-1) = r_t + gamma*r_(t+1) + gamma*gamma*r_(t+2) + ...

        ## Given the above, we calculate the returns at timestep t as:
        #               gamma[t] * return[t] + reward[t]
        #
        ## We compute this starting from the last timestep to the first, in order
        ## to employ the formula presented above and avoid redundant computations that would be needed
        ## if we were to do it from first to last.

        ## Hence, the queue "returns" will hold the returns in chronological order, from t=0 to t=n_steps
        ## thanks to the appendleft() function which allows to append to the position 0 in constant time O(1)
        ## a normal python list would instead require O(N) to do this.
        for t in range(n_steps)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()
        ## eps is the float32 machine epsilon, which is
        # added to the standard deviation of the returns to avoid division by zero
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch prefers gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))

    return scores
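The standardization step in the training function can also be tried in isolation. This is a rough pure-Python sketch, not the notebook's code: the name `standardize` is hypothetical, it uses the float64 machine epsilon from `sys.float_info` rather than the float32 one, and it relies on `statistics.stdev`, which applies Bessel's correction like the default of `torch.Tensor.std()`.

```python
import sys
from statistics import mean, stdev


def standardize(returns, eps=sys.float_info.epsilon):
    """Shift returns to zero mean and scale them to unit standard deviation."""
    mu = mean(returns)
    sigma = stdev(returns)  # sample standard deviation (n - 1 in the denominator)
    # eps guards against division by zero when all returns are equal.
    return [(g - mu) / (sigma + eps) for g in returns]


# Toy returns, for illustration only.
standardized = standardize([1.75, 1.5, 1.0])
print(standardized)
```

After standardization, returns above the episode average get a positive weight and those below get a negative one, which tends to reduce the variance of the gradient estimate.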

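Train it

  • We're now ready to train our agent.
  • But first, we define a variable containing all the training hyperparameters.
  • You can change the training parameters (and should 😉)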
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}
# Create policy and place it to the device
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])
scores = reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)

Define evaluation method 📝

  • Here we define the evaluation method that we're going to use to test our Reinforce agent.
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """
    Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
    :param env: The evaluation environment
    :param n_eval_episodes: Number of episode to evaluate the agent
    :param policy: The Reinforce agent
    """
    episode_rewards = []
    for episode in range(n_eval_episodes):
        state = env.reset()
        step = 0
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            action, _ = policy.act(state)
            new_state, reward, done, info = env.step(action)
            total_rewards_ep += reward

            if done:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward

Evaluate our agent 📈

evaluate_agent(
    eval_env, cartpole_hyperparameters["max_t"], cartpole_hyperparameters["n_evaluation_episodes"], cartpole_policy
)

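Publish our trained model on the Hub 🔥

Now that we saw we got good results after the training, we can publish our trained model on the Hub 🤗 with one line of code.

Here's an example of a Model Card:

Push to the Hub

Do not modify this code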

from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.repocard import metadata_eval_result, metadata_save

from pathlib import Path
import datetime
import json
import imageio

import tempfile

import os
def record_video(env, policy, out_directory, fps=30):
    """
    Generate a replay video of the agent
    :param env: the environment to record
    :param policy: the policy of our agent
    :param out_directory: path of the output video file
    :param fps: how many frames per second
    """
    images = []
    done = False
    state = env.reset()
    img = env.render(mode="rgb_array")
    images.append(img)
    while not done:
        # Sample an action from the policy given the current state
        action, _ = policy.act(state)
        state, reward, done, info = env.step(action)  # We directly put next_state = state for recording logic
        img = env.render(mode="rgb_array")
        images.append(img)
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
def push_to_hub(repo_id,
                model,
                hyperparameters,
                eval_env,
                video_fps=30
                ):
  """
  Evaluate, Generate a video and Upload a model to Hugging Face Hub.
  This method does the complete pipeline:
  - It evaluates the model
  - It generates the model card
  - It generates a replay video of the agent
  - It pushes everything to the Hub

  :param repo_id: repo_id: id of the model repository from the Hugging Face Hub
  :param model: the pytorch model we want to save
  :param hyperparameters: training hyperparameters
  :param eval_env: evaluation environment
  :param video_fps: how many frame per seconds to record our video replay
  """

  _, repo_name = repo_id.split("/")
  api = HfApi()

  # Step 1: Create the repo
  repo_url = api.create_repo(
        repo_id=repo_id,
        exist_ok=True,
  )

  with tempfile.TemporaryDirectory() as tmpdirname:
    local_directory = Path(tmpdirname)

    # Step 2: Save the model
    torch.save(model, local_directory / "model.pt")

    # Step 3: Save the hyperparameters to JSON
    with open(local_directory / "hyperparameters.json", "w") as outfile:
      json.dump(hyperparameters, outfile)

    # Step 4: Evaluate the model and build JSON
    mean_reward, std_reward = evaluate_agent(eval_env,
                                            hyperparameters["max_t"],
                                            hyperparameters["n_evaluation_episodes"],
                                            model)
    # Get datetime
    eval_datetime = datetime.datetime.now()
    eval_form_datetime = eval_datetime.isoformat()

    evaluate_data = {
          "env_id": hyperparameters["env_id"],
          "mean_reward": mean_reward,
          "n_evaluation_episodes": hyperparameters["n_evaluation_episodes"],
          "eval_datetime": eval_form_datetime,
    }

    # Write a JSON file
    with open(local_directory / "results.json", "w") as outfile:
        json.dump(evaluate_data, outfile)

    # Step 5: Create the model card
    env_name = hyperparameters["env_id"]

    metadata = {}
    metadata["tags"] = [
          env_name,
          "reinforce",
          "reinforcement-learning",
          "custom-implementation",
          "deep-rl-class"
      ]

    # Add metrics
    eval = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
        dataset_pretty_name=env_name,
        dataset_id=env_name,
      )

    # Merges both dictionaries
    metadata = {**metadata, **eval}

    model_card = f"""
  # **Reinforce** Agent playing **{env_name}**
  This is a trained model of a **Reinforce** agent playing **{env_name}** .
  To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
  """

    readme_path = local_directory / "README.md"
    readme = ""
    if readme_path.exists():
        with readme_path.open("r", encoding="utf8") as f:
          readme = f.read()
    else:
      readme = model_card

    with readme_path.open("w", encoding="utf-8") as f:
      f.write(readme)

    # Save our metrics to Readme metadata
    metadata_save(readme_path, metadata)

    # Step 6: Record a video
    video_path =  local_directory / "replay.mp4"
    record_video(eval_env, model, video_path, video_fps)

    # Step 7. Push everything to the Hub
    api.upload_folder(
          repo_id=repo_id,
          folder_path=local_directory,
          path_in_repo=".",
    )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")

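By using push_to_hub, you evaluate, record a replay, generate a model card of your agent, and push it to the Hub.

To be able to share your model with the community, there are three more steps to follow:

1️⃣ (If it's not already done) create an account on HF ➡ https://huggingface.co/join

2️⃣ Sign in and then store your authentication token from the Hugging Face website.

Create HF Token
notebook_login()

If you don't want to use Google Colab or a Jupyter Notebook, you need to use this command instead: huggingface-cli login (or login)

3️⃣ We're now ready to push our trained agent to the 🤗 Hub 🔥 using the push_to_hub() function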

repo_id = ""  # TODO Define your repo id {username/Reinforce-{model-id}}
push_to_hub(
    repo_id,
    cartpole_policy,  # The model we want to save
    cartpole_hyperparameters,  # Hyperparameters
    eval_env,  # Evaluation environment
    video_fps=30
)

Now that we've tested the robustness of our implementation, let's try a more complex environment: PixelCopter 🚁

Second agent: PixelCopter 🚁

Study the PixelCopter environment 👀

env_id = "Pixelcopter-PLE-v0"
env = gym.make(env_id)
eval_env = gym.make(env_id)
s_size = env.observation_space.shape[0]
a_size = env.action_space.n
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # Get a random observation
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action

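The observation space (7) 👀

  • player y position
  • player velocity
  • player distance to floor
  • player distance to ceiling
  • next block x distance to player
  • next block's top y location
  • next block's bottom y location

The action space (2) 🎮

  • Up (press accelerator)
  • Do nothing (don't press accelerator)

The reward function 💰

  • For each vertical block it passes, it gains a positive reward of +1. Each time a terminal state is reached it receives a negative reward of -1.

Define the new Policy 🧠

  • We need to have a deeper neural network since the environment is more complex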
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Define the three layers here

    def forward(self, x):
        # Define the forward process here
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

Solution

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size * 2)
        self.fc3 = nn.Linear(h_size * 2, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

Define the hyperparameters ⚙️

  • Because this environment is more complex, we need more neurons, especially for the hidden layer size.
pixelcopter_hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 50000,
    "n_evaluation_episodes": 10,
    "max_t": 10000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

Train it

  • We're now ready to train our agent 🔥.
# Create policy and place it to the device
# torch.manual_seed(50)
pixelcopter_policy = Policy(
    pixelcopter_hyperparameters["state_space"],
    pixelcopter_hyperparameters["action_space"],
    pixelcopter_hyperparameters["h_size"],
).to(device)
pixelcopter_optimizer = optim.Adam(pixelcopter_policy.parameters(), lr=pixelcopter_hyperparameters["lr"])
scores = reinforce(
    pixelcopter_policy,
    pixelcopter_optimizer,
    pixelcopter_hyperparameters["n_training_episodes"],
    pixelcopter_hyperparameters["max_t"],
    pixelcopter_hyperparameters["gamma"],
    1000,
)

Publish our trained model on the Hub 🔥

repo_id = ""  # TODO Define your repo id {username/Reinforce-{model-id}}
push_to_hub(
    repo_id,
    pixelcopter_policy,  # The model we want to save
    pixelcopter_hyperparameters,  # Hyperparameters
    eval_env,  # Evaluation environment
    video_fps=30
)

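Some additional challenges 🏆

The best way to learn is to try things on your own! As you saw, the current agent is not doing great. As a first suggestion, you can train for more steps. But also try to find better parameters.

In the Leaderboard you will find your agents. Can you get to the top?

Here are some ideas to climb up the leaderboard:

  • Train more steps
  • Try different hyperparameters by looking at what your classmates have done 👉 https://huggingface.co/models?other=reinforce
  • Push your new trained model to the Hub 🔥
  • Improve the implementation for more complex environments (for instance, what about changing the network to a Convolutional Neural Network to handle frames as observations?)

Congrats on finishing this unit! There was a lot of information. You've just coded your first Deep Reinforcement Learning agent from scratch using PyTorch and shared it on the Hub 🥳.

Don't hesitate to iterate on this unit by improving the implementation for more complex environments.

In the next unit, we're going to learn more about Unity MLAgents by training agents in Unity environments. This way, you will be ready to participate in the AI vs AI challenges, where you'll train your agents to compete against other agents in a snowball fight and a soccer game.

Sounds fun? See you next time!

Finally, we would love to hear what you think of the course and how we can improve it. If you have some feedback, please 👉 fill this form

See you in Unit 5! 🔥

Keep Learning, stay awesome 🤗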
