从 PyTorch DDP 到 Accelerate 再到 Trainer,轻松掌握分布式训练

发布于 2022 年 10 月 21 日
在 GitHub 上更新

概述

本教程假设您对 PyTorch 和如何训练简单模型有基本了解。它将通过分布式数据并行(DDP)过程,通过三个不同抽象级别的示例来展示在多个 GPU 上进行训练

  • 通过 pytorch.distributed 模块实现的原生 PyTorch DDP
  • 利用 🤗 Accelerate 对 pytorch.distributed 进行的轻量级封装,该封装还确保代码可以在单个 GPU 和 TPU 上运行,无需任何代码更改,且对原始代码的修改最少
  • 利用 🤗 Transformer 的高级 Trainer API,它抽象了所有样板代码,并支持各种设备和分布式场景

什么是“分布式”训练,它为何重要?

以下是一些非常基本的 PyTorch 训练代码,它根据 官方 MNIST 示例 设置并训练一个基于 MNIST 的模型

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

我们定义训练设备 (cuda)

device = "cuda"

构建一些 PyTorch DataLoaders

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

将模型移动到 CUDA 设备

model = BasicNet().to(device)

构建一个 PyTorch 优化器

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最后创建一个简单的训练和评估循环,对数据集执行一次完整的迭代并计算测试准确率

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常,从这里开始,可以将所有这些代码放入 Python 脚本或在 Jupyter Notebook 中运行。

然而,如果这些资源可用,您如何让这个脚本在两个 GPU 或多台机器上运行,这可以通过_分布式_训练来提高训练速度?简单地执行 python myscript.py 只会使用单个 GPU 运行脚本。这就是 torch.distributed 发挥作用的地方。

PyTorch 分布式数据并行

顾名思义,torch.distributed 旨在在_分布式_设置中工作。这可以包括多节点(您有多台机器,每台机器都带有一个 GPU),或多 GPU(单个系统有多个 GPU),或两者的某种组合。

要将我们的上述代码转换为在分布式设置中工作,必须首先定义一些设置配置,详细信息请参见 DDP 入门教程

首先,必须声明一个 setup 和一个 cleanup 函数。这将打开一个处理组,所有计算进程都可以通过该组进行通信

注意:对于本教程的这一部分,应假定这些函数是在 Python 脚本文件中发送的。稍后将讨论使用 Accelerate 的启动器,它消除了这种必要性。

import os
import torch.distributed as dist

def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # Initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()

最后一个问题是_如何将我的数据和模型发送到另一个 GPU?_

这就是 DistributedDataParallel 模块发挥作用的地方。它会将您的模型复制到每个 GPU 上,当调用 loss.backward() 时,将执行反向传播,并且所有这些模型副本上的结果梯度将被平均/归约。这确保了在优化器步骤之后每个设备都具有相同的权重。

下面是我们训练设置的示例,它被重构为一个具有此功能的函数

注意:这里的 rank 是当前 GPU 相对于所有可用 GPU 的总 rank,这意味着它们的 rank 为 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

优化器需要根据特定设备上的模型(即 ddp_model 而不是 model)进行声明,以便正确计算所有梯度。

最后,为了运行脚本,PyTorch 有一个方便的 torchrun 命令行模块可以提供帮助。只需传入它应该使用的节点数量以及要运行的脚本即可

torchrun --nproc_per_node=2 --nnodes=1 example_script.py

上述命令将在一台机器上的两个 GPU 上运行训练脚本,这是仅使用 PyTorch 执行分布式训练的最低要求。

现在让我们来谈谈 Accelerate,一个旨在使这个过程更无缝并帮助实现一些最佳实践的库

🤗 Accelerate

Accelerate 是一个旨在让您执行我们刚刚上面所做的事情而无需大幅修改代码的库。除此之外,Accelerate 内置的数据管道还可以提高您的代码性能。

首先,让我们将上面执行的所有代码封装到一个函数中,以帮助我们直观地看到差异

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # Build optimizer
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # Train for a single epoch
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(rank), target.to(rank)
            output = ddp_model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

接下来,我们谈谈 Accelerate 如何提供帮助。上面的代码存在一些问题

  1. 这略微低效,因为会根据每个设备创建并推送 n 个数据加载器。
  2. 此代码将**仅**适用于多 GPU,因此需要进行特殊处理才能再次在单节点或 TPU 上运行。

Accelerate 通过 Accelerator 类解决了这个问题。通过它,与单节点和多节点的代码比较时,代码基本保持不变,除了三行代码,如下所示

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = BasicNet()

    # Build optimizer
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # Send everything through `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

有了它,您的 PyTorch 训练循环现在可以借助 Accelerator 对象在任何分布式设置中运行。然后,此代码仍然可以通过 torchrun CLI 或通过 Accelerate 自己的 CLI 界面 accelerate launch 启动。

因此,使用 Accelerate 进行分布式训练变得轻而易举,并且可以尽可能多地保持 PyTorch 骨架代码不变。

前面提到过,Accelerate 还可以提高 DataLoaders 的效率。这是通过自定义 Sampler 实现的,它可以在训练期间自动将批次的部分发送到不同的设备,从而允许一次只知道一份数据副本,而不是根据配置一次将四份数据副本加载到内存中。此外,内存中总共只有一份原始数据集的完整副本。此数据集的子集在所有用于训练的节点之间进行拆分,从而允许在单个实例上训练更大的数据集,而不会导致内存使用量爆炸式增长。

使用 notebook_launcher

前面提到过,您可以直接从 Jupyter Notebook 中启动分布式代码。这来自 Accelerate 的 notebook_launcher 实用程序,它允许根据 Jupyter Notebook 中的代码启动多 GPU 训练。

使用它就像导入启动器一样简单

from accelerate import notebook_launcher

并将我们之前声明的训练函数、要传递的任何参数以及要使用的进程数(例如 TPU 上的 8 个或两个 GPU 上的 2 个)传递进去。上述两个训练函数都可以运行,但请注意,在您启动一次之后,实例需要重新启动才能再次生成。

notebook_launcher(train_ddp, args=(), num_processes=2)

notebook_launcher(train_ddp_accelerate, args=(), num_processes=2)

使用 🤗 Trainer

最后,我们来到了最高级别的 API -- Hugging Face Trainer

这尽可能地封装了训练过程,同时仍然能够在分布式系统上进行训练,而用户无需进行任何操作。

首先我们需要导入 Trainer

from transformers import Trainer

然后我们定义一些 TrainingArguments 来控制所有常用的超参数。Trainer 也通过字典工作,因此需要创建一个自定义的 collate 函数。

最后,我们对 Trainer 进行子类化并编写我们自己的 compute_loss

之后,这段代码也将在分布式设置中工作,无需编写任何训练代码!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)
trainer.train()
    ***** Running training *****
      Num examples = 60000
      Num Epochs = 1
      Instantaneous batch size per device = 64
      Total train batch size (w. parallel, distributed & accumulation) = 64
      Gradient Accumulation steps = 1
      Total optimization steps = 938
轮次 训练损失 验证损失
1 0.875700 0.282633

与上面使用 notebook_launcher 的示例类似,这里也可以通过将其全部放入训练函数中来实现。

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

资源

要了解更多关于 PyTorch 分布式数据并行,请查看这里的文档 这里

要了解更多关于 🤗 Accelerate 的信息,请查看这里的文档 这里

要了解更多关于 🤗 Transformers 的信息,请查看这里的文档 这里

社区

注册登录 发表评论