Diffusers 文档

分布式推理

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

分布式推理

在分布式设置中,您可以跨多个 GPU 运行推理,使用 🤗 AcceleratePyTorch Distributed,这对于并行生成多个提示非常有用。

本指南将向您展示如何使用 🤗 Accelerate 和 PyTorch Distributed 进行分布式推理。

🤗 Accelerate

🤗 Accelerate 是一个旨在简化跨分布式设置进行训练或运行推理的库。它简化了分布式环境的设置过程,使您能够专注于您的 PyTorch 代码。

首先,创建一个 Python 文件并初始化一个 accelerate.PartialState 以创建分布式环境;您的设置会被自动检测到,因此您无需显式定义 rankworld_size。将 DiffusionPipeline 移动到 distributed_state.device 以将 GPU 分配给每个进程。

现在使用 split_between_processes 实用程序作为上下文管理器,以在进程数量之间自动分配提示。

import torch
from accelerate import PartialState
from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)
distributed_state = PartialState()
pipeline.to(distributed_state.device)

with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipeline(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

使用 --num_processes 参数指定要使用的 GPU 数量,并调用 accelerate launch 运行脚本

accelerate launch run_distributed.py --num_processes=2

请参阅这个最小示例 脚本,了解如何在多个 GPU 上运行推理。要了解更多信息,请查看 使用 🤗 Accelerate 进行分布式推理 指南。

PyTorch Distributed

PyTorch 支持 DistributedDataParallel,它启用数据并行。

首先,创建一个 Python 文件并导入 torch.distributedtorch.multiprocessing 以设置分布式进程组,并在每个 GPU 上生成用于推理的进程。您还应该初始化一个 DiffusionPipeline

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from diffusers import DiffusionPipeline

sd = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)

您需要创建一个函数来运行推理;init_process_group 处理创建分布式环境,包括要使用的后端类型、当前进程的 rank 以及参与进程的 world_size 或数量。如果您在 2 个 GPU 上并行运行推理,则 world_size 为 2。

DiffusionPipeline 移动到 rank 并使用 get_rank 为每个进程分配一个 GPU,其中每个进程处理不同的提示

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    sd.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    image = sd(prompt).images[0]
    image.save(f"./{'_'.join(prompt)}.png")

要运行分布式推理,请调用 mp.spawn 以在 world_size 中定义的 GPU 数量上运行 run_inference 函数

def main():
    world_size = 2
    mp.spawn(run_inference, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

完成推理脚本后,使用 --nproc_per_node 参数指定要使用的 GPU 数量,并调用 torchrun 运行脚本

torchrun run_distributed.py --nproc_per_node=2

您可以在 DiffusionPipeline 中使用 device_map 将其模型级组件分布在多个设备上。请参阅 设备放置 指南以了解更多信息。

模型分片

现代扩散系统(如 Flux)非常庞大,并且具有多个模型。例如,Flux.1-Dev 由两个文本编码器(T5-XXLCLIP-L)、一个 扩散 Transformer 和一个 VAE 组成。对于如此大型的模型,在消费级 GPU 上运行推理可能具有挑战性。

模型分片是一种在模型不适合单个 GPU 时将模型分布在多个 GPU 上的技术。以下示例假设有两个 16GB GPU 可用于推理。

首先,使用文本编码器计算文本嵌入。通过设置 device_map="balanced" 将文本编码器保留在两个 GPU 上。balanced 策略将模型均匀地分布在所有可用的 GPU 上。使用 max_memory 参数为每个 GPU 上的每个文本编码器分配最大内存量。

在此步骤中加载文本编码器!扩散 Transformer 和 VAE 在后续步骤中加载,以节省内存。

from diffusers import FluxPipeline
import torch

prompt = "a photo of a dog with cat-like look"

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    transformer=None,
    vae=None,
    device_map="balanced",
    max_memory={0: "16GB", 1: "16GB"},
    torch_dtype=torch.bfloat16
)
with torch.no_grad():
    print("Encoding prompts.")
    prompt_embeds, pooled_prompt_embeds, text_ids = pipeline.encode_prompt(
        prompt=prompt, prompt_2=None, max_sequence_length=512
    )

计算出文本嵌入后,将其从 GPU 中移除,为扩散 Transformer 腾出空间。

import gc 

def flush():
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_peak_memory_stats()

del pipeline.text_encoder
del pipeline.text_encoder_2
del pipeline.tokenizer
del pipeline.tokenizer_2
del pipeline

flush()

接下来加载具有 125 亿参数的扩散 Transformer。这次,设置 device_map="auto" 以自动将模型分布在两个 16GB GPU 上。auto 策略由 Accelerate 支持,并且是 大型模型推理 功能的一部分。它首先尝试将模型分布在最快的设备(GPU)上,然后再移动到较慢的设备(如 CPU 和硬盘驱动器,如果需要)。将模型参数存储在较慢设备上的权衡是推理延迟会更长。

from diffusers import FluxTransformer2DModel
import torch 

transformer = FluxTransformer2DModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev", 
    subfolder="transformer",
    device_map="auto",
    torch_dtype=torch.bfloat16
)

在任何时候,您都可以尝试 print(pipeline.hf_device_map) 来查看各种模型在设备之间的分布情况。这对于跟踪模型的设备放置非常有用。您也可以尝试 print(transformer.hf_device_map) 来查看 Transformer 模型如何在设备之间分片。

将 Transformer 模型添加到 pipeline 中进行去噪,但将其他模型级组件(如文本编码器和 VAE)设置为 None,因为您尚不需要它们。

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    text_encoder=None,
    text_encoder_2=None,
    tokenizer=None,
    tokenizer_2=None,
    vae=None,
    transformer=transformer,
    torch_dtype=torch.bfloat16
)

print("Running denoising.")
height, width = 768, 1360
latents = pipeline(
    prompt_embeds=prompt_embeds,
    pooled_prompt_embeds=pooled_prompt_embeds,
    num_inference_steps=50,
    guidance_scale=3.5,
    height=height,
    width=width,
    output_type="latent",
).images

从内存中移除 pipeline 和 Transformer,因为它们不再需要。

del pipeline.transformer
del pipeline

flush()

最后,使用 VAE 将潜在空间解码为图像。VAE 通常足够小,可以加载到单个 GPU 上。

from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor
import torch 

vae = AutoencoderKL.from_pretrained(ckpt_id, subfolder="vae", torch_dtype=torch.bfloat16).to("cuda")
vae_scale_factor = 2 ** (len(vae.config.block_out_channels))
image_processor = VaeImageProcessor(vae_scale_factor=vae_scale_factor)

with torch.no_grad():
    print("Running decoding.")
    latents = FluxPipeline._unpack_latents(latents, height, width, vae_scale_factor)
    latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor

    image = vae.decode(latents, return_dict=False)[0]
    image = image_processor.postprocess(image, output_type="pil")
    image[0].save("split_transformer.png")

通过选择性地加载和卸载在给定阶段所需的模型,并将最大的模型分片到多个 GPU 上,可以在消费级 GPU 上运行大型模型的推理。

< > 在 GitHub 上更新