Accelerate 文档
加速器
并获得增强的文档体验
开始使用
Accelerator
Accelerator 是用于在任何类型的训练设置中启用分布式训练的主类。请阅读将 Accelerator 添加到您的代码中教程,以了解有关如何将 Accelerator 添加到脚本的更多信息。
Accelerator
class accelerate.Accelerator
< 源码 >( device_placement: bool = True split_batches: bool = <object object at 0x7f9bc0a7a0f0> mixed_precision: PrecisionType | str | None = None gradient_accumulation_steps: int = 1 cpu: bool = False dataloader_config: DataLoaderConfiguration | None = None deepspeed_plugin: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None fsdp_plugin: FullyShardedDataParallelPlugin | None = None torch_tp_plugin: TorchTensorParallelPlugin | None = None megatron_lm_plugin: MegatronLMPlugin | None = None rng_types: list[str | RNGType] | None = None log_with: str | LoggerType | GeneralTracker | list[str | LoggerType | GeneralTracker] | None = None project_dir: str | os.PathLike | None = None project_config: ProjectConfiguration | None = None gradient_accumulation_plugin: GradientAccumulationPlugin | None = None step_scheduler_with_optimizer: bool = True kwargs_handlers: list[KwargsHandler] | None = None dynamo_backend: DynamoBackend | str | None = None dynamo_plugin: TorchDynamoPlugin | None = None deepspeed_plugins: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None parallelism_config: ParallelismConfig | None = None )
参数
- device_placement (
bool
,可选,默认为True
) — Accelerator 是否应将对象(数据加载器生成的张量、模型等)放置在设备上。 - mixed_precision (
str
,可选) — 是否使用混合精度训练。可选择 'no'、'fp16'、'bf16' 或 'fp8'。将默认为环境变量ACCELERATE_MIXED_PRECISION
中的值,该值将使用当前系统的 Accelerate 配置中的默认值或通过accelerate.launch
命令传递的标志。'fp8' 需要安装 transformers-engine。 - gradient_accumulation_steps (
int
,可选,默认为 1) — 在累积梯度之前应经过的步骤数。大于 1 的数字应与Accelerator.accumulate
结合使用。如果未传递,将默认为环境变量ACCELERATE_GRADIENT_ACCUMULATION_STEPS
中的值。也可以通过GradientAccumulationPlugin
进行配置。 - cpu (
bool
,可选) — 是否强制脚本在 CPU 上执行。如果设置为True
,将忽略可用的 GPU,并强制在单个进程上执行。 - dataloader_config (
DataLoaderConfiguration
,可选) — 用于配置在分布式场景中如何处理数据加载器的配置。 - deepspeed_plugin (DeepSpeedPlugin 或
str
的字典 — DeepSpeedPlugin,可选):使用此参数调整与 DeepSpeed 相关的参数。此参数是可选的,可以直接使用 *accelerate config* 进行配置。如果使用多个插件,请使用每个插件配置的key
属性从accelerator.state.get_deepspeed_plugin(key)
访问它们。是deepspeed_plugins
的别名。 - fsdp_plugin (FullyShardedDataParallelPlugin,可选) — 使用此参数调整与 FSDP 相关的参数。此参数是可选的,可以直接使用 *accelerate config* 进行配置。
- torch_tp_plugin (
TorchTensorParallelPlugin
,可选) — 已弃用:请改用带有tp_size
的parallelism_config
。 - megatron_lm_plugin (MegatronLMPlugin,可选) — 使用此参数调整与 MegatronLM 相关的参数。此参数是可选的,可以直接使用 *accelerate config* 进行配置。
- rng_types (
str
或 RNGType 列表) — 在您准备的数据加载器的每次迭代开始时要同步的随机数生成器列表。应为以下一项或多项:"torch"
:基础 torch 随机数生成器"cuda"
:CUDA 随机数生成器(仅限 GPU)"xla"
:XLA 随机数生成器(仅限 TPU)"generator"
:采样器的torch.Generator
(如果数据加载器中没有采样器,则为批采样器),或者如果底层数据集是该类型,则是可迭代数据集的torch.Generator
(如果存在)。
对于 PyTorch 版本 <=1.5.1,将默认为
["torch"]
,对于 PyTorch 版本 >= 1.6,将默认为["generator"]
。 - log_with (
str
、LoggerType 或 GeneralTracker 列表,可选) — 为实验跟踪设置的日志记录器列表。应为以下一项或多项:"all"
"tensorboard"
"wandb"
"trackio"
"aim"
"comet_ml"
"mlflow"
"dvclive"
"swanlab"
如果选择"all"
,将选择环境中所有可用的跟踪器并初始化它们。也可以接受GeneralTracker
的实现以用于自定义跟踪器,并且可以与"all"
结合使用。
- project_config (ProjectConfiguration,可选) — 用于配置如何处理状态保存的配置。
- project_dir (
str
,os.PathLike
,可选) — 用于存储数据(如本地兼容日志记录器的日志和可能保存的检查点)的目录路径。 - step_scheduler_with_optimizer (
bool
,可选,默认为True
) — 如果学习率调度器与优化器同时更新,则设置为True
;如果仅在特定情况下(例如每个 epoch 结束时)更新,则设置为False
。 - kwargs_handlers (KwargsHandler 列表,可选) — KwargsHandler 列表,用于自定义如何创建与分布式训练、性能分析或混合精度相关的对象。有关更多信息,请参阅 kwargs。
- dynamo_backend (
str
或 DynamoBackend,可选,默认为"no"
) — 设置为可能的 dynamo 后端之一,以使用 torch dynamo 优化您的训练。 - dynamo_plugin (TorchDynamoPlugin,可选) — 用于配置如何处理 torch dynamo 的配置,如果需要的调整不仅仅是
backend
或mode
。 - gradient_accumulation_plugin (GradientAccumulationPlugin,可选) — 用于配置如何处理梯度累积的配置,如果需要的调整不仅仅是
gradient_accumulation_steps
。
为分布式训练或混合精度训练创建 accelerator 的实例。
可用属性
- device (
torch.device
) — 要使用的设备。 - distributed_type (DistributedType) — 分布式训练配置。
- local_process_index (
int
) — 当前机器上的进程索引。 - mixed_precision (
str
) — 配置的混合精度模式。 - num_processes (
int
) — 用于训练的总进程数。 - optimizer_step_was_skipped (
bool
) — 优化器更新是否被跳过(因为混合精度中的梯度溢出),在这种情况下学习率不应改变。 - process_index (
int
) — 当前进程在所有进程中的总索引。 - state (AcceleratorState) — 分布式设置状态。
- sync_gradients (
bool
) — 梯度当前是否在所有进程中同步。 - use_distributed (
bool
) — 当前配置是否用于分布式训练。
accumulate
< 源码 >( *models )
一个上下文管理器,它将轻量级地包装并自动执行梯度累积。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=1)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, output in dataloader:
... with accelerator.accumulate(model):
... outputs = model(input)
... loss = loss_func(outputs)
... loss.backward()
... optimizer.step()
... scheduler.step()
... optimizer.zero_grad()
如果启用了自动混合精度,将在此上下文管理器内的块中应用它。否则不会发生任何变化。
可以传入一个不同的 autocast_handler
来覆盖在 Accelerator
对象中设置的那个。这在 autocast
下的块中非常有用,当你想要恢复到 fp32 时。
根据 GradientAccumulationPlugin
缩放梯度,并根据配置调用正确的 backward()
。
应该用来替代 loss.backward()
。
检查内部触发张量在任何进程中是否被设置为 1。如果是,将返回 True
并将触发张量重置为 0。
注意:不需要 wait_for_everyone()
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
... accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_trigger():
... break
Accelerate.free_memory
的别名,释放对存储的内部对象的所有引用并调用垃圾回收器。您应该在两次使用不同模型/优化器的训练之间调用此方法。
clip_grad_norm_
< 源码 >( parameters max_norm norm_type = 2 ) → torch.Tensor
返回
torch.Tensor
参数梯度的总范数(视为单个向量)。
应该用来代替 torch.nn.utils.clip_grad_norm_
。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, target in dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
... optimizer.step()
应该用来代替 torch.nn.utils.clip_grad_value_
。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, target in dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_value_(model.parameters(), clip_value)
... optimizer.step()
运行任何特殊的训练结束行为,例如仅在主进程上停止跟踪器或销毁进程组。如果使用实验跟踪,应始终在脚本结束时调用。
将释放对存储的内部对象的所有引用并调用垃圾回收器。您应该在两次使用不同模型/优化器的训练之间调用此方法。同时会将 Accelerator.step
重置为 0。
gather
< 源码 >( tensor ) → torch.Tensor
,或嵌套的元组/列表/字典的 torch.Tensor
在所有进程中收集 *tensor* 中的值,并在第一个维度上进行连接。在进行评估时,用于重新组合所有进程的预测非常有用。
注意:此收集操作在所有进程中进行。
gather_for_metrics
< 源码 >( input_data use_gather_object = False )
收集 input_data
,并在分布式系统上可能删除最后一个批次中的重复项。应用于收集用于指标计算的输入和目标。
示例
>>> # Assuming two processes, with a batch size of 5 on a dataset with 9 samples
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader = torch.utils.data.DataLoader(range(9), batch_size=5)
>>> dataloader = accelerator.prepare(dataloader)
>>> batch = next(iter(dataloader))
>>> gathered_items = accelerator.gather_for_metrics(batch)
>>> len(gathered_items)
9
get_state_dict
< 源码 >( model unwrap = True ) → dict
参数
- model (
torch.nn.Module
) — 通过 Accelerator.prepare() 传递的 PyTorch 模型。 - unwrap (
bool
,可选,默认为True
) — 是否返回model
的原始底层 state_dict,还是返回包装后的 state_dict。
返回
字典
可能不包含完整精度的模型的 state_dict。
返回通过 Accelerator.prepare() 传递的模型的 state_dict,可能不包含完整精度。
get_tracker
< 源码 >( name: str unwrap: bool = False ) → GeneralTracker
仅在主进程中,根据 name
从 self.trackers
返回一个 tracker
。
join_uneven_inputs
< 源码 >( joinables even_batches = None )
一个上下文管理器,它有助于在不均匀输入上进行分布式训练或评估,作为 torch.distributed.algorithms.join
的包装器。当总批次大小不能被数据集长度整除时,这很有用。
join_uneven_inputs
仅支持在多个 GPU 上进行分布式数据并行训练。对于任何其他配置,此方法将没有效果。
覆盖 even_batches
不会影响可迭代式数据加载器。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(even_batches=True)
>>> ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
>>> with accelerator.join_uneven_inputs([ddp_model], even_batches=False):
... for input, output in dataloader:
... outputs = model(input)
... loss = loss_func(outputs)
... loss.backward()
... optimizer.step()
... optimizer.zero_grad()
load_state
< 源码 >( input_dir: str = None load_kwargs: dict | None = None **load_model_func_kwargs )
参数
- input_dir (
str
或os.PathLike
) — 所有相关权重和状态保存的文件夹名称。如果使用了automatic_checkpoint_naming
,则可以为None
,并将从最新的检查点加载。 - load_kwargs (
dict
, 可选) — 传递给底层load
函数的附加关键字参数,例如 state_dict 和 optimizer 的可选参数。 - load_model_func_kwargs (
dict
, 可选) — 用于加载模型的附加关键字参数,可以传递给底层加载函数,例如 DeepSpeed 的load_checkpoint
函数的可选参数或用于加载模型和优化器的map_location
。
加载模型、优化器、缩放器、RNG 生成器和已注册对象的当前状态。
应仅与 Accelerator.save_state() 结合使用。如果某个文件未注册用于检查点,则即使它存储在目录中也不会被加载。
让本地主进程先进入 with 块。
其他进程将在主进程退出后进入 with 块。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> with accelerator.local_main_process_first():
... # This will be printed first by local process 0 then in a seemingly
... # random order by the other processes.
... print(f"This will be printed by process {accelerator.local_process_index}")
在 LOMO 优化器上运行反向传播。
让主进程先进入 with 块。
其他进程将在主进程退出后进入 with 块。
maybe_context_parallel
< 源码 >( buffers: list[torch.Tensor] | None = None buffer_seq_dims: list[int] | None = None no_restore_buffers: set[torch.Tensor] | None = None )
参数
- buffers (
list[torch.Tensor]
,可选
) — 将沿序列维度进行分片的缓冲区。常见示例包括输入、标签或位置嵌入缓冲区。此上下文管理器将就地修改这些缓冲区,退出上下文后,缓冲区将恢复到其原始状态。为避免不必要的恢复,您可以使用no_restore_buffers
指定哪些缓冲区不需要恢复。 - buffer_seq_dims (
list[int]
,可选
) —buffers
的序列维度。 - no_restore_buffers (
set[torch.Tensor]
,可选
) — 此集合必须是buffers
的子集。指定在上下文退出后,buffers
参数中的哪些缓冲区将不会被恢复。这些缓冲区将保持分片状态。
一个启用上下文并行训练的上下文管理器。
context_parallel
目前仅支持与 FSDP2 一起使用,并且需要 parallelism_config.cp_size
>
- 如果这些条件中的任何一个不满足,此上下文管理器将没有效果,但为了减少代码更改,它不会引发异常。
此上下文管理器必须在每个训练步骤中重新创建,如下面的示例所示。
一个上下文管理器,通过调用 torch.nn.parallel.DistributedDataParallel.no_sync
来禁用 DDP 进程间的梯度同步。
如果 model
不处于 DDP 模式,此上下文管理器不执行任何操作。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> input_a = next(iter(dataloader))
>>> input_b = next(iter(dataloader))
>>> with accelerator.no_sync():
... outputs = model(input_a)
... loss = loss_func(outputs)
... accelerator.backward(loss)
... # No synchronization across processes, only accumulate gradients
>>> outputs = model(input_b)
>>> accelerator.backward(loss)
>>> # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()
一个装饰器,它将仅在最后一个进程上运行被装饰的函数。也可以使用 PartialState
类调用。
on_local_main_process
< 源码 >( function: Callable[..., Any] = None )
一个装饰器,它将仅在本地主进程上运行被装饰的函数。也可以使用 PartialState
类调用。
示例
# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator
accelerator = Accelerator()
@accelerator.on_local_main_process
def print_something():
print("This will be printed by process 0 only on each server.")
print_something()
# On server 1:
"This will be printed by process 0 only"
# On server 2:
"This will be printed by process 0 only"
on_local_process
< 源码 >( function: Callable[..., Any] = None local_process_index: int = None )
一个装饰器,它将仅在给定的本地进程索引上运行被装饰的函数。也可以使用 PartialState
类调用。
示例
# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator
accelerator = Accelerator()
@accelerator.on_local_process(local_process_index=2)
def print_something():
print(f"Printed on process {accelerator.local_process_index}")
print_something()
# On server 1:
"Printed on process 2"
# On server 2:
"Printed on process 2"
一个装饰器,它将仅在主进程上运行被装饰的函数。也可以使用 PartialState
类调用。
on_process
< 源码 >( function: Callable[..., Any] = None process_index: int = None )
一个装饰器,它将仅在给定的进程索引上运行被装饰的函数。也可以使用 PartialState
类调用。
pad_across_processes
< 源码 >( tensor dim = 0 pad_index = 0 pad_first = False ) → torch.Tensor
或嵌套的元组/列表/字典的 torch.Tensor
递归地将嵌套的列表/元组/字典中的张量从所有设备填充到相同的大小,以便它们可以安全地被收集。
示例
>>> # Assuming two processes, with the first processes having a tensor of size 1 and the second of size 2
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.process_index + 1).to(accelerator.device)
>>> padded_tensor = accelerator.pad_across_processes(process_tensor)
>>> padded_tensor.shape
torch.Size([2])
prepare
< 源码 >( *args device_placement = None )
参数
- *args (对象列表) — 以下任何类型的对象:
torch.utils.data.DataLoader
: PyTorch Dataloadertorch.nn.Module
: PyTorch Moduletorch.optim.Optimizer
: PyTorch Optimizertorch.optim.lr_scheduler.LRScheduler
: PyTorch LR Scheduler
- device_placement (
list[bool]
, 可选) — 用于自定义是否为每个传递的对象执行自动设备放置。需要是一个与args
长度相同的列表。与 DeepSpeed 或 FSDP 不兼容。
为分布式训练和混合精度准备 args
中传递的所有对象,然后按相同顺序返回它们。
如果模型仅用于推理且不使用任何混合精度,则无需准备。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(model, optimizer, data_loader, scheduler)
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> device_placement = [True, True, False, False]
>>> # Will place the first two items passed in automatically to the right device but not the last two.
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(
... model, optimizer, data_loader, scheduler, device_placement=device_placement
... )
prepare_data_loader
< 源码 >( data_loader: torch.utils.data.DataLoader device_placement = None slice_fn_for_dispatch = None )
参数
- data_loader (
torch.utils.data.DataLoader
) — 一个普通的 PyTorch DataLoader,用于准备 - device_placement (
bool
, 可选) — 是否在准备好的 dataloader 中将批次放置在正确的设备上。默认为self.device_placement
。 - slice_fn_for_dispatch (
Callable
, 可选) -- 如果传递,此函数将用于在
num_processes之间切分张量。默认为 [slice_tensors()](/docs/accelerate/v1.10.0/en/package_reference/utilities#accelerate.utils.slice_tensors)。此参数仅在
dispatch_batches设置为
True` 时使用,否则将被忽略。
为任何分布式设置中的训练准备一个 PyTorch DataLoader。建议改用 Accelerator.prepare()。
prepare_model
< 源码 >( model: torch.nn.Module device_placement: bool = None evaluation_mode: bool = False )
为任何分布式设置中的训练准备一个 PyTorch 模型。建议改用 Accelerator.prepare()。
prepare_optimizer
< 源码 >( optimizer: torch.optim.Optimizer device_placement = None )
为任何分布式设置中的训练准备 PyTorch 优化器。建议改用 Accelerator.prepare()。
prepare_scheduler
< source >( scheduler: LRScheduler )
为任何分布式设置中的训练准备 PyTorch 调度器。建议改用 Accelerator.prepare()。
print()
的直接替代品,每台服务器只打印一次。
profile
< source >( profile_handler: ProfileKwargs | None = None )
将对上下文管理器内的代码进行性能分析。如果设置了 profile_handler.output_trace_dir
,性能分析结果将保存到 Chrome Trace 文件中。
可以传入一个不同的 profile_handler
来覆盖 Accelerator
对象中设置的处理器。
示例
# Profile with default settings
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs
accelerator = Accelerator()
with accelerator.profile() as prof:
train()
accelerator.print(prof.key_averages().table())
# Profile with the custom handler
def custom_handler(prof):
print(prof.key_averages().table(sort_by="self_cpu_time_total", row_limit=10))
kwargs = ProfileKwargs(schedule_option=dict(wait=1, warmup=1, active=1), on_trace_ready=custom_handler)
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile() as prof:
for _ in range(10):
train_iteration()
prof.step()
# Profile and export to Chrome Trace
kwargs = ProfileKwargs(output_trace_dir="output_trace")
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile():
train()
reduce
< source >( tensor reduction = 'sum' scale = 1.0 ) → torch.Tensor
,或 torch.Tensor
的嵌套元组/列表/字典
根据 reduction 在所有进程间规约 tensor 中的值。
注意:所有进程都会获得规约后的值。
示例
>>> # Assuming two processes
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.num_processes) + 1 + (2 * accelerator.process_index)
>>> process_tensor = process_tensor.to(accelerator.device)
>>> reduced_tensor = accelerator.reduce(process_tensor, reduction="sum")
>>> reduced_tensor
tensor([4, 6])
记录 objects
,并在 save_state
或 load_state
期间保存或加载它们。
当在同一脚本中加载或保存状态时应使用此功能。它不适用于在不同脚本中使用。
每个 object
必须具有 load_state_dict
和 state_dict
函数才能被存储。
register_load_state_pre_hook
< source >( hook: Callable[..., None] ) → torch.utils.hooks.RemovableHandle
参数
- hook (
Callable
) — 在 Accelerator.load_state() 中调用load_checkpoint
之前要调用的函数。
返回
torch.utils.hooks.RemovableHandle
一个句柄,可以通过调用 `handle.remove()` 来移除添加的钩子
注册一个预处理钩子,在 Accelerator.load_state() 中调用 load_checkpoint
之前运行。
钩子应具有以下签名:
hook(models: list[torch.nn.Module], input_dir: str) -> None
models
参数是保存在加速器状态 accelerator._models
下的模型,input_dir
参数是传递给 Accelerator.load_state() 的 input_dir
参数。
应仅与 Accelerator.register_save_state_pre_hook() 结合使用。这对于加载模型权重之外的配置很有用。也可用于使用自定义方法覆盖模型加载。在这种情况下,请确保从模型列表中移除已加载的模型。
register_save_state_pre_hook
< source >( hook: Callable[..., None] ) → torch.utils.hooks.RemovableHandle
参数
- hook (
Callable
) — 在 Accelerator.save_state() 中调用save_checkpoint
之前要调用的函数。
返回
torch.utils.hooks.RemovableHandle
一个句柄,可以通过调用 `handle.remove()` 来移除添加的钩子
注册一个预处理钩子,在 Accelerator.save_state() 中调用 save_checkpoint
之前运行。
钩子应具有以下签名:
hook(models: list[torch.nn.Module], weights: list[dict[str, torch.Tensor]], input_dir: str) -> None
models
参数是保存在加速器状态 accelerator._models
下的模型,weights
参数是 models
的状态字典,而 input_dir
参数是传递给 Accelerator.load_state() 的 input_dir
参数。
应仅与 Accelerator.register_load_state_pre_hook() 结合使用。这对于保存模型权重之外的配置很有用。也可用于使用自定义方法覆盖模型保存。在这种情况下,请确保从权重列表中移除已加载的权重。
save
< source >( obj f safe_serialization = False )
每台机器只将传递的对象保存到磁盘一次。用于替代 torch.save
。
注意:如果在 ProjectConfiguration
中传入了 save_on_each_node
,将在每个节点上保存一次对象,而不仅仅是在主节点上保存一次。
save_model
< source >( model: torch.nn.Module save_directory: Union[str, os.PathLike] max_shard_size: Union[int, str] = '10GB' safe_serialization: bool = True )
参数
- model — (
torch.nn.Module
):要保存的模型。模型可以是包装过的或未包装的。 - save_directory (
str
或os.PathLike
) — 要保存到的目录。如果不存在,将会创建。 - max_shard_size (
int
orstr
, optional, 默认为"10GB"
) — 检查点在分片前的最大大小。检查点分片后的大小将小于此值。如果以字符串表示,需要是数字后跟单位(如"5MB"
)。如果模型的单个权重比
max_shard_size
大,它将被放在自己的检查点分片中,该分片将大于max_shard_size
。 - safe_serialization (
bool
, optional, 默认为True
) — 是使用safetensors
还是传统的 PyTorch 方式(使用pickle
)来保存模型。
保存模型,以便可以使用 load_checkpoint_in_model 重新加载
save_state
< source >( output_dir: str = None safe_serialization: bool = True **save_model_func_kwargs )
将模型、优化器、缩放器、RNG 生成器和已注册对象的当前状态保存到一个文件夹中。
如果向 Accelerator
对象传递了启用了 automatic_checkpoint_naming
的 ProjectConfiguration
,则检查点将保存到 self.project_dir/checkpoints
。如果当前保存的数量大于 total_limit
,则会删除最旧的保存。每个检查点保存在名为 checkpoint_<iteration>
的独立文件夹中。
否则,它们只会被保存到 output_dir
。
仅当希望在训练期间保存检查点并在相同环境中恢复状态时使用。
在当前进程上将内部触发张量设置为 1。后续应使用此张量进行检查,该检查将在所有进程间进行。
注意:不需要 wait_for_everyone()
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
... accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_breakpoint():
... break
skip_first_batches
< source >( dataloader num_batches: int = 0 )
创建一个新的 torch.utils.data.DataLoader
,它将高效地跳过前 num_batches
个批次。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> skipped_dataloader = accelerator.skip_first_batches(dataloader, num_batches=2)
>>> # for the first epoch only
>>> for input, target in skipped_dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... optimizer.step()
>>> # subsequent epochs
>>> for input, target in dataloader:
... optimizer.zero_grad()
... ...
split_between_processes
< source >( inputs: list | tuple | dict | torch.Tensor apply_padding: bool = False )
在 self.num_processes
之间快速分割 input
,然后可以在该进程上使用。在进行分布式推理(例如使用不同提示)时非常有用。
注意,当使用 dict
时,所有键都需要有相同数量的元素。
示例
# Assume there are two processes
from accelerate import Accelerator
accelerator = Accelerator()
with accelerator.split_between_processes(["A", "B", "C"]) as inputs:
print(inputs)
# Process 0
["A", "B"]
# Process 1
["C"]
with accelerator.split_between_processes(["A", "B", "C"], apply_padding=True) as inputs:
print(inputs)
# Process 0
["A", "B"]
# Process 1
["C", "C"]
在 `Accelerator.no_sync` 下多次前向传播后,在模型的下一次反向传播中触发梯度同步(仅适用于多 GPU 场景)。
如果脚本不是在分布式模式下启动,此上下文管理器不执行任何操作。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> with accelerator.no_sync():
... loss_a = loss_func(model(input_a)) # first forward pass
... loss_b = loss_func(model(input_b)) # second forward pass
>>> accelerator.backward(loss_a) # No synchronization across processes, only accumulate gradients
>>> with accelerator.trigger_sync_in_backward(model):
... accelerator.backward(loss_b) # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()
unscale_gradients
< source >( optimizer = None )
参数
- optimizer (
torch.optim.Optimizer
或list[torch.optim.Optimizer]
, optional) — 需要取消梯度缩放的优化器。如果未设置,将对所有传递给 prepare() 的优化器取消梯度缩放。
在 AMP 混合精度训练中取消梯度缩放。在所有其他设置中,这是一个空操作。
可能应通过 Accelerator.clip_grad_norm_() 或 Accelerator.clip_grad_value_() 调用。
unwrap_model
< source >( model keep_fp32_wrapper: bool = True keep_torch_compile: bool = True ) → torch.nn.Module
从 prepare() 可能添加的额外层中解包 model
。在保存模型之前很有用。
示例
>>> # Assuming two GPU processes
>>> from torch.nn.parallel import DistributedDataParallel
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> model = accelerator.prepare(MyModel())
>>> print(model.__class__.__name__)
DistributedDataParallel
>>> model = accelerator.unwrap_model(model)
>>> print(model.__class__.__name__)
MyModel
验证 model
未使用类似于 auto
的设备映射进行大模型推理的准备。
将停止当前进程的执行,直到所有其他进程都到达该点(因此当脚本仅在一个进程中运行时,此操作无效)。在保存模型之前很有用。
示例
>>> # Assuming two GPU processes
>>> import time
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> if accelerator.is_main_process:
... time.sleep(2)
>>> else:
... print("I'm waiting for the main process to finish its sleep...")
>>> accelerator.wait_for_everyone()
>>> # Should print on every process at the same time
>>> print("Everyone is here")
实用工具
accelerate.utils.gather_object
< source >( object: typing.Any )
从所有设备递归地收集嵌套列表/元组/字典中的对象。