Accelerate 文档

加速器

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Accelerator

The Accelerator 是用于在任何类型的训练设置上启用分布式训练的主要类。阅读“将 Accelerator 添加到你的代码”教程,以了解有关如何将 Accelerator 添加到你的脚本的更多信息。

Accelerator

class accelerate.Accelerator

< >

( device_placement: bool = True split_batches: bool = <object object at 0x7eff015f54b0> mixed_precision: PrecisionType | str | None = None gradient_accumulation_steps: int = 1 cpu: bool = False dataloader_config: DataLoaderConfiguration | None = None deepspeed_plugin: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None fsdp_plugin: FullyShardedDataParallelPlugin | None = None torch_tp_plugin: TorchTensorParallelPlugin | None = None megatron_lm_plugin: MegatronLMPlugin | None = None rng_types: list[str | RNGType] | None = None log_with: str | LoggerType | GeneralTracker | list[str | LoggerType | GeneralTracker] | None = None project_dir: str | os.PathLike | None = None project_config: ProjectConfiguration | None = None gradient_accumulation_plugin: GradientAccumulationPlugin | None = None step_scheduler_with_optimizer: bool = True kwargs_handlers: list[KwargsHandler] | None = None dynamo_backend: DynamoBackend | str | None = None dynamo_plugin: TorchDynamoPlugin | None = None deepspeed_plugins: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None )

参数

  • device_placement (bool, optional, defaults to True) — 加速器是否应将对象放在设备上(dataloader、模型等产生的张量...)。
  • mixed_precision (str, optional) — 是否使用混合精度训练。从 “no”、“fp16”、“bf16” 或 “fp8” 中选择。将默认为环境变量 ACCELERATE_MIXED_PRECISION 中的值,该值将使用当前系统的 accelerate 配置中的默认值或通过 accelerate.launch 命令传递的标志。“fp8” 需要安装 transformers-engine。
  • gradient_accumulation_steps (int, optional, default to 1) — 在累积梯度之前应经过的步数。大于 1 的数字应与 Accelerator.accumulate 结合使用。如果未传递,将默认为环境变量 ACCELERATE_GRADIENT_ACCUMULATION_STEPS 中的值。也可以通过 GradientAccumulationPlugin 进行配置。
  • cpu (bool, optional) — 是否强制脚本在 CPU 上执行。如果设置为 True,将忽略可用的 GPU 并强制仅在一个进程上执行。
  • dataloader_config (DataLoaderConfiguration, optional) — dataloader 在分布式场景中应如何处理的配置。
  • deepspeed_plugin (DeepSpeedPlugin or dict of strDeepSpeedPlugin, optional): 使用此参数调整与 DeepSpeed 相关的参数。此参数是可选的,可以直接使用 accelerate config 进行配置。如果使用多个插件,请使用每个插件的配置 key 属性从 accelerator.state.get_deepspeed_plugin(key) 访问它们。deepspeed_plugins 的别名。
  • fsdp_plugin (FullyShardedDataParallelPlugin, optional) — 使用此参数调整与 FSDP 相关的参数。此参数是可选的,可以直接使用 accelerate config 进行配置
  • torch_tp_plugin (TorchTensorParallelPlugin, optional) — 调整你的 torch tensor parallel。此参数是可选的,可以直接使用 accelerate config 进行配置
  • megatron_lm_plugin (MegatronLMPlugin, optional) — 使用此参数调整与 MegatronLM 相关的参数。此参数是可选的,可以直接使用 accelerate config 进行配置
  • rng_types (list of str or RNGType) — 要在准备好的 dataloader 中每次迭代开始时同步的随机数生成器列表。应为以下一项或多项:

    • "torch": 基本 torch 随机数生成器
    • "cuda": CUDA 随机数生成器(仅限 GPU)
    • "xla": XLA 随机数生成器(仅限 TPU)
    • "generator": 采样器(或批量采样器,如果你的 dataloader 中没有采样器)或可迭代数据集(如果存在)的 torch.Generator,如果底层数据集是该类型。

    对于 PyTorch 版本 <=1.5.1,将默认为 ["torch"],对于 PyTorch 版本 >= 1.6,将默认为 ["generator"]

  • log_with (list of str, LoggerType or GeneralTracker, optional) — 要为实验跟踪设置的记录器列表。应为以下一项或多项:

    • "all"
    • "tensorboard"
    • "wandb"
    • "comet_ml" 如果选择 "all",将拾取环境中所有可用的跟踪器并初始化它们。还可以接受 GeneralTracker 的实现以用于自定义跟踪器,并且可以与 "all" 结合使用。
  • project_config (ProjectConfiguration, optional) — 用于处理状态保存方式的配置。
  • project_dir (str, os.PathLike, optional) — 用于存储数据的目录路径,例如本地兼容记录器的日志以及可能保存的检查点。
  • step_scheduler_with_optimizer (bool, optional, defaults to True) — 如果学习率调度器与优化器同时进行步进,则设置为 True;如果仅在特定情况下(例如在每个 epoch 结束时)执行,则设置为 False
  • kwargs_handlers (list of KwargsHandler, optional) — KwargsHandler 的列表,用于自定义与分布式训练、性能分析或混合精度相关的对象的创建方式。有关更多信息,请参见 kwargs
  • dynamo_backend (strDynamoBackend, optional, defaults to "no") — 设置为可能的 dynamo 后端之一,以使用 torch dynamo 优化您的训练。
  • dynamo_plugin (TorchDynamoPlugin, optional) — 用于配置 torch dynamo 应如何处理,如果需要比 backendmode 更多的调整。
  • gradient_accumulation_plugin (GradientAccumulationPlugin, optional) — 用于配置梯度累积应如何处理,如果需要比 gradient_accumulation_steps 更多的调整。

创建用于分布式训练或混合精度训练的加速器实例。

可用属性

  • device (torch.device) — 要使用的设备。
  • distributed_type (DistributedType) — 分布式训练配置。
  • local_process_index (int) — 当前机器上的进程索引。
  • mixed_precision (str) — 配置的混合精度模式。
  • num_processes (int) — 用于训练的总进程数。
  • optimizer_step_was_skipped (bool) — 优化器更新是否被跳过(由于混合精度中的梯度溢出),在这种情况下,不应更改学习率。
  • process_index (int) — 所有进程中当前进程的总体索引。
  • state (AcceleratorState) — 分布式设置状态。
  • sync_gradients (bool) — 当前是否正在跨所有进程同步梯度。
  • use_distributed (bool) — 当前配置是否用于分布式训练。

accumulate

< >

( *models )

参数

  • *models (list of torch.nn.Module) — 使用 Accelerator.prepare 准备的 PyTorch 模块列表。传递给 accumulate() 的模型将在分布式训练的反向传播期间跳过梯度同步

一个上下文管理器,它将轻微包装并自动执行梯度累积

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=1)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, output in dataloader:
...     with accelerator.accumulate(model):
...         outputs = model(input)
...         loss = loss_func(outputs)
...         loss.backward()
...         optimizer.step()
...         scheduler.step()
...         optimizer.zero_grad()

autocast

< >

( autocast_handler: AutocastKwargs = None )

如果启用了自动混合精度,将在该上下文管理器的块内应用自动混合精度。否则,不会发生任何不同的事情。

可以传入不同的 autocast_handler 来覆盖在 Accelerator 对象中设置的处理器。这在 autocast 下您想要恢复为 fp32 的块中很有用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(mixed_precision="fp16")
>>> with accelerator.autocast():
...     train()

backward

< >

( loss **kwargs )

根据 GradientAccumulationPlugin 缩放梯度,并根据配置调用正确的 backward()

应代替 loss.backward() 使用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> outputs = model(inputs)
>>> loss = loss_fn(outputs, labels)
>>> accelerator.backward(loss)

check_trigger

< >

( )

检查任何进程中的内部触发张量是否已设置为 1。如果是,将返回 True 并将触发张量重置为 0。

注意:不需要 wait_for_everyone()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
...     accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_trigger():
...     break

clear

< >

( *objects )

Accelerate.free_memory 的别名,释放对存储的内部对象的所有引用,并调用垃圾回收器。您应该在两个使用不同模型/优化器的训练之间调用此方法。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, scheduler = ...
>>> model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
>>> model, optimizer, scheduler = accelerator.clear(model, optimizer, scheduler)

clip_grad_norm_

< >

( parameters max_norm norm_type = 2 ) torch.Tensor

返回

torch.Tensor

参数梯度的总范数(视为单个向量)。

应代替 torch.nn.utils.clip_grad_norm_ 使用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     if accelerator.sync_gradients:
...         accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
...     optimizer.step()

clip_grad_value_

< >

( parameters clip_value )

应代替 torch.nn.utils.clip_grad_value_ 使用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     if accelerator.sync_gradients:
...         accelerator.clip_grad_value_(model.parameters(), clip_value)
...     optimizer.step()

end_training

< >

( )

运行任何特殊的结束训练行为,例如仅在主进程上停止跟踪器或销毁进程组。如果使用实验跟踪,则应始终在脚本末尾调用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(log_with="tensorboard")
>>> accelerator.init_trackers("my_project")
>>> # Do training
>>> accelerator.end_training()

free_memory

< >

( *objects )

将释放对存储的内部对象的所有引用,并调用垃圾回收器。您应该在两个使用不同模型/优化器的训练之间调用此方法。 还会将 Accelerator.step 重置为 0。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, scheduler = ...
>>> model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
>>> model, optimizer, scheduler = accelerator.free_memory(model, optimizer, scheduler)

gather

< >

( tensor ) torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

参数

  • tensor (torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典) — 要跨所有进程收集的张量。

返回

torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

收集的张量。 请注意,结果的第一个维度是num_processes 乘以输入张量的第一个维度。

在所有进程中收集tensor中的值,并将它们连接在第一个维度上。在进行评估时,可用于重新组合来自所有进程的预测。

注意:此收集发生在所有进程中。

示例

>>> # Assuming four processes
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.tensor([accelerator.process_index])
>>> gathered_tensor = accelerator.gather(process_tensor)
>>> gathered_tensor
tensor([0, 1, 2, 3])

gather_for_metrics

< >

( input_data use_gather_object = False )

参数

  • input (torch.Tensor, object, torch.Tensor 的嵌套元组/列表/字典,或 object 的嵌套元组/列表/字典) — 用于跨所有进程计算指标的张量或对象
  • use_gather_object(bool) — 是否强制使用 gather_object 而不是 gather (如果传递的所有对象都不包含张量,则已完成)。 此标志对于收集大小不同的张量很有用,我们不希望沿第一个维度进行填充和连接。 将其与 GPU 张量一起使用效果不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被 pickle 化。

收集 input_data,并且如果在分布式系统上,则可能会删除最后一批中的重复项。 应用于收集输入和目标以进行指标计算。

示例

>>> # Assuming two processes, with a batch size of 5 on a dataset with 9 samples
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader = torch.utils.data.DataLoader(range(9), batch_size=5)
>>> dataloader = accelerator.prepare(dataloader)
>>> batch = next(iter(dataloader))
>>> gathered_items = accelerator.gather_for_metrics(batch)
>>> len(gathered_items)
9

get_state_dict

< >

( model unwrap = True ) dict

参数

  • model (torch.nn.Module) — 通过 Accelerator.prepare() 发送的 PyTorch 模型
  • unwrap (bool, optional, defaults to True) — 是否返回 model 的原始底层 state_dict,还是返回包装后的 state_dict

返回

dict

模型的 state dictionary,可能没有完整精度。

返回通过 Accelerator.prepare() 发送的模型的 state dictionary,可能没有完整精度。

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> net = torch.nn.Linear(2, 2)
>>> net = accelerator.prepare(net)
>>> state_dict = accelerator.get_state_dict(net)

get_tracker

< >

( name: str unwrap: bool = False ) GeneralTracker

参数

  • name (str) — 跟踪器的名称,对应于 .name 属性。
  • unwrap (bool) — 是否返回内部跟踪机制,还是返回包装后的跟踪器(推荐)。

返回

GeneralTracker

如果存在,则返回与 name 对应的跟踪器。

仅在主进程上,基于 nameself.trackers 返回一个 tracker

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(log_with="tensorboard")
>>> accelerator.init_trackers("my_project")
>>> tensorboard_tracker = accelerator.get_tracker("tensorboard")

join_uneven_inputs

< >

( joinables even_batches = None )

参数

  • joinables (list[torch.distributed.algorithms.Joinable]) — 继承自 torch.distributed.algorithms.Joinable 的模型或优化器列表。最常见的是使用 Accelerator.prepare 为分布式数据并行训练准备的 PyTorch 模块。
  • even_batches (bool, optional) — 如果设置,这将覆盖在 Accelerator 中设置的 even_batches 值。如果未提供,则将使用默认的 Accelerator 值。

一个上下文管理器,用于在不均匀输入上进行分布式训练或评估,它充当 torch.distributed.algorithms.join 的包装器。当总批次大小不能均匀地划分数据集长度时,这非常有用。

join_uneven_inputs 仅支持在多个 GPU 上的分布式数据并行训练。对于任何其他配置,此方法将不起作用。

覆盖 even_batches 不会影响可迭代样式的数据加载器。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(even_batches=True)
>>> ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

>>> with accelerator.join_uneven_inputs([ddp_model], even_batches=False):
...     for input, output in dataloader:
...         outputs = model(input)
...         loss = loss_func(outputs)
...         loss.backward()
...         optimizer.step()
...         optimizer.zero_grad()

load_state

< >

( input_dir: str = None **load_model_func_kwargs )

参数

  • input_dir (stros.PathLike) — 保存所有相关权重和状态的文件夹名称。如果使用 automatic_checkpoint_naming,则可以为 None,并将从最新的检查点恢复。
  • load_model_func_kwargs (dict, optional) — 用于加载模型的附加关键字参数,可以传递给底层加载函数,例如 DeepSpeed 的 load_checkpoint 函数的可选参数,或者用于在何处加载模型和优化器的 map_location

加载模型、优化器、scaler、RNG 生成器和已注册对象的当前状态。

应仅与 Accelerator.save_state() 结合使用。如果文件未注册以进行检查点保存,即使存储在目录中也不会被加载。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, lr_scheduler = ...
>>> model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
>>> accelerator.load_state("my_checkpoint")

local_main_process_first

< >

( )

允许本地主进程进入 with 代码块。

其他进程将在主进程退出后进入 with 代码块。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> with accelerator.local_main_process_first():
...     # This will be printed first by local process 0 then in a seemingly
...     # random order by the other processes.
...     print(f"This will be printed by process {accelerator.local_process_index}")

lomo_backward

< >

( loss: torch.Tensor learning_rate: float )

在 LOMO 优化器上运行反向传播。

main_process_first

< >

( )

允许主进程首先进入 with 代码块。

其他进程将在主进程退出后进入 with 代码块。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> with accelerator.main_process_first():
...     # This will be printed first by process 0 then in a seemingly
...     # random order by the other processes.
...     print(f"This will be printed by process {accelerator.process_index}")

no_sync

< >

( model )

参数

  • model (torch.nn.Module) — 使用 Accelerator.prepare 准备的 PyTorch 模块

一个上下文管理器,用于通过调用 torch.nn.parallel.DistributedDataParallel.no_sync 来禁用跨 DDP 进程的梯度同步。

如果 model 不在 DDP 中,则此上下文管理器不执行任何操作。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> input_a = next(iter(dataloader))
>>> input_b = next(iter(dataloader))

>>> with accelerator.no_sync():
...     outputs = model(input_a)
...     loss = loss_func(outputs)
...     accelerator.backward(loss)
...     # No synchronization across processes, only accumulate gradients
>>> outputs = model(input_b)
>>> accelerator.backward(loss)
>>> # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()

on_last_process

< >

( function: Callable[..., Any] )

参数

  • function (Callable) — 要装饰的函数。

一个装饰器,它将仅在最后一个进程上运行被装饰的函数。也可以使用 PartialState 类调用。

示例

# Assume we have 4 processes.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_last_process
def print_something():
    print(f"Printed on process {accelerator.process_index}")


print_something()
"Printed on process 3"

on_local_main_process

< >

( function: Callable[..., Any] = None )

参数

  • function (Callable) — 要装饰的函数。

一个装饰器,它将仅在本地主进程上运行被装饰的函数。也可以使用 PartialState 类调用。

示例

# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_local_main_process
def print_something():
    print("This will be printed by process 0 only on each server.")


print_something()
# On server 1:
"This will be printed by process 0 only"
# On server 2:
"This will be printed by process 0 only"

on_local_process

< >

( function: Callable[..., Any] = None local_process_index: int = None )

参数

  • function (Callable, optional) — 要装饰的函数。
  • local_process_index (int, optional) — 在其上运行函数的本地进程的索引。

一个装饰器,它将仅在给定的本地进程索引上运行被装饰的函数。也可以使用 PartialState 类调用。

示例

# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_local_process(local_process_index=2)
def print_something():
    print(f"Printed on process {accelerator.local_process_index}")


print_something()
# On server 1:
"Printed on process 2"
# On server 2:
"Printed on process 2"

on_main_process

< >

( function: Callable[..., Any] = None )

参数

  • function (Callable) — 要装饰的函数。

一个装饰器,它将仅在主进程上运行被装饰的函数。也可以使用 PartialState 类调用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()


>>> @accelerator.on_main_process
... def print_something():
...     print("This will be printed by process 0 only.")


>>> print_something()
"This will be printed by process 0 only"

on_process

< >

( function: Callable[..., Any] = None process_index: int = None )

参数

  • function (Callable, optional) — 要装饰的函数。
  • process_index (int, optional) — 运行函数的进程索引(可选)。

一个装饰器,将仅在给定的进程索引上运行被装饰的函数。 也可以使用 PartialState 类调用。

示例

# Assume we have 4 processes.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_process(process_index=2)
def print_something():
    print(f"Printed on process {accelerator.process_index}")


print_something()
"Printed on process 2"

pad_across_processes

< >

( tensor dim = 0 pad_index = 0 pad_first = False ) torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

参数

  • tensor (torch.Tensor 的嵌套列表/元组/字典) — 要收集的数据。
  • dim (int, optional, 默认为 0) — 要填充的维度。
  • pad_index (int, optional, 默认为 0) — 用于填充的值。
  • pad_first (bool, optional, 默认为 False) — 是否在开头或结尾填充。

返回

torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

填充后的张量。

递归地填充来自所有设备的张量,使其在嵌套列表/元组/字典中达到相同大小,以便可以安全地收集它们。

示例

>>> # Assuming two processes, with the first processes having a tensor of size 1 and the second of size 2
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.process_index + 1).to(accelerator.device)
>>> padded_tensor = accelerator.pad_across_processes(process_tensor)
>>> padded_tensor.shape
torch.Size([2])

prepare

< >

( *args device_placement = None )

参数

  • *args (对象列表) — 以下类型的任何对象:

    • torch.utils.data.DataLoader:PyTorch 数据加载器
    • torch.nn.Module:PyTorch 模块
    • torch.optim.Optimizer:PyTorch 优化器
    • torch.optim.lr_scheduler.LRScheduler:PyTorch LR 调度器
  • device_placement (list[bool], optional) — 用于自定义是否应为传递的每个对象执行自动设备放置。 需要是与 args 长度相同的列表。 与 DeepSpeed 或 FSDP 不兼容。

准备在 args 中传递的所有对象,以进行分布式训练和混合精度,然后以相同的顺序返回它们。

如果您仅将模型用于推理,而没有任何类型的混合精度,则无需准备模型

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(model, optimizer, data_loader, scheduler)
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> device_placement = [True, True, False, False]
>>> # Will place the first two items passed in automatically to the right device but not the last two.
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(
...     model, optimizer, data_loader, scheduler, device_placement=device_placement
... )

prepare_data_loader

< >

( data_loader: torch.utils.data.DataLoader device_placement = None slice_fn_for_dispatch = None )

参数

  • data_loader (torch.utils.data.DataLoader) — 要准备的原始 PyTorch DataLoader
  • device_placement (bool, optional) — 是否在准备好的数据加载器中将批次放置在正确的设备上。 将默认为 self.device_placement
  • slice_fn_for_dispatch (Callable, optional) -- 如果传递,此函数将用于跨 num_processes 切片张量。 将默认为 [slice_tensors()](/docs/accelerate/v1.6.0/en/package_reference/utilities#accelerate.utils.slice_tensors)。 此参数仅在 dispatch_batches 设置为 True` 时使用,否则将被忽略。

准备用于在任何分布式设置中训练的 PyTorch DataLoader。 建议使用 Accelerator.prepare() 代替。

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> data_loader = torch.utils.data.DataLoader(...)
>>> data_loader = accelerator.prepare_data_loader(data_loader, device_placement=True)

prepare_model

< >

( model: torch.nn.Module device_placement: bool = None evaluation_mode: bool = False )

参数

  • model (torch.nn.Module) — 要准备的 PyTorch 模型。 如果模型仅用于推理,而没有任何类型的混合精度,则无需准备模型
  • device_placement (bool, optional) — 是否将模型放置在正确的设备上。 将默认为 self.device_placement
  • evaluation_mode (bool, optional, 默认为 False) — 是否仅通过应用混合精度和 torch.compile(如果在 Accelerator 对象中配置)来将模型设置为仅评估模式。

准备用于在任何分布式设置中训练的 PyTorch 模型。 建议使用 Accelerator.prepare() 代替。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model is defined
>>> model = accelerator.prepare_model(model)

prepare_optimizer

< >

( optimizer: torch.optim.Optimizer device_placement = None )

参数

  • optimizer (torch.optim.Optimizer) — 要准备的原始 PyTorch 优化器
  • device_placement (bool, optional) — 是否将优化器放置在正确的设备上。 将默认为 self.device_placement

准备用于在任何分布式设置中训练的 PyTorch 优化器。 建议使用 Accelerator.prepare() 代替。

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> optimizer = torch.optim.Adam(...)
>>> optimizer = accelerator.prepare_optimizer(optimizer, device_placement=True)

prepare_scheduler

< >

( scheduler: LRScheduler )

参数

  • scheduler (torch.optim.lr_scheduler.LRScheduler) — 要准备的原始 PyTorch 调度器

准备用于在任何分布式设置中训练的 PyTorch 调度器。 建议使用 Accelerator.prepare() 代替。

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> optimizer = torch.optim.Adam(...)
>>> scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, ...)
>>> scheduler = accelerator.prepare_scheduler(scheduler)

print

< >

( *args **kwargs )

print() 的替代品,仅在每台服务器上打印一次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> accelerator.print("Hello world!")

profile

< >

( profile_handler: ProfileKwargs | None = None )

参数

  • profile_handler (ProfileKwargs, optional) — 用于此上下文管理器的性能分析处理器。如果未传递,将使用在 Accelerator 对象中设置的处理器。

将对上下文管理器内的代码进行性能分析。如果设置了 profile_handler.output_trace_dir,性能分析结果将保存到 Chrome Trace 文件中。

可以传入不同的 profile_handler 来覆盖在 Accelerator 对象中设置的处理器。

示例

# Profile with default settings
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs

accelerator = Accelerator()
with accelerator.profile() as prof:
    train()
accelerator.print(prof.key_averages().table())


# Profile with the custom handler
def custom_handler(prof):
    print(prof.key_averages().table(sort_by="self_cpu_time_total", row_limit=10))


kwargs = ProfileKwargs(schedule_option=dict(wait=1, warmup=1, active=1), on_trace_ready=custom_handler)
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile() as prof:
    for _ in range(10):
        train_iteration()
        prof.step()


# Profile and export to Chrome Trace
kwargs = ProfileKwargs(output_trace_dir="output_trace")
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile():
    train()

reduce

< >

( tensor reduction = 'sum' scale = 1.0 ) torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

参数

  • tensor (`torch.Tensor`,或 torch.Tensor 的嵌套元组/列表/字典) — 要在所有进程中归约的张量。
  • reduction (`str`,*可选*,默认为 “sum”) — 归约类型,可以是 ‘sum’、‘mean’ 或 ‘none’ 之一。如果为 ‘none’,则不执行任何操作。
  • scale (`float`,*可选*,默认为 1.0) — 归约后应用的默认缩放值,仅在 XLA 上有效。

返回

torch.Tensor,或 torch.Tensor 的嵌套元组/列表/字典

归约后的张量。

基于 reduction,在所有进程中归约 tensor 中的值。

注意:所有进程都会获得归约后的值。

示例

>>> # Assuming two processes
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.num_processes) + 1 + (2 * accelerator.process_index)
>>> process_tensor = process_tensor.to(accelerator.device)
>>> reduced_tensor = accelerator.reduce(process_tensor, reduction="sum")
>>> reduced_tensor
tensor([4, 6])

register_for_checkpointing

< >

( *objects )

记录 objects,并在 save_stateload_state 期间保存或加载它们。

这些对象应在同一脚本中加载或保存状态时使用。它不适用于在不同的脚本中使用。

每个 object 都必须具有 load_state_dictstate_dict 函数才能被存储。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume `CustomObject` has a `state_dict` and `load_state_dict` function.
>>> obj = CustomObject()
>>> accelerator.register_for_checkpointing(obj)
>>> accelerator.save_state("checkpoint.pt")

register_load_state_pre_hook

< >

( hook: Callable[..., None] ) torch.utils.hooks.RemovableHandle

参数

返回

torch.utils.hooks.RemovableHandle

一个句柄,可用于通过调用 handle.remove() 来移除添加的钩子

注册一个预钩子,使其在 Accelerator.load_state() 中调用 load_checkpoint 之前运行。

钩子应具有以下签名

hook(models: list[torch.nn.Module], input_dir: str) -> None

`models` 参数是在加速器状态中 accelerator._models 下保存的模型,input_dir 参数是传递给 Accelerator.load_state()input_dir 参数。

应仅与 Accelerator.register_save_state_pre_hook() 结合使用。除了模型权重外,还可以用于加载配置。也可以用于使用自定义方法覆盖模型加载。在这种情况下,请确保从模型列表中删除已加载的模型。

register_save_state_pre_hook

< >

( hook: Callable[..., None] ) torch.utils.hooks.RemovableHandle

参数

返回

torch.utils.hooks.RemovableHandle

一个句柄,可用于通过调用 handle.remove() 来移除添加的钩子

注册一个预钩子,使其在 Accelerator.save_state() 中调用 save_checkpoint 之前运行。

钩子应具有以下签名

hook(models: list[torch.nn.Module], weights: list[dict[str, torch.Tensor]], input_dir: str) -> None

`models` 参数是在加速器状态中 accelerator._models 下保存的模型,weights 参数是 models 的状态字典,input_dir 参数是传递给 Accelerator.load_state()input_dir 参数。

应仅与 Accelerator.register_load_state_pre_hook() 结合使用。除了模型权重外,还可以用于保存配置。也可以用于使用自定义方法覆盖模型保存。在这种情况下,请确保从权重列表中删除已加载的权重。

save

< >

( obj f safe_serialization = False )

参数

  • obj (`object`) — 要保存的对象。
  • f (`str` 或 `os.PathLike`) — 保存 obj 内容的位置。
  • safe_serialization (`bool`,*可选*,默认为 `False`) — 是否使用 safetensors 保存 `obj`

每个机器将传递的对象保存到磁盘一次。代替 torch.save 使用。

注意:如果 save_on_each_node 作为 ProjectConfiguration 传入,则每个节点将保存对象一次,而不是仅在主节点上保存一次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> arr = [0, 1, 2, 3]
>>> accelerator.save(arr, "array.pkl")

save_model

< >

( model: torch.nn.Module save_directory: Union[str, os.PathLike] max_shard_size: Union[int, str] = '10GB' safe_serialization: bool = True )

参数

  • model — (`torch.nn.Module`):要保存的模型。模型可以是包装的或未包装的。
  • save_directory (`str` 或 `os.PathLike`) — 要保存到的目录。如果不存在,将创建该目录。
  • max_shard_size (`int` 或 `str`,*可选*,默认为 `"10GB"`) — 分片前检查点的最大大小。检查点分片的大小将小于此大小。如果表示为字符串,则需要是数字后跟单位(例如 `"5MB"`)。

    如果模型的单个权重大于 max_shard_size,它将位于自己的检查点分片中,该分片将大于 max_shard_size

  • safe_serialization (`bool`,*可选*,默认为 `True`) — 是否使用 safetensors 或传统的 PyTorch 方式(使用 `pickle`)保存模型。

保存模型,以便可以使用 `load_checkpoint_in_model` 重新加载

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model = ...
>>> accelerator.save_model(model, save_directory)

save_state

< >

( output_dir: str = None safe_serialization: bool = True **save_model_func_kwargs )

参数

  • output_dir (`str` 或 `os.PathLike`) — 用于保存所有相关权重和状态的文件夹名称。
  • safe_serialization (`bool`,*可选*,默认为 `True`) — 是否使用 safetensors 或传统的 PyTorch 方式(使用 `pickle`)保存模型。
  • save_model_func_kwargs (`dict`,*可选*) — 用于保存模型的附加关键字参数,可以传递给底层的保存函数,例如 DeepSpeed 的 save_checkpoint 函数的可选参数。

将模型、优化器、scaler、RNG 生成器和注册对象的当前状态保存到一个文件夹中。

如果将启用了 automatic_checkpoint_namingProjectConfiguration 传递给 Accelerator 对象,则检查点将保存到 self.project_dir/checkpoints。如果当前保存的数量大于 total_limit,则会删除最旧的保存。每个检查点都保存在名为 checkpoint_ 的单独文件夹中。

否则,它们将仅保存到 output_dir

仅应在希望在训练期间保存检查点并在同一环境中恢复状态时使用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, lr_scheduler = ...
>>> model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
>>> accelerator.save_state(output_dir="my_checkpoint")

set_trigger

< >

( )

在当前进程上将内部触发张量设置为 1。后续应使用此张量进行检查,该检查将跨所有进程进行。

注意:不需要 wait_for_everyone()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
...     accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_breakpoint():
...     break

skip_first_batches

< >

( dataloader num_batches: int = 0 )

参数

  • dataloader (torch.utils.data.DataLoader) — 要跳过批次的数据加载器。
  • num_batches (int, 可选, 默认为 0) — 要跳过的批次数量。

创建一个新的 torch.utils.data.DataLoader,它将有效地跳过前 num_batches 个批次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> skipped_dataloader = accelerator.skip_first_batches(dataloader, num_batches=2)
>>> # for the first epoch only
>>> for input, target in skipped_dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     optimizer.step()

>>> # subsequent epochs
>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     ...

split_between_processes

< >

( inputs: list | tuple | dict | torch.Tensor apply_padding: bool = False )

参数

  • inputs (list, tuple, torch.Tensor, 或 dict of list/tuple/torch.Tensor) — 要在进程之间拆分的输入。
  • apply_padding (bool, optional, 默认为 False) — 是否通过重复输入的最后一个元素来应用填充,以便所有进程都具有相同数量的元素。当尝试对输出执行诸如 Accelerator.gather() 之类的操作或传入的输入少于进程数时很有用。如果是这样,只需记住之后删除填充的元素。

self.num_processes 之间快速拆分 input,然后可以在该进程上使用。在执行分布式推理时很有用,例如使用不同的提示。

请注意,当使用 dict 时,所有键都需要具有相同数量的元素。

示例

# Assume there are two processes
from accelerate import Accelerator

accelerator = Accelerator()
with accelerator.split_between_processes(["A", "B", "C"]) as inputs:
    print(inputs)
# Process 0
["A", "B"]
# Process 1
["C"]

with accelerator.split_between_processes(["A", "B", "C"], apply_padding=True) as inputs:
    print(inputs)
# Process 0
["A", "B"]
# Process 1
["C", "C"]

trigger_sync_in_backward

< >

( model )

参数

  • model (torch.nn.Module) — 要触发梯度同步的模型。

Accelerator.no_sync 下的多次前向传递之后,触发模型下一次反向传递中的梯度同步(仅适用于多 GPU 场景)。

如果脚本未在分布式模式下启动,则此上下文管理器不执行任何操作。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)

>>> with accelerator.no_sync():
...     loss_a = loss_func(model(input_a))  # first forward pass
...     loss_b = loss_func(model(input_b))  # second forward pass
>>> accelerator.backward(loss_a)  # No synchronization across processes, only accumulate gradients
>>> with accelerator.trigger_sync_in_backward(model):
...     accelerator.backward(loss_b)  # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()

unscale_gradients

< >

( optimizer = None )

参数

  • optimizer (torch.optim.Optimizerlist[torch.optim.Optimizer], 可选) — 要取消缩放梯度的优化器。如果未设置,则将取消缩放传递给 prepare() 的所有优化器的梯度。

在使用 AMP 进行混合精度训练时取消缩放梯度。这在所有其他设置中都是空操作。

可能应该通过 Accelerator.clipgrad_norm()Accelerator.clipgrad_value() 调用

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer = accelerator.prepare(model, optimizer)
>>> outputs = model(inputs)
>>> loss = loss_fn(outputs, labels)
>>> accelerator.backward(loss)
>>> accelerator.unscale_gradients(optimizer=optimizer)

unwrap_model

< >

( model keep_fp32_wrapper: bool = True keep_torch_compile: bool = True ) torch.nn.Module

参数

  • model (torch.nn.Module) — 要解包的模型。
  • keep_fp32_wrapper (bool, 可选, 默认为 True) — 如果添加了混合精度钩子,是否不删除它。
  • keep_torch_compile (bool, 可选, 默认为 True) — 如果模型已编译,是否不解包已编译的模型。

返回

torch.nn.Module

解包后的模型。

prepare() 可能添加的附加层中解包 model。在保存模型之前很有用。

示例

>>> # Assuming two GPU processes
>>> from torch.nn.parallel import DistributedDataParallel
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model = accelerator.prepare(MyModel())
>>> print(model.__class__.__name__)
DistributedDataParallel

>>> model = accelerator.unwrap_model(model)
>>> print(model.__class__.__name__)
MyModel

verify_device_map

< >

( model: torch.nn.Module )

验证 model 是否未使用类似于 auto 的设备映射通过大型模型推理进行准备。

wait_for_everyone

< >

( )

将停止当前进程的执行,直到每个其他进程都到达该点(因此,当脚本仅在一个进程中运行时,这不会执行任何操作)。在保存模型之前执行此操作很有用。

示例

>>> # Assuming two GPU processes
>>> import time
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> if accelerator.is_main_process:
...     time.sleep(2)
>>> else:
...     print("I'm waiting for the main process to finish its sleep...")
>>> accelerator.wait_for_everyone()
>>> # Should print on every process at the same time
>>> print("Everyone is here")

实用工具

accelerate.utils.gather_object

< >

( object: typing.Any )

参数

  • object (可 pickle 对象的嵌套列表/元组/字典) — 要收集的数据。

递归地从所有设备收集嵌套列表/元组/字典中的对象。

< > 在 GitHub 上更新