加速文档

分布式推理

Hugging Face's logo
加入 Hugging Face 社区

并获得增强文档体验

开始使用

分布式推理

分布式推理可以分为三个类别

  1. 将整个模型加载到每个 GPU 上,并一次将批次的块发送到每个 GPU 的模型副本。
  2. 将模型的一部分加载到每个 GPU 上,并一次处理单个输入。
  3. 将模型的一部分加载到每个 GPU 上,并使用称为计划流水线并行的方法来结合前面两种技术。

我们将遍历第一个和最后一个括号,展示如何执行每个括号,因为它们更接近现实场景。

自动将批次的片段发送到每个加载的模型

这是最占用内存的解决方案,因为它要求每个 GPU 在给定时间都将模型的完整副本保存在内存中。

通常在执行此操作时,用户会将模型发送到特定设备以从 CPU 加载它,然后将每个提示移动到不同的设备。

使用diffusers库的基本管道可能如下所示

import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)

然后执行基于特定提示的推理

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    pipe.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    result = pipe(prompt).images[0]
    result.save(f"result_{rank}.png")

您会注意到我们必须检查等级才能知道要发送哪个提示,这可能有点乏味。

然后用户可能还会认为,使用 Accelerate 中的Accelerator为此类任务准备数据加载器也可能是一种简单的管理方法。(要了解更多信息,请查看快速入门中的相关部分)

它可以管理它吗?是的。但是,它是否添加了不需要的额外代码:也是的。

使用 Accelerate,我们可以使用Accelerator.split_between_processes()上下文管理器(也存在于PartialStateAcceleratorState中)来简化此过程。此函数会自动将您传递给它的任何数据(无论是提示、一组张量、先前数据的字典等)拆分到所有进程中(可能填充),以便您立即使用。

让我们使用此上下文管理器重写上述示例

from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipe(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

然后要启动代码,我们可以使用 Accelerate

如果您已使用accelerate config生成要使用的配置文件

accelerate launch distributed_inference.py

如果您有要使用的特定配置文件

accelerate launch --config_file my_config.json distributed_inference.py

或者如果您不想创建任何配置文件并在两个 GPU 上启动

注意:您将收到一些关于根据您的系统猜测值的警告。要删除这些警告,您可以执行accelerate config default或遍历accelerate config以创建配置文件。

accelerate launch --num_processes 2 distributed_inference.py

我们现在已经相当轻松地将拆分此数据所需的样板代码减少到几行代码。

但是如果我们对提示到 GPU 的分布有奇数个?例如,如果我们有 3 个提示,但只有 2 个 GPU 呢?

在上下文管理器下,第一个 GPU 将接收前两个提示,第二个 GPU 将接收第三个提示,确保所有提示都被拆分并且不需要任何开销。

但是,如果我们随后想要对所有 GPU的结果执行某些操作呢?(例如,收集所有结果并执行某种后处理)您可以传入apply_padding=True以确保提示列表填充到相同的长度,额外的数据取自最后一个样本。这样,所有 GPU 都会有相同数量的提示,然后您可以收集结果。

仅当尝试执行诸如收集结果之类的操作时才需要这样做,其中每个设备上的数据需要具有相同的长度。基本推理不需要此操作。

例如

from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
    result = pipe(prompt).images

在第一个 GPU 上,提示将为["a dog", "a cat"],在第二个 GPU 上将为["a chicken", "a chicken"]。确保删除最后一个样本,因为它将是前一个样本的副本。

您可以在此处找到更复杂的示例,例如如何在 LLM 中使用它。

内存高效的流水线并行(实验性)

下一部分将讨论使用流水线并行。这是一个实验性API,它利用torch.distributed.pipelining作为原生解决方案。

流水线并行的一般思路是:假设您有 4 个 GPU 和一个足够大的模型,可以使用device_map="auto"将其拆分到 4 个 GPU 上。使用此方法,您可以一次发送 4 个输入(例如此处,任何数量都可以),每个模型块将处理一个输入,然后在先前块完成时接收下一个输入,使其比前面描述的方法高效且更快。这是 PyTorch 代码库中的一个视觉效果

Pipeline parallelism example

为了说明如何在 Accelerate 中使用它,我们创建了一个示例库,展示了许多不同的模型和情况。在本教程中,我们将对两个 GPU 上的 GPT2 展示此方法。

在继续之前,请确保您已安装最新版本的 PyTorch,方法是运行以下命令

pip install torch

首先在 CPU 上创建模型

from transformers import GPT2ForSequenceClassification, GPT2Config

config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()

接下来,您需要创建一些要使用的示例输入。这些有助于torch.distributed.pipelining跟踪模型。

但是您如何创建此示例将决定将在给定时间使用/传递给模型的相对批次大小,因此请务必记住有多少项目!
input = torch.randint(
    low=0,
    high=config.vocab_size,
    size=(2, 1024),  # bs x seq_len
    device="cpu",
    dtype=torch.int64,
    requires_grad=False,
)

接下来,我们需要实际执行跟踪并准备好模型。为此,请使用inference.prepare_pippy()函数,它将自动完全包装模型以进行流水线并行

from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))

您可以通过prepare_pippy传递各种参数

  • split_points允许您确定在哪里拆分模型。默认情况下,我们使用device_map="auto"声明的位置,例如fcconv1

  • num_chunks确定批次将如何拆分并发送到模型本身(因此,具有四个拆分点/四个 GPU 的num_chunks=1将具有一个朴素的 MP,其中单个输入在四个层拆分点之间传递)

从这里开始,剩下的就是实际执行分布式推理!

在传递输入时,我们强烈建议将其作为参数元组传递。支持使用kwargs,但是,此方法是实验性的。

args = some_more_arguments
with torch.no_grad():
    output = model(*args)

完成后,所有数据将仅位于最后一个进程上

from accelerate import PartialState
if PartialState().is_last_process:
    print(output)

如果您将gather_output=True传递给inference.prepare_pippy(),则输出将随后发送到所有 GPU,而无需is_last_process检查。默认情况下,此值为False,因为它会产生通信调用。

就是这样!要了解更多信息,请查看Accelerate 存储库中的推理示例以及我们的文档,因为我们正在努力改进此集成。

< > 在 GitHub 上更新