Accelerate 文档
分布式推理
并获得增强的文档体验
开始使用
分布式推理
分布式推理可以分为三类
- 将整个模型加载到每个 GPU 上,然后一次性将一个批次的数据块发送到每个 GPU 的模型副本中
- 将模型的不同部分加载到每个 GPU 上,一次处理一个输入
- 将模型的不同部分加载到每个 GPU 上,并使用所谓的调度流水线并行来结合前两种技术。
我们将介绍第一类和最后一类,展示如何实现它们,因为它们是更现实的场景。
自动将批次数据块发送到每个加载的模型
这是最耗费内存的解决方案,因为它要求每个 GPU 在任何给定时间都在内存中保留模型的完整副本。
通常在这样做时,用户会将模型发送到特定设备以从 CPU 加载,然后将每个提示移动到不同的设备。
使用 diffusers
库的基本流程可能如下所示:
import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
然后根据具体的提示进行推理
def run_inference(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
pipe.to(rank)
if torch.distributed.get_rank() == 0:
prompt = "a dog"
elif torch.distributed.get_rank() == 1:
prompt = "a cat"
result = pipe(prompt).images[0]
result.save(f"result_{rank}.png")
人们会注意到,我们必须检查进程的排名(rank)才能知道发送哪个提示,这可能有点繁琐。
用户可能还会认为,使用 Accelerate 中的 Accelerator
为这样的任务准备一个 dataloader 也是一种简单的管理方式。(要了解更多信息,请查看快速入门中的相关部分)
它能处理吗?是的。但它是否会添加不必要的额外代码:也是的。
使用 Accelerate,我们可以通过使用 Accelerator.split_between_processes() 上下文管理器来简化这个过程(这个管理器也存在于 PartialState
和 AcceleratorState
中)。这个函数会自动将您传递给它的任何数据(无论是提示、一组张量、还是包含前述数据的字典等)在所有进程中进行拆分(可能会进行填充),以便您立即使用。
让我们用这个上下文管理器重写上面的例子
import torch
from accelerate import PartialState # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)
# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
result = pipe(prompt).images[0]
result.save(f"result_{distributed_state.process_index}.png")
然后要启动代码,我们可以使用 Accelerate
如果您已经使用 accelerate config
生成了要使用的配置文件
accelerate launch distributed_inference.py
如果您想使用特定的配置文件
accelerate launch --config_file my_config.json distributed_inference.py
或者如果您不想创建任何配置文件,只想在两个 GPU 上启动
注意:您会收到一些关于根据您的系统猜测值的警告。要消除这些警告,您可以运行
accelerate config default
或通过accelerate config
来创建配置文件。
accelerate launch --num_processes 2 distributed_inference.py
我们现在已经将拆分这些数据所需的样板代码轻松地减少到了几行。
但是,如果我们有一个提示与 GPU 的奇数分布呢?例如,如果我们有 3 个提示,但只有 2 个 GPU 怎么办?
在上下文管理器下,第一个 GPU 将接收前两个提示,第二个 GPU 将接收第三个,确保所有提示都被拆分,且不需要额外开销。
然而,如果我们想对所有 GPU 的结果进行某些操作呢?(比如将它们全部收集起来并进行某种后处理)您可以传入 apply_padding=True
来确保提示列表被填充到相同的长度,额外的数据取自最后一个样本。这样所有 GPU 都将拥有相同数量的提示,然后您就可以收集结果了。
这仅在尝试执行诸如收集结果之类的操作时才需要,因为每个设备上的数据需要具有相同的长度。基本的推理不需要这个。
例如
import torch
from accelerate import PartialState # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)
# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
result = pipe(prompt).images
在第一个 GPU 上,提示将是 ["a dog", "a cat"]
,而在第二个 GPU 上,它将是 ["a chicken", "a chicken"]
。请确保丢弃最后一个样本,因为它将是前一个样本的副本。
您可以在这里找到更复杂的示例,例如如何将其与大语言模型(LLM)一起使用。
内存高效的流水线并行(实验性)
下一部分将讨论使用流水线并行。这是一个实验性 API,它利用了 torch.distributed.pipelining 作为原生解决方案。
流水线并行的一般思想是:假设你有 4 个 GPU 和一个足够大的模型,可以使用 device_map="auto"
将其分割到四个 GPU 上。通过这种方法,你可以一次发送 4 个输入(例如这里,任何数量都可以),每个模型块将处理一个输入,然后在前一个块完成后接收下一个输入,使其比之前描述的方法效率高得多 并且速度更快。这里有一个来自 PyTorch 仓库的可视化图示
为了说明如何将此方法与 Accelerate 结合使用,我们创建了一个示例集合,展示了许多不同的模型和情况。在本教程中,我们将展示在两个 GPU 上对 GPT2 使用此方法。
在继续之前,请确保您已安装最新的 PyTorch 版本,运行以下命令:
pip install torch
首先在 CPU 上创建模型
from transformers import GPT2ForSequenceClassification, GPT2Config
config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()
接下来,你需要创建一些示例输入来使用。这些可以帮助 torch.distributed.pipelining
追踪模型。
input = torch.randint(
low=0,
high=config.vocab_size,
size=(2, 1024), # bs x seq_len
device="cpu",
dtype=torch.int64,
requires_grad=False,
)
接下来,我们需要实际执行追踪并准备好模型。为此,请使用 inference.prepare_pippy() 函数,它会自动将模型完全包装起来以实现流水线并行。
from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))
您可以向 prepare_pippy
传递多种参数
split_points
允许您确定在哪些层分割模型。默认情况下,我们使用device_map="auto"
声明的位置,例如fc
或conv1
。num_chunks
决定了批次将如何被分割并发送到模型本身(因此,num_chunks=1
加上四个分割点/四个 GPU 将会有一个朴素的模型并行,其中单个输入在四个层分割点之间传递)
从这里开始,剩下的就是实际执行分布式推理了!
在传递输入时,我们强烈建议将它们作为参数元组传递。虽然支持使用 kwargs
,但这种方法是实验性的。
args = some_more_arguments
with torch.no_grad():
output = model(*args)
完成后,所有数据将只在最后一个进程上
from accelerate import PartialState
if PartialState().is_last_process:
print(output)
如果您将 gather_output=True
传递给 inference.prepare_pippy(),输出将在之后发送到所有 GPU,而无需进行 is_last_process
检查。此参数默认为 False
,因为它会产生一次通信调用。
就是这样!要探索更多内容,请查看 Accelerate 仓库中的推理示例和我们的文档,我们将继续努力改进这一集成。
< > 在 GitHub 上更新