训练扩散模型
无条件图像生成是扩散模型的一种流行应用,它可以生成看起来像训练数据集中图像的图像。通常,微调在特定数据集上预训练的模型可以获得最佳效果。您可以在 Hub 上找到许多这样的检查点,但如果您找不到喜欢的检查点,您可以随时训练自己的模型!
本教程将教会您如何从头开始在 Smithsonian Butterflies 数据集的子集上训练 UNet2DModel,以生成您自己的 🦋 蝴蝶 🦋。
💡 本训练教程基于 使用 🧨 Diffusers 进行训练 笔记本。有关扩散模型的更多详细信息和背景信息,例如它们的工作原理,请查看笔记本!
在开始之前,请确保您已安装 🤗 Datasets 来加载和预处理图像数据集,以及 🤗 Accelerate,以简化在任意数量的 GPU 上进行训练。以下命令还会安装 TensorBoard 来可视化训练指标(您也可以使用 Weights & Biases 来跟踪您的训练)。
# uncomment to install the necessary libraries in Colab
#!pip install diffusers[training]
我们鼓励您与社区共享您的模型,为此,您需要登录您的 Hugging Face 帐户(如果您还没有帐户,请在 此处 创建一个!)。您可以从笔记本中登录,并在提示时输入您的令牌。确保您的令牌具有写入角色。
>>> from huggingface_hub import notebook_login
>>> notebook_login()
或从终端登录
huggingface-cli login
由于模型检查点非常大,请安装 Git-LFS 来版本控制这些大型文件
!sudo apt -qq install git-lfs !git config --global credential.helper store
训练配置
为方便起见,创建一个 TrainingConfig
类,其中包含训练超参数(您可以随意调整它们)
>>> from dataclasses import dataclass
>>> @dataclass
... class TrainingConfig:
... image_size = 128 # the generated image resolution
... train_batch_size = 16
... eval_batch_size = 16 # how many images to sample during evaluation
... num_epochs = 50
... gradient_accumulation_steps = 1
... learning_rate = 1e-4
... lr_warmup_steps = 500
... save_image_epochs = 10
... save_model_epochs = 30
... mixed_precision = "fp16" # `no` for float32, `fp16` for automatic mixed precision
... output_dir = "ddpm-butterflies-128" # the model name locally and on the HF Hub
... push_to_hub = True # whether to upload the saved model to the HF Hub
... hub_model_id = "<your-username>/<my-awesome-model>" # the name of the repository to create on the HF Hub
... hub_private_repo = False
... overwrite_output_dir = True # overwrite the old model when re-running the notebook
... seed = 0
>>> config = TrainingConfig()
加载数据集
您可以使用 🤗 Datasets 库轻松加载 Smithsonian Butterflies 数据集
>>> from datasets import load_dataset
>>> config.dataset_name = "huggan/smithsonian_butterflies_subset"
>>> dataset = load_dataset(config.dataset_name, split="train")
💡 您可以在 HugGan 社区活动 中找到其他数据集,或者通过创建本地 ImageFolder
来使用自己的数据集。如果数据集来自 HugGan 社区活动,则将 config.dataset_name
设置为数据集的存储库 ID,或者如果您使用的是自己的图像,则设置为 imagefolder
。
🤗 Datasets 使用 Image 功能自动解码图像数据并将其加载为 PIL.Image
,我们可以将其可视化
>>> import matplotlib.pyplot as plt
>>> fig, axs = plt.subplots(1, 4, figsize=(16, 4))
>>> for i, image in enumerate(dataset[:4]["image"]):
... axs[i].imshow(image)
... axs[i].set_axis_off()
>>> fig.show()
但是,这些图像的大小各不相同,因此您需要先对其进行预处理
Resize
将图像大小更改为config.image_size
中定义的大小。RandomHorizontalFlip
通过随机镜像图像来增强数据集。Normalize
很重要,它可以将像素值重新缩放到 [-1, 1] 范围,这是模型所期望的。
>>> from torchvision import transforms
>>> preprocess = transforms.Compose(
... [
... transforms.Resize((config.image_size, config.image_size)),
... transforms.RandomHorizontalFlip(),
... transforms.ToTensor(),
... transforms.Normalize([0.5], [0.5]),
... ]
... )
使用 🤗 Datasets 的 set_transform 方法,在训练期间动态应用 preprocess
函数
>>> def transform(examples):
... images = [preprocess(image.convert("RGB")) for image in examples["image"]]
... return {"images": images}
>>> dataset.set_transform(transform)
您可以随意再次可视化图像,以确认它们已调整大小。现在,您已准备好将数据集包装到 DataLoader 中进行训练!
>>> import torch
>>> train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=config.train_batch_size, shuffle=True)
创建一个 UNet2DModel
🧨 Diffusers 中的预训练模型可以通过其模型类轻松创建,并使用您想要的参数。例如,要创建一个 UNet2DModel
>>> from diffusers import UNet2DModel
>>> model = UNet2DModel(
... sample_size=config.image_size, # the target image resolution
... in_channels=3, # the number of input channels, 3 for RGB images
... out_channels=3, # the number of output channels
... layers_per_block=2, # how many ResNet layers to use per UNet block
... block_out_channels=(128, 128, 256, 256, 512, 512), # the number of output channels for each UNet block
... down_block_types=(
... "DownBlock2D", # a regular ResNet downsampling block
... "DownBlock2D",
... "DownBlock2D",
... "DownBlock2D",
... "AttnDownBlock2D", # a ResNet downsampling block with spatial self-attention
... "DownBlock2D",
... ),
... up_block_types=(
... "UpBlock2D", # a regular ResNet upsampling block
... "AttnUpBlock2D", # a ResNet upsampling block with spatial self-attention
... "UpBlock2D",
... "UpBlock2D",
... "UpBlock2D",
... "UpBlock2D",
... ),
... )
通常最好快速检查示例图像形状是否与模型输出形状匹配
>>> sample_image = dataset[0]["images"].unsqueeze(0)
>>> print("Input shape:", sample_image.shape)
Input shape: torch.Size([1, 3, 128, 128])
>>> print("Output shape:", model(sample_image, timestep=0).sample.shape)
Output shape: torch.Size([1, 3, 128, 128])
太棒了!接下来,您需要一个调度器,以便将一些噪声添加到图像中。
创建一个调度器
调度器根据您是使用模型进行训练还是推理而表现不同。在推理期间,调度器从噪声中生成图像。在训练期间,调度器会获取模型输出(或来自扩散过程中的特定点的样本),并根据噪声计划和更新规则对图像应用噪声。
让我们看一下 DDPMScheduler,并使用 add_noise
方法将一些随机噪声添加到之前的 sample_image
中
>>> import torch
>>> from PIL import Image
>>> from diffusers import DDPMScheduler
>>> noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
>>> noise = torch.randn(sample_image.shape)
>>> timesteps = torch.LongTensor([50])
>>> noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps)
>>> Image.fromarray(((noisy_image.permute(0, 2, 3, 1) + 1.0) * 127.5).type(torch.uint8).numpy()[0])
模型的训练目标是预测添加到图像中的噪声。此步骤的损失可以通过以下公式计算得出:
>>> import torch.nn.functional as F
>>> noise_pred = model(noisy_image, timesteps).sample
>>> loss = F.mse_loss(noise_pred, noise)
训练模型
到目前为止,您已经拥有了开始训练模型的大部分组件,剩下的就是将所有东西组合在一起。
首先,您需要一个优化器和一个学习率调度器
>>> from diffusers.optimization import get_cosine_schedule_with_warmup
>>> optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate)
>>> lr_scheduler = get_cosine_schedule_with_warmup(
... optimizer=optimizer,
... num_warmup_steps=config.lr_warmup_steps,
... num_training_steps=(len(train_dataloader) * config.num_epochs),
... )
然后,您需要一种方法来评估模型。对于评估,您可以使用 DDPMPipeline 生成一批样本图像,并将其保存为网格
>>> from diffusers import DDPMPipeline
>>> from diffusers.utils import make_image_grid
>>> import os
>>> def evaluate(config, epoch, pipeline):
... # Sample some images from random noise (this is the backward diffusion process).
... # The default pipeline output type is `List[PIL.Image]`
... images = pipeline(
... batch_size=config.eval_batch_size,
... generator=torch.Generator(device='cpu').manual_seed(config.seed), # Use a separate torch generator to avoid rewinding the random state of the main training loop
... ).images
... # Make a grid out of the images
... image_grid = make_image_grid(images, rows=4, cols=4)
... # Save the images
... test_dir = os.path.join(config.output_dir, "samples")
... os.makedirs(test_dir, exist_ok=True)
... image_grid.save(f"{test_dir}/{epoch:04d}.png")
现在,您可以使用 🤗 Accelerate 将所有这些组件包装到一个训练循环中,以方便使用 TensorBoard 日志记录、梯度累积和混合精度训练。要将模型上传到 Hub,请编写一个函数来获取您的存储库名称和信息,然后将其推送到 Hub。
💡 下面的训练循环可能看起来很吓人而且很长,但当你只需一行代码就能启动训练时,它将非常有用!如果你等不及想要开始生成图像,请随意复制并运行下面的代码。你也可以随时回来更仔细地检查训练循环,例如当你等待模型完成训练时。🤗
>>> from accelerate import Accelerator
>>> from huggingface_hub import create_repo, upload_folder
>>> from tqdm.auto import tqdm
>>> from pathlib import Path
>>> import os
>>> def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler):
... # Initialize accelerator and tensorboard logging
... accelerator = Accelerator(
... mixed_precision=config.mixed_precision,
... gradient_accumulation_steps=config.gradient_accumulation_steps,
... log_with="tensorboard",
... project_dir=os.path.join(config.output_dir, "logs"),
... )
... if accelerator.is_main_process:
... if config.output_dir is not None:
... os.makedirs(config.output_dir, exist_ok=True)
... if config.push_to_hub:
... repo_id = create_repo(
... repo_id=config.hub_model_id or Path(config.output_dir).name, exist_ok=True
... ).repo_id
... accelerator.init_trackers("train_example")
... # Prepare everything
... # There is no specific order to remember, you just need to unpack the
... # objects in the same order you gave them to the prepare method.
... model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
... model, optimizer, train_dataloader, lr_scheduler
... )
... global_step = 0
... # Now you train the model
... for epoch in range(config.num_epochs):
... progress_bar = tqdm(total=len(train_dataloader), disable=not accelerator.is_local_main_process)
... progress_bar.set_description(f"Epoch {epoch}")
... for step, batch in enumerate(train_dataloader):
... clean_images = batch["images"]
... # Sample noise to add to the images
... noise = torch.randn(clean_images.shape, device=clean_images.device)
... bs = clean_images.shape[0]
... # Sample a random timestep for each image
... timesteps = torch.randint(
... 0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device,
... dtype=torch.int64
... )
... # Add noise to the clean images according to the noise magnitude at each timestep
... # (this is the forward diffusion process)
... noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)
... with accelerator.accumulate(model):
... # Predict the noise residual
... noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
... loss = F.mse_loss(noise_pred, noise)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_norm_(model.parameters(), 1.0)
... optimizer.step()
... lr_scheduler.step()
... optimizer.zero_grad()
... progress_bar.update(1)
... logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0], "step": global_step}
... progress_bar.set_postfix(**logs)
... accelerator.log(logs, step=global_step)
... global_step += 1
... # After each epoch you optionally sample some demo images with evaluate() and save the model
... if accelerator.is_main_process:
... pipeline = DDPMPipeline(unet=accelerator.unwrap_model(model), scheduler=noise_scheduler)
... if (epoch + 1) % config.save_image_epochs == 0 or epoch == config.num_epochs - 1:
... evaluate(config, epoch, pipeline)
... if (epoch + 1) % config.save_model_epochs == 0 or epoch == config.num_epochs - 1:
... if config.push_to_hub:
... upload_folder(
... repo_id=repo_id,
... folder_path=config.output_dir,
... commit_message=f"Epoch {epoch}",
... ignore_patterns=["step_*", "epoch_*"],
... )
... else:
... pipeline.save_pretrained(config.output_dir)
Whew,代码真不少!但你现在终于可以使用 🤗 Accelerate 的 notebook_launcher 函数启动训练了。将训练循环、所有训练参数和用于训练的进程数(可以将此值更改为可用的 GPU 数量)传递给函数。
>>> from accelerate import notebook_launcher
>>> args = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler)
>>> notebook_launcher(train_loop, args, num_processes=1)
训练完成后,看看你的扩散模型生成的最终 🦋 图像 🦋!
>>> import glob
>>> sample_images = sorted(glob.glob(f"{config.output_dir}/samples/*.png"))
>>> Image.open(sample_images[-1])
下一步
无条件图像生成是可训练的任务之一。你可以通过访问 🧨 Diffusers 训练示例 页面来探索其他任务和训练技术。以下是一些你可以学到的示例。
- 文本反转 是一种算法,它教会模型一个特定的视觉概念并将其整合到生成的图像中。
- DreamBooth 是一种技术,它用于根据主题的多个输入图像生成主题的个性化图像。
- 指南 用于微调你自己数据集上的 Stable Diffusion 模型。
- 指南 用于使用 LoRA,这是一种内存高效的技术,可以更快地微调非常大的模型。