Accelerate 文档

分布式推理

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

分布式推理

分布式推理可以分为三个类别:

  1. 将整个模型加载到每个 GPU 上,并一次通过每个 GPU 的模型副本发送一批数据的块
  2. 将模型的部分加载到每个 GPU 上,并一次处理单个输入
  3. 将模型的部分加载到每个 GPU 上,并使用称为计划流水线并行性的技术来结合前两种方法。

我们将介绍第一类和最后一类,展示如何实现每种方法,因为它们是更现实的场景。

自动将批处理块发送到每个加载的模型

这是内存密集程度最高的解决方案,因为它要求每个 GPU 在给定时间在内存中保留模型的完整副本。

通常在执行此操作时,用户会将模型发送到特定设备以从 CPU 加载它,然后将每个提示移动到不同的设备。

使用 diffusers 库的基本流水线可能如下所示

import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)

然后根据特定提示执行推理

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    pipe.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    result = pipe(prompt).images[0]
    result.save(f"result_{rank}.png")

人们会注意到我们必须检查 rank 才能知道要发送什么提示,这可能有点乏味。

然后,用户也可能会认为,使用 Accelerate,使用 Accelerator 为此类任务准备数据加载器也可能是一种简单的管理方式。(要了解更多信息,请查看快速入门中的相关部分)

它可以管理吗?是的。但是,它是否添加了不必要的额外代码:也是的。

使用 Accelerate,我们可以通过使用 Accelerator.split_between_processes() 上下文管理器(它也存在于 PartialStateAcceleratorState 中)来简化此过程。此函数将自动拆分您传递给它的任何数据(无论是提示、张量集、先前数据的字典等),并在所有进程中分配(可能需要填充),供您立即使用。

让我们使用此上下文管理器重写上面的示例

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipe(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

然后要启动代码,我们可以使用 Accelerate

如果您已生成要使用的配置文件,请使用 accelerate config

accelerate launch distributed_inference.py

如果您有要使用的特定配置文件

accelerate launch --config_file my_config.json distributed_inference.py

或者,如果您不想创建任何配置文件并在两个 GPU 上启动

注意:您将收到一些关于根据您的系统猜测值的警告。要删除这些警告,您可以执行 accelerate config default 或浏览 accelerate config 以创建配置文件。

accelerate launch --num_processes 2 distributed_inference.py

我们现在已将拆分此数据所需的样板代码轻松减少到几行代码。

但是,如果我们的提示到 GPU 的分布不均匀怎么办?例如,如果我们有 3 个提示,但只有 2 个 GPU 怎么办?

在上下文管理器下,第一个 GPU 将接收前两个提示,第二个 GPU 将接收第三个提示,从而确保所有提示都被拆分,并且不需要任何开销。

但是,如果我们随后想要对所有 GPU 的结果执行某些操作怎么办?(例如,将它们全部收集起来并执行某种后处理)您可以传入 apply_padding=True 以确保提示列表被填充到相同的长度,并从最后一个样本中获取额外的数据。这样,所有 GPU 都将具有相同数量的提示,然后您可以收集结果。

仅当尝试执行诸如收集结果之类的操作时才需要这样做,其中每个设备上的数据需要具有相同的长度。基本推理不需要这样做。

例如

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
    result = pipe(prompt).images

在第一个 GPU 上,提示将是 ["a dog", "a cat"],在第二个 GPU 上,提示将是 ["a chicken", "a chicken"]。请务必删除最后一个样本,因为它将是前一个样本的重复项。

您可以在此处找到更复杂的示例,例如如何将其与 LLM 一起使用。

内存高效的流水线并行 (实验性)

接下来的部分将讨论使用流水线并行。这是一个实验性 API,它利用 torch.distributed.pipelining 作为原生解决方案。

流水线并行的一般思想是:假设您有 4 个 GPU 和一个足够大的模型,可以使用 device_map="auto" 将其拆分到四个 GPU 上。使用此方法,您可以一次发送 4 个输入(例如,这里任何数量都可以),并且每个模型块将处理一个输入,然后在前一个块完成后接收下一个输入,这比之前描述的方法效率更高且速度更快。以下是 PyTorch 存储库中的可视化图示

Pipeline parallelism example

为了说明如何将此方法与 Accelerate 一起使用,我们创建了一个 示例库,展示了许多不同的模型和情况。在本教程中,我们将展示 GPT2 在两个 GPU 上的这种方法。

在继续之前,请确保您已安装最新的 PyTorch 版本,方法是运行以下命令

pip install torch

首先在 CPU 上创建模型

from transformers import GPT2ForSequenceClassification, GPT2Config

config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()

接下来,您需要创建一些示例输入以供使用。这些有助于 torch.distributed.pipelining 跟踪模型。

但是,您创建此示例的方式将决定在给定时间将使用/传递到模型的相对批处理大小,因此请务必记住有多少项!
input = torch.randint(
    low=0,
    high=config.vocab_size,
    size=(2, 1024),  # bs x seq_len
    device="cpu",
    dtype=torch.int64,
    requires_grad=False,
)

接下来,我们需要实际执行跟踪并准备好模型。为此,请使用 inference.prepare_pippy() 函数,它将自动为流水线并行完全包装模型

from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))

您可以通过 prepare_pippy 传递各种参数

  • split_points 让您确定在哪些层拆分模型。默认情况下,我们使用 device_map="auto" 声明的任何位置,例如 fcconv1

  • num_chunks 决定了批处理将如何拆分并发送到模型本身(因此,具有四个拆分点/四个 GPU 的 num_chunks=1 将具有朴素的 MP,其中单个输入在四个层拆分点之间传递)

从这里开始,剩下的就是实际执行分布式推理!

在传递输入时,我们强烈建议将它们作为参数元组传递。支持使用 kwargs,但是,此方法是实验性的。

args = some_more_arguments
with torch.no_grad():
    output = model(*args)

完成后,所有数据都将仅在最后一个进程上

from accelerate import PartialState
if PartialState().is_last_process:
    print(output)

如果您将 gather_output=True 传递给 inference.prepare_pippy(),则输出将在之后发送到所有 GPU,而无需 is_last_process 检查。默认情况下,此项为 False,因为它会产生通信调用。

就是这样!要了解更多信息,请查看 Accelerate 存储库中的推理示例和我们的文档,我们将努力改进此集成。

< > 在 GitHub 上更新