Diffusers 文档
分布式推理
并获得增强的文档体验
开始使用
分布式推理
在分布式设置中,您可以跨多个 GPU 运行推理,使用 🤗 Accelerate 或 PyTorch Distributed,这对于并行生成多个提示非常有用。
本指南将向您展示如何使用 🤗 Accelerate 和 PyTorch Distributed 进行分布式推理。
🤗 Accelerate
🤗 Accelerate 是一个旨在简化跨分布式设置进行训练或运行推理的库。它简化了分布式环境的设置过程,使您能够专注于您的 PyTorch 代码。
首先,创建一个 Python 文件并初始化一个 accelerate.PartialState 以创建分布式环境;您的设置会被自动检测到,因此您无需显式定义 rank
或 world_size
。将 DiffusionPipeline 移动到 distributed_state.device
以将 GPU 分配给每个进程。
现在使用 split_between_processes 实用程序作为上下文管理器,以在进程数量之间自动分配提示。
import torch
from accelerate import PartialState
from diffusers import DiffusionPipeline
pipeline = DiffusionPipeline.from_pretrained(
"stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)
distributed_state = PartialState()
pipeline.to(distributed_state.device)
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
result = pipeline(prompt).images[0]
result.save(f"result_{distributed_state.process_index}.png")
使用 --num_processes
参数指定要使用的 GPU 数量,并调用 accelerate launch
运行脚本
accelerate launch run_distributed.py --num_processes=2
请参阅这个最小示例 脚本,了解如何在多个 GPU 上运行推理。要了解更多信息,请查看 使用 🤗 Accelerate 进行分布式推理 指南。
PyTorch Distributed
PyTorch 支持 DistributedDataParallel
,它启用数据并行。
首先,创建一个 Python 文件并导入 torch.distributed
和 torch.multiprocessing
以设置分布式进程组,并在每个 GPU 上生成用于推理的进程。您还应该初始化一个 DiffusionPipeline
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from diffusers import DiffusionPipeline
sd = DiffusionPipeline.from_pretrained(
"stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)
您需要创建一个函数来运行推理;init_process_group
处理创建分布式环境,包括要使用的后端类型、当前进程的 rank
以及参与进程的 world_size
或数量。如果您在 2 个 GPU 上并行运行推理,则 world_size
为 2。
将 DiffusionPipeline 移动到 rank
并使用 get_rank
为每个进程分配一个 GPU,其中每个进程处理不同的提示
def run_inference(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
sd.to(rank)
if torch.distributed.get_rank() == 0:
prompt = "a dog"
elif torch.distributed.get_rank() == 1:
prompt = "a cat"
image = sd(prompt).images[0]
image.save(f"./{'_'.join(prompt)}.png")
要运行分布式推理,请调用 mp.spawn
以在 world_size
中定义的 GPU 数量上运行 run_inference
函数
def main():
world_size = 2
mp.spawn(run_inference, args=(world_size,), nprocs=world_size, join=True)
if __name__ == "__main__":
main()
完成推理脚本后,使用 --nproc_per_node
参数指定要使用的 GPU 数量,并调用 torchrun
运行脚本
torchrun run_distributed.py --nproc_per_node=2
您可以在 DiffusionPipeline 中使用 device_map
将其模型级组件分布在多个设备上。请参阅 设备放置 指南以了解更多信息。
模型分片
现代扩散系统(如 Flux)非常庞大,并且具有多个模型。例如,Flux.1-Dev 由两个文本编码器(T5-XXL 和 CLIP-L)、一个 扩散 Transformer 和一个 VAE 组成。对于如此大型的模型,在消费级 GPU 上运行推理可能具有挑战性。
模型分片是一种在模型不适合单个 GPU 时将模型分布在多个 GPU 上的技术。以下示例假设有两个 16GB GPU 可用于推理。
首先,使用文本编码器计算文本嵌入。通过设置 device_map="balanced"
将文本编码器保留在两个 GPU 上。balanced
策略将模型均匀地分布在所有可用的 GPU 上。使用 max_memory
参数为每个 GPU 上的每个文本编码器分配最大内存量。
在此步骤中仅加载文本编码器!扩散 Transformer 和 VAE 在后续步骤中加载,以节省内存。
from diffusers import FluxPipeline
import torch
prompt = "a photo of a dog with cat-like look"
pipeline = FluxPipeline.from_pretrained(
"black-forest-labs/FLUX.1-dev",
transformer=None,
vae=None,
device_map="balanced",
max_memory={0: "16GB", 1: "16GB"},
torch_dtype=torch.bfloat16
)
with torch.no_grad():
print("Encoding prompts.")
prompt_embeds, pooled_prompt_embeds, text_ids = pipeline.encode_prompt(
prompt=prompt, prompt_2=None, max_sequence_length=512
)
计算出文本嵌入后,将其从 GPU 中移除,为扩散 Transformer 腾出空间。
import gc
def flush():
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_max_memory_allocated()
torch.cuda.reset_peak_memory_stats()
del pipeline.text_encoder
del pipeline.text_encoder_2
del pipeline.tokenizer
del pipeline.tokenizer_2
del pipeline
flush()
接下来加载具有 125 亿参数的扩散 Transformer。这次,设置 device_map="auto"
以自动将模型分布在两个 16GB GPU 上。auto
策略由 Accelerate 支持,并且是 大型模型推理 功能的一部分。它首先尝试将模型分布在最快的设备(GPU)上,然后再移动到较慢的设备(如 CPU 和硬盘驱动器,如果需要)。将模型参数存储在较慢设备上的权衡是推理延迟会更长。
from diffusers import FluxTransformer2DModel
import torch
transformer = FluxTransformer2DModel.from_pretrained(
"black-forest-labs/FLUX.1-dev",
subfolder="transformer",
device_map="auto",
torch_dtype=torch.bfloat16
)
在任何时候,您都可以尝试 print(pipeline.hf_device_map)
来查看各种模型在设备之间的分布情况。这对于跟踪模型的设备放置非常有用。您也可以尝试 print(transformer.hf_device_map)
来查看 Transformer 模型如何在设备之间分片。
将 Transformer 模型添加到 pipeline 中进行去噪,但将其他模型级组件(如文本编码器和 VAE)设置为 None
,因为您尚不需要它们。
pipeline = FluxPipeline.from_pretrained(
"black-forest-labs/FLUX.1-dev",
text_encoder=None,
text_encoder_2=None,
tokenizer=None,
tokenizer_2=None,
vae=None,
transformer=transformer,
torch_dtype=torch.bfloat16
)
print("Running denoising.")
height, width = 768, 1360
latents = pipeline(
prompt_embeds=prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
num_inference_steps=50,
guidance_scale=3.5,
height=height,
width=width,
output_type="latent",
).images
从内存中移除 pipeline 和 Transformer,因为它们不再需要。
del pipeline.transformer
del pipeline
flush()
最后,使用 VAE 将潜在空间解码为图像。VAE 通常足够小,可以加载到单个 GPU 上。
from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor
import torch
vae = AutoencoderKL.from_pretrained(ckpt_id, subfolder="vae", torch_dtype=torch.bfloat16).to("cuda")
vae_scale_factor = 2 ** (len(vae.config.block_out_channels))
image_processor = VaeImageProcessor(vae_scale_factor=vae_scale_factor)
with torch.no_grad():
print("Running decoding.")
latents = FluxPipeline._unpack_latents(latents, height, width, vae_scale_factor)
latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor
image = vae.decode(latents, return_dict=False)[0]
image = image_processor.postprocess(image, output_type="pil")
image[0].save("split_transformer.png")
通过选择性地加载和卸载在给定阶段所需的模型,并将最大的模型分片到多个 GPU 上,可以在消费级 GPU 上运行大型模型的推理。
< > 在 GitHub 上更新