Accelerate 文档

分布式推理

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

分布式推理

分布式推理可以分为三类

  1. 将整个模型加载到每个 GPU 上,然后一次性将一个批次的数据块发送到每个 GPU 的模型副本中
  2. 将模型的不同部分加载到每个 GPU 上,一次处理一个输入
  3. 将模型的不同部分加载到每个 GPU 上,并使用所谓的调度流水线并行来结合前两种技术。

我们将介绍第一类和最后一类,展示如何实现它们,因为它们是更现实的场景。

自动将批次数据块发送到每个加载的模型

这是最耗费内存的解决方案,因为它要求每个 GPU 在任何给定时间都在内存中保留模型的完整副本。

通常在这样做时,用户会将模型发送到特定设备以从 CPU 加载,然后将每个提示移动到不同的设备。

使用 diffusers 库的基本流程可能如下所示:

import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)

然后根据具体的提示进行推理

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    pipe.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    result = pipe(prompt).images[0]
    result.save(f"result_{rank}.png")

人们会注意到,我们必须检查进程的排名(rank)才能知道发送哪个提示,这可能有点繁琐。

用户可能还会认为,使用 Accelerate 中的 Accelerator 为这样的任务准备一个 dataloader 也是一种简单的管理方式。(要了解更多信息,请查看快速入门中的相关部分)

它能处理吗?是的。但它是否会添加不必要的额外代码:也是的。

使用 Accelerate,我们可以通过使用 Accelerator.split_between_processes() 上下文管理器来简化这个过程(这个管理器也存在于 PartialStateAcceleratorState 中)。这个函数会自动将您传递给它的任何数据(无论是提示、一组张量、还是包含前述数据的字典等)在所有进程中进行拆分(可能会进行填充),以便您立即使用。

让我们用这个上下文管理器重写上面的例子

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipe(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

然后要启动代码,我们可以使用 Accelerate

如果您已经使用 accelerate config 生成了要使用的配置文件

accelerate launch distributed_inference.py

如果您想使用特定的配置文件

accelerate launch --config_file my_config.json distributed_inference.py

或者如果您不想创建任何配置文件,只想在两个 GPU 上启动

注意:您会收到一些关于根据您的系统猜测值的警告。要消除这些警告,您可以运行 accelerate config default 或通过 accelerate config 来创建配置文件。

accelerate launch --num_processes 2 distributed_inference.py

我们现在已经将拆分这些数据所需的样板代码轻松地减少到了几行。

但是,如果我们有一个提示与 GPU 的奇数分布呢?例如,如果我们有 3 个提示,但只有 2 个 GPU 怎么办?

在上下文管理器下,第一个 GPU 将接收前两个提示,第二个 GPU 将接收第三个,确保所有提示都被拆分,且不需要额外开销。

然而,如果我们想对所有 GPU 的结果进行某些操作呢?(比如将它们全部收集起来并进行某种后处理)您可以传入 apply_padding=True 来确保提示列表被填充到相同的长度,额外的数据取自最后一个样本。这样所有 GPU 都将拥有相同数量的提示,然后您就可以收集结果了。

这仅在尝试执行诸如收集结果之类的操作时才需要,因为每个设备上的数据需要具有相同的长度。基本的推理不需要这个。

例如

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
    result = pipe(prompt).images

在第一个 GPU 上,提示将是 ["a dog", "a cat"],而在第二个 GPU 上,它将是 ["a chicken", "a chicken"]。请确保丢弃最后一个样本,因为它将是前一个样本的副本。

您可以在这里找到更复杂的示例,例如如何将其与大语言模型(LLM)一起使用。

内存高效的流水线并行(实验性)

下一部分将讨论使用流水线并行。这是一个实验性 API,它利用了 torch.distributed.pipelining 作为原生解决方案。

流水线并行的一般思想是:假设你有 4 个 GPU 和一个足够大的模型,可以使用 device_map="auto" 将其分割到四个 GPU 上。通过这种方法,你可以一次发送 4 个输入(例如这里,任何数量都可以),每个模型块将处理一个输入,然后在前一个块完成后接收下一个输入,使其比之前描述的方法效率高得多 并且速度更快。这里有一个来自 PyTorch 仓库的可视化图示

Pipeline parallelism example

为了说明如何将此方法与 Accelerate 结合使用,我们创建了一个示例集合,展示了许多不同的模型和情况。在本教程中,我们将展示在两个 GPU 上对 GPT2 使用此方法。

在继续之前,请确保您已安装最新的 PyTorch 版本,运行以下命令:

pip install torch

首先在 CPU 上创建模型

from transformers import GPT2ForSequenceClassification, GPT2Config

config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()

接下来,你需要创建一些示例输入来使用。这些可以帮助 torch.distributed.pipelining 追踪模型。

你创建这个示例的方式将决定在给定时间内将被使用/传递给模型的相对批次大小,所以请务必记住有多少个项目!
input = torch.randint(
    low=0,
    high=config.vocab_size,
    size=(2, 1024),  # bs x seq_len
    device="cpu",
    dtype=torch.int64,
    requires_grad=False,
)

接下来,我们需要实际执行追踪并准备好模型。为此,请使用 inference.prepare_pippy() 函数,它会自动将模型完全包装起来以实现流水线并行。

from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))

您可以向 prepare_pippy 传递多种参数

  • split_points 允许您确定在哪些层分割模型。默认情况下,我们使用 device_map="auto" 声明的位置,例如 fcconv1

  • num_chunks 决定了批次将如何被分割并发送到模型本身(因此,num_chunks=1 加上四个分割点/四个 GPU 将会有一个朴素的模型并行,其中单个输入在四个层分割点之间传递)

从这里开始,剩下的就是实际执行分布式推理了!

在传递输入时,我们强烈建议将它们作为参数元组传递。虽然支持使用 kwargs,但这种方法是实验性的。

args = some_more_arguments
with torch.no_grad():
    output = model(*args)

完成后,所有数据将只在最后一个进程上

from accelerate import PartialState
if PartialState().is_last_process:
    print(output)

如果您将 gather_output=True 传递给 inference.prepare_pippy(),输出将在之后发送到所有 GPU,而无需进行 is_last_process 检查。此参数默认为 False,因为它会产生一次通信调用。

就是这样!要探索更多内容,请查看 Accelerate 仓库中的推理示例和我们的文档,我们将继续努力改进这一集成。

< > 在 GitHub 上更新