Accelerate 文档
分布式推理
并获得增强的文档体验
开始使用
分布式推理
分布式推理可以分为三个类别:
- 将整个模型加载到每个 GPU 上,并一次通过每个 GPU 的模型副本发送一批数据的块
- 将模型的部分加载到每个 GPU 上,并一次处理单个输入
- 将模型的部分加载到每个 GPU 上,并使用称为计划流水线并行性的技术来结合前两种方法。
我们将介绍第一类和最后一类,展示如何实现每种方法,因为它们是更现实的场景。
自动将批处理块发送到每个加载的模型
这是内存密集程度最高的解决方案,因为它要求每个 GPU 在给定时间在内存中保留模型的完整副本。
通常在执行此操作时,用户会将模型发送到特定设备以从 CPU 加载它,然后将每个提示移动到不同的设备。
使用 diffusers
库的基本流水线可能如下所示
import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
然后根据特定提示执行推理
def run_inference(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
pipe.to(rank)
if torch.distributed.get_rank() == 0:
prompt = "a dog"
elif torch.distributed.get_rank() == 1:
prompt = "a cat"
result = pipe(prompt).images[0]
result.save(f"result_{rank}.png")
人们会注意到我们必须检查 rank 才能知道要发送什么提示,这可能有点乏味。
然后,用户也可能会认为,使用 Accelerate,使用 Accelerator
为此类任务准备数据加载器也可能是一种简单的管理方式。(要了解更多信息,请查看快速入门中的相关部分)
它可以管理吗?是的。但是,它是否添加了不必要的额外代码:也是的。
使用 Accelerate,我们可以通过使用 Accelerator.split_between_processes() 上下文管理器(它也存在于 PartialState
和 AcceleratorState
中)来简化此过程。此函数将自动拆分您传递给它的任何数据(无论是提示、张量集、先前数据的字典等),并在所有进程中分配(可能需要填充),供您立即使用。
让我们使用此上下文管理器重写上面的示例
import torch
from accelerate import PartialState # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)
# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
result = pipe(prompt).images[0]
result.save(f"result_{distributed_state.process_index}.png")
然后要启动代码,我们可以使用 Accelerate
如果您已生成要使用的配置文件,请使用 accelerate config
accelerate launch distributed_inference.py
如果您有要使用的特定配置文件
accelerate launch --config_file my_config.json distributed_inference.py
或者,如果您不想创建任何配置文件并在两个 GPU 上启动
注意:您将收到一些关于根据您的系统猜测值的警告。要删除这些警告,您可以执行
accelerate config default
或浏览accelerate config
以创建配置文件。
accelerate launch --num_processes 2 distributed_inference.py
我们现在已将拆分此数据所需的样板代码轻松减少到几行代码。
但是,如果我们的提示到 GPU 的分布不均匀怎么办?例如,如果我们有 3 个提示,但只有 2 个 GPU 怎么办?
在上下文管理器下,第一个 GPU 将接收前两个提示,第二个 GPU 将接收第三个提示,从而确保所有提示都被拆分,并且不需要任何开销。
但是,如果我们随后想要对所有 GPU 的结果执行某些操作怎么办?(例如,将它们全部收集起来并执行某种后处理)您可以传入 apply_padding=True
以确保提示列表被填充到相同的长度,并从最后一个样本中获取额外的数据。这样,所有 GPU 都将具有相同数量的提示,然后您可以收集结果。
仅当尝试执行诸如收集结果之类的操作时才需要这样做,其中每个设备上的数据需要具有相同的长度。基本推理不需要这样做。
例如
import torch
from accelerate import PartialState # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)
# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
result = pipe(prompt).images
在第一个 GPU 上,提示将是 ["a dog", "a cat"]
,在第二个 GPU 上,提示将是 ["a chicken", "a chicken"]
。请务必删除最后一个样本,因为它将是前一个样本的重复项。
您可以在此处找到更复杂的示例,例如如何将其与 LLM 一起使用。
内存高效的流水线并行 (实验性)
接下来的部分将讨论使用流水线并行。这是一个实验性 API,它利用 torch.distributed.pipelining 作为原生解决方案。
流水线并行的一般思想是:假设您有 4 个 GPU 和一个足够大的模型,可以使用 device_map="auto"
将其拆分到四个 GPU 上。使用此方法,您可以一次发送 4 个输入(例如,这里任何数量都可以),并且每个模型块将处理一个输入,然后在前一个块完成后接收下一个输入,这比之前描述的方法效率更高且速度更快。以下是 PyTorch 存储库中的可视化图示
为了说明如何将此方法与 Accelerate 一起使用,我们创建了一个 示例库,展示了许多不同的模型和情况。在本教程中,我们将展示 GPT2 在两个 GPU 上的这种方法。
在继续之前,请确保您已安装最新的 PyTorch 版本,方法是运行以下命令
pip install torch
首先在 CPU 上创建模型
from transformers import GPT2ForSequenceClassification, GPT2Config
config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()
接下来,您需要创建一些示例输入以供使用。这些有助于 torch.distributed.pipelining
跟踪模型。
input = torch.randint(
low=0,
high=config.vocab_size,
size=(2, 1024), # bs x seq_len
device="cpu",
dtype=torch.int64,
requires_grad=False,
)
接下来,我们需要实际执行跟踪并准备好模型。为此,请使用 inference.prepare_pippy() 函数,它将自动为流水线并行完全包装模型
from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))
您可以通过 prepare_pippy
传递各种参数
split_points
让您确定在哪些层拆分模型。默认情况下,我们使用device_map="auto"
声明的任何位置,例如fc
或conv1
。num_chunks
决定了批处理将如何拆分并发送到模型本身(因此,具有四个拆分点/四个 GPU 的num_chunks=1
将具有朴素的 MP,其中单个输入在四个层拆分点之间传递)
从这里开始,剩下的就是实际执行分布式推理!
在传递输入时,我们强烈建议将它们作为参数元组传递。支持使用 kwargs
,但是,此方法是实验性的。
args = some_more_arguments
with torch.no_grad():
output = model(*args)
完成后,所有数据都将仅在最后一个进程上
from accelerate import PartialState
if PartialState().is_last_process:
print(output)
如果您将 gather_output=True
传递给 inference.prepare_pippy(),则输出将在之后发送到所有 GPU,而无需 is_last_process
检查。默认情况下,此项为 False
,因为它会产生通信调用。
就是这样!要了解更多信息,请查看 Accelerate 存储库中的推理示例和我们的文档,我们将努力改进此集成。
< > 在 GitHub 上更新