加速器
加速器 是启用任何类型训练设置的分布式训练的主要类。阅读将加速器添加到您的代码教程,了解如何将 加速器 添加到您的脚本。
加速器
类 accelerate.加速器
< 源码 >( device_placement: 布尔型 = True split_batches: 布尔型 = mixed_precision: 精确度类型 | 字符串 | None = None gradient_accumulation_steps: 整数 = 1 cpu: 布尔型 = False dataloader_config: DataLoaderConfiguration | None = None deepspeed_plugin: DeepSpeedPlugin | None = None fsdp_plugin: FullyShardedDataParallelPlugin | None = None megatron_lm_plugin: MegatronLMPlugin | None = None rng_types: 字符串列表 | RNGType | None = None log_with: 字符串 | LoggerType | GeneralTracker | 字符串列表 | LoggerType | GeneralTracker | None = None project_dir: 字符串 | os.PathLike | None = None project_config: ProjectConfiguration | None = None gradient_accumulation_plugin: GradientAccumulationPlugin | None = None dispatch_batches: 布尔型 | None = even_batches: 布尔型 = use_seedable_sampler: 布尔型 = step_scheduler_with_optimizer: 布尔型 = True kwargs_handlers: KwargsHandler 列表 | None = None dynamo_backend: DynamoBackend | 字符串 | None = None )
参数
- device_placement (
bool
,可选,默认为True
)—— 是否将加速器置于设备上(数据加载器、模型等生成的张量)。 - mixed_precision (
str
,可选)—— 是否使用混合精度训练。选择‘no’,‘fp16’,‘bf16’或‘fp8’。默认值为环境变量ACCELERATE_MIXED_PRECISION
中的值,该值使用当前系统加速器配置中的默认值或通过accelerate.launch
命令传递的标志。‘fp8’需要安装 transformers-engine。 - gradient_accumulation_steps (
int
,可选,默认为 1)—— 在梯度累积之前应经过的步数。大于 1 的数字应与Accelerator.accumulate
结合使用。如果未传入,则默认为环境变量ACCELERATE_GRADIENT_ACCUMULATION_STEPS
中的值。也可以通过GradientAccumulationPlugin
进行配置。 - cpu (
布尔值
, 可选) — 是否强制脚本在CPU上执行。如果设置为True
,将忽略GPU可用性,并强制在一个进程上执行。 - dataloader_config (
DataLoaderConfiguration
, 可选) — 用于处理分布式场景中数据加载器配置的方式。 - deepspeed_plugin (DeepSpeedPlugin, 可选) — 使用此参数微调与DeepSpeed相关的参数。此参数为可选,可以直接使用accelerate config进行配置
- fsdp_plugin (FullyShardedDataParallelPlugin,可选) — 使用此参数微调与FSDP相关的参数。此参数为可选参数,可以直接使用 accelerate config 进行配置
- megatron_lm_plugin (MegatronLMPlugin,可选) — 使用此参数微调与MegatronLM相关的参数。此参数为可选参数,可以直接使用 accelerate config 进行配置
- rng_types (字符串列表或 RNGType) — 在每个迭代开始时同步数据加载器中要同步的随机数生成器列表。应该是以下之一或多个:
"torch"
:基本torch随机数生成器"cuda"
:CUDA随机数生成器(仅限GPU)"xla"
:XLA随机数生成器(仅限TPU)"generator"
:采样器的torch.Generator
(如果没有采样器在数据加载器中则为批采样器)或可迭代数据集的torch.Generator
(如果存在的话),如果底层数据集是那种类型。
对于 PyTorch 版本 <= 1.5.1,默认为
["torch"]
,对于 PyTorch 版本 >= 1.6,默认为["generator"]
。 - log_with (字符串列表,LoggerType 或 GeneralTracker,可选) — 用于实验跟踪的日志记录器列表。可以是以下之一或多个:
"all"
"tensorboard"
"wandb"
"comet_ml"
如果选择"all"
,将选择环境中所有可用的跟踪器并初始化它们。还可以接受GeneralTracker
的实现以进行自定义跟踪器,并且可以与"all"
结合使用。
- project_config (ProjectConfiguration,可选) — 控制如何处理保存状态的配置。
- project_dir (
str
,os.PathLike
,可选) — 存储数据(例如本地兼容的日志记录器的日志和可能保存的检查点)的目录路径。 - step_scheduler_with_optimizer (
布尔值
, 可选, 默认为True
) — 如果在学习率调度器与优化器同时步进时设置为True
,仅在特定情况下(如每个周期的末尾)执行时设置为False
。 - kwargs_handlers (KwargsHandler 的列表, 可选) — 一系列 KwargsHandler,用于自定义与分布式训练、性能分析或混合精度相关对象的创建方式。请参阅 kwargs 获取更多信息。
- dynamo_backend (
字符串
或 DynamoBackend, 可选, 默认为"no"
) — 设置为可能的 Dynamo 后端之一,以优化使用 torch dynamo 的训练。
创建用于分布式训练(在多GPU、TPU)或混合精度训练的加速器实例。
可用的属性
- device (
torch.device
)—— 要使用的设备。 - distributed_type (DistributedType)—— 分布式训练配置。
- local_process_index (
int
)—— 当前机器上的进程索引。 - mixed_precision (
str
)—— 配置的混合精度模式。 - num_processes (
int
)—— 用于训练的总进程数。 - optimizer_step_was_skipped (
bool
)—— 优化器更新是否被跳过(由于混合精度中的梯度溢出),在这种情况下不应更改学习率。 - process_index (
int
)—— 所有进程中当前过程的总体索引。 - state (AcceleratorState)—— 分布式设置状态。
- sync_gradients (
bool
)—— 是否正在将梯度同步到所有进程。 - use_distributed (
bool
)—— 当前配置是否为分布式训练。
累计
自动管理器,将在周围轻量级包装并自动执行梯度累积
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=1)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, output in dataloader:
... with accelerator.accumulate(model):
... outputs = model(input)
... loss = loss_func(outputs)
... loss.backward()
... optimizer.step()
... scheduler.step()
... optimizer.zero_grad()
如果启用,此上下文管理器内的块将应用自动混合精度。否则不会发生任何不同。
可以通过传递不同的 autocast_handler
来覆盖在 Accelerator
对象中设置的值。这对于在 autocast
下想回退到 fp32 的块非常有用。
按照 GradientAccumulationPlugin
的规则缩放梯度并调用正确的 backward()
方法。
应替换 loss.backward()
使用。
检查内部触发张量是否在任何进程中已设置为 1。如果是这样,将返回 True
并将触发张量重置为 0。
注意:不需要 wait_for_everyone()
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
... accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_trigger():
... break
是 Accelerate.free_memory
的别名,释放对内部存储对象的全部引用并调用垃圾收集器。您应该在两种不同的模型/优化器之间的训练之间调用此方法。
clip_grad_norm_
< source >( parameters max_norm norm_type = 2 ) → torch.Tensor
返回值
torch.Tensor
参数梯度的总和(视为单个向量)。
应替换使用 torch.nn.utils.clip_grad_norm_
。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, target in dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
... optimizer.step()
应替换使用 torch.nn.utils.clip_grad_value_
。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for input, target in dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_value_(model.parameters(), clip_value)
... optimizer.step()
将释放对所有内部对象的引用并调用垃圾回收器。您应该在两次使用不同模型/优化器的训练之间调用此方法。还将重置 Accelerator.step
为 0。
gather
< 源代码 >( tensor ) → torch.Tensor
, 或嵌套的 torch.Tensor
元组/列表/字典
在所有进程中收集 tensor 中的值并在第一个维度上拼接它们。在评估时可用于重新整理所有进程的预测。
注意:此收集在所有进程中发生。
gather_for_metrics
< 源代码 >( input_data use_gather_object = False )
参数
- input (
torch.Tensor
,object
, a nested tuple/list/dictionary oftorch.Tensor
, or a nested tuple/list/dictionary ofobject
) — 用于在所有进程中计算指标的张量或对象 use_gather_object(bool
) — 是否强制使用 gather_object 而不是 gather(如果传递的所有对象都不包含张量,则会自动进行)。此标志对于汇聚不同大小的张量(我们不想通过第一个维度进行填充和连接)可能很有用。与 GPU 张量一起使用时,不受良好支持且效率低下,因为它会导致 GPU -> CPU 的转移,因为张量将被序列化。
在分布式系统上聚合 input_data
并可能丢弃最后一个批次的重复项。应用于收集指标计算的输入和目标。
示例
>>> # Assuming two processes, with a batch size of 5 on a dataset with 9 samples
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader = torch.utils.data.DataLoader(range(9), batch_size=5)
>>> dataloader = accelerator.prepare(dataloader)
>>> batch = next(iter(dataloader))
>>> gathered_items = accelerator.gather_for_metrics(batch)
>>> len(gathered_items)
9
get_state_dict
< source >( model unwrap = True ) → dict
参数
- model (
torch.nn.Module
) — 在 Accelerator.prepare() 中通过 PyTorch 模型 - unwrap (
bool
, 可选,默认值为True
) — 是否返回原始的model
的 underlying state_dict 或返回包装的 state_dict
返回值
dict
模型的状态字典,可能不具有完整的精度。
返回通过 Accelerator.prepare() 发送的模型的状态字典,可能不具有完整的精度。
get_tracker
< source >( name: str unwrap: bool = False ) → GeneralTracker
仅在主进程中,根据 name
从 self.trackers
返回一个 tracker
。
join_uneven_inputs
< source >( joinables even_batches = None )
一个上下文管理器,它促进在非均匀输入上的分布式训练或评估,作为 torch.distributed.algorithms.join
的包装器。当总批次大小不能被数据集长度整除时,这很有用。
join_uneven_inputs
只支持在多个 GPU 上的分布式数据并行训练。对于任何其他配置,此方法将没有效果。
覆盖 even_batches
不会影响可迭代的数据加载器。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(even_batches=True)
>>> ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
>>> with accelerator.join_uneven_inputs([ddp_model], even_batches=False):
... for input, output in dataloader:
... outputs = model(input)
... loss = loss_func(outputs)
... loss.backward()
... optimizer.step()
... optimizer.zero_grad()
load_state
< source >( input_dir: str = None **load_model_func_kwargs )
加载模型、优化器、缩放器、RNG 生成器和注册对象的当前状态。
应与 Accelerator.save_state() 一起使用。如果没有注册文件用于检查点,则存储在目录中的文件将不会加载。
允许本地主进程进入with块内部。
其他进程将在主进程退出后进入with块。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> with accelerator.local_main_process_first():
... # This will be printed first by local process 0 then in a seemingly
... # random order by the other processes.
... print(f"This will be printed by process {accelerator.local_process_index}")
对LOMO优化器运行反向传播。
允许主进程首先进入with块。
其他进程将在主进程退出后进入with块。
一个上下文管理器,通过调用 torch.nn.parallel.DistributedDataParallel.no_sync
来禁用 DDP 进程间的梯度同步。
如果 model
不在 DDP,此上下文管理器不执行任何操作
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> input_a = next(iter(dataloader))
>>> input_b = next(iter(dataloader))
>>> with accelerator.no_sync():
... outputs = model(input_a)
... loss = loss_func(outputs)
... accelerator.backward(loss)
... # No synchronization across processes, only accumulate gradients
>>> outputs = model(input_b)
>>> accelerator.backward(loss)
>>> # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()
一个装饰器,仅在某些进程上运行被装饰的函数。也可以通过 PartialState
类使用。
仅在当地主进程中运行装饰函数的装饰器。也可以使用PartialState
类调用。
示例
# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator
accelerator = Accelerator()
@accelerator.on_local_main_process
def print_something():
print("This will be printed by process 0 only on each server.")
print_something()
# On server 1:
"This will be printed by process 0 only"
# On server 2:
"This will be printed by process 0 only"
on_local_process
< 源代码 >( 函数: Callable[..., Any] = None local_process_index: int = None )
一个装饰器,它只将在指定的本地进程索引上运行被装饰的函数。也可以使用 PartialState
类调用来执行。
示例
# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator
accelerator = Accelerator()
@accelerator.on_local_process(local_process_index=2)
def print_something():
print(f"Printed on process {accelerator.local_process_index}")
print_something()
# On server 1:
"Printed on process 2"
# On server 2:
"Printed on process 2"
一个装饰器,仅将在主进程中运行被装饰的函数。也可以通过使用PartialState
类来调用。
on_process
( function: Callable[..., Any] = None process_index: int = None )
参数
一个装饰器,仅将在指定的进程索引上运行的函数上运行。也可以通过使用PartialState
类来调用。
pad_across_processes
< source >( tensor dim = 0 pad_index = 0 pad_first = False ) → torch.Tensor
, or a nested tuple/list/dictionary of torch.Tensor
递归地将所有设备上的张量嵌套列表/元组/字典中的张量填充到同一大小,以便可以安全地聚集成。
示例
>>> # Assuming two processes, with the first processes having a tensor of size 1 and the second of size 2
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.process_index + 1).to(accelerator.device)
>>> padded_tensor = accelerator.pad_across_processes(process_tensor)
>>> padded_tensor.shape
torch.Size([2])
准备
< source >( *args device_placement = None )
准备在args
中传递的所有对象,以便进行分布式训练和混合精度训练,然后按相同顺序返回它们。
如果只用于推理且不涉及任何类型的混合精度,则不需要准备模型。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(model, optimizer, data_loader, scheduler)
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> device_placement = [True, True, False, False]
>>> # Will place the first to items passed in automatically to the right device but not the last two.
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(
... model, optimizer, data_loader, scheduler, device_placement=device_placement
... )
prepare_data_loader
< 源代码 >( data_loader: torch.utils.data.DataLoader device_placement = None slice_fn_for_dispatch = None )
参数
- data_loader (
torch.utils.data.DataLoader
) ——一个普通的PyTorch DataLoader,用于准备 - device_placement (
bool
, 可选) — 是否在准备的数据加载器中将批次放置在正确的设备上。默认为self.device_placement
。 - slice_fn_for_dispatch (
Callable
, 可选) -- 如果传递了此函数,它将被用于跨
num_processes的张量切片。默认为 slice_tensors()。此参数仅在将
dispatch_batches设置为 True时使用,否则将被忽略。
为在任何分布式设置中训练准备PyTorch DataLoader。建议使用 Accelerator.prepare()。
prepare_model
< source >( model: torch.nn.Module device_placement: bool = None evaluation_mode: bool = False )
为任何分布式设置中的 PyTorch 模型准备训练。建议使用 Accelerator.prepare() 代替。
prepare_optimizer
< 源代码 >( optimizer: torch.optim.Optimizer device_placement = None )
为在任何分布式设置中训练的 PyTorch 优化器做准备。建议使用 Accelerator.prepare()。
prepare_scheduler
< source >( scheduler: LRScheduler )
为在任何分布式设置中训练的 PyTorch 调度器做准备。建议使用 Accelerator.prepare()。
print()
的替换,仅在每个服务器上打印一次。
配置文件
< source >( profile_handler: ProfileKwargs | None = None )
将对上下文管理器内的代码进行配置文件分析。如果设置了profile_handler.output_trace_dir
,则会将配置保存为Chrome Trace文件。
可以传递不同的profile_handler
来覆盖在Accelerator
对象中设置的配置。
示例
# Profile with default settings
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs
accelerator = Accelerator()
with accelerator.profile() as prof:
train()
accelerator.print(prof.key_averages().table())
# Profile with the custom handler
def custom_handler(prof):
print(prof.key_averages().table(sort_by="self_cpu_time_total", row_limit=10))
kwargs = ProfileKwargs(schedule_option=dict(wait=1, warmup=1, active=1), on_trace_ready=custom_handler)
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile() as prof:
for _ in range(10):
train_iteration()
prof.step()
# Profile and export to Chrome Trace
kwargs = ProfileKwargs(output_trace_dir="output_trace")
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile():
train()
降低
< 源代码 >( tensor reduction = 'sum' scale = 1.0 ) → torch.Tensor
, or a nested tuple/list/dictionary of torch.Tensor
参数
- tensor (
torch.Tensor
, or a nested tuple/list/dictionary oftorch.Tensor
) — 在所有进程中要减少的张量。 - reduction (
str
, 可选,默认为“sum”) — 一个减少类型,可以是以下之一:‘sum’、‘mean’或‘none’。如果选择‘none’,则不执行任何操作。 - scale (
float
, optional, defaults to 1.0) — 在reduce操作之后应用默认缩放值,仅在XLA中有效。
返回值
torch.Tensor
, 或嵌套的 torch.Tensor
元组/列表/字典
减少后的张量。
根据reduction在所有进程中减少tensor中的值。
注意:所有进程都获得减少后的值。
示例
>>> # Assuming two processes
>>> import torch
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.num_processes) + 1 + (2 * accelerator.process_index)
>>> process_tensor = process_tensor.to(accelerator.device)
>>> reduced_tensor = accelerator.reduce(process_tensor, reduction="sum")
>>> reduced_tensor
tensor([4, 6])
记录objects
并在save_state
或load_state
期间保存或加载。
应在同一脚本中加载或保存状态时使用此类。它不设计为在不同脚本中使用。
每个object
必须具有load_state_dict
和state_dict
函数才能存储。
register_load_state_pre_hook
< source >( hook: Callable[..., None] ) → torch.utils.hooks.RemovableHandle
参数
- hook (
Callable
) — 在调用 Accelerator.load_state() 之前的函数。
返回值
torch.utils.hooks.RemovableHandle
可以用来通过调用 handle.remove()
删除已添加的钩子的处理器。
在 Accelerator.load_state() 调用 load_checkpoint
之前注册一个前钩。
钩子应具有以下签名
hook(models: list[torch.nn.Module], input_dir: str) -> None
models
参数是存储在加速器状态 accelerator._models
下的模型,而 input_dir
参数是传递给 Accelerator.load_state() 的参数。
应与 Accelerator.register_save_state_pre_hook() 一同使用。可以用于加载配置以及模型权重。也可以用来用自定义方法覆盖模型加载。在这种情况下,请确保从模型列表中删除已加载的模型。
register_save_state_pre_hook
< source >( hook: Callable[..., None] ) → torch.utils.hooks.RemovableHandle
参数
- 钩子 (
Callable
) — 在 Accelerator.save_state() 调用save_checkpoint
之前要调用的函数。
返回值
torch.utils.hooks.RemovableHandle
可以用来通过调用 handle.remove()
删除已添加的钩子的处理器。
在 Accelerator.save_state() 调用 save_checkpoint
之前注册一个钩子。
钩子应具有以下签名
hook
(models: list[torch.nn.Module], weights: list[dict[str, torch.Tensor]], input_dir: str) -> None
models
参数是保存在加速器状态下的模型,位于 accelerator._models
下,weigths
参数是 models
的状态字典,input_dir
参数是传递给 Accelerator.load_state() 的 input_dir
参数。
应与 Accelerator.register_load_state_pre_hook() 一起使用。除了模型权重外,还可以保存配置。还可以用来用自定义方法替换模型保存。在这种情况下,请确保从权重列表中删除已加载的权重。
保存
< source >( obj f safe_serialization = False )
将传递给磁盘的对象保存一次,每个机器。可用作 torch.save
的替代品。
注意:如果将 save_on_each_node
作为 ProjectConfiguration
传递,则在每个节点上保存对象一次,而不是只在前端节点上保存一次。
save_model
< 源代码 >( model: torch.nn.Module save_directory: Union[str, os.PathLike] max_shard_size: Union[int, str] = '10GB' safe_serialization: bool = True )
参数
- save_directory (
str
或os.PathLike
) — 要保存的目录。如果不存在,将会被创建。 - max_shard_size (
int
或str
,可选,默认为"10GB"
) — 保存检查点前的最大大小。然后按此大小以下分块。如果以字符串形式表示,需要是数字后跟一个单位(比如"5MB"
)。如果模型的某个权重比
max_shard_size
大,它将位于自己的检查点块中,该块的大小将大于max_shard_size
。 - safe_serialization (
bool
,可选,默认为True
) — 是否使用safetensors
或传统 PyTorch 方式(使用pickle
)保存模型。
保存模型以便使用 load_checkpoint_in_model 函数重新加载。
save_state
< 源码 >( output_dir: str = None safe_serialization: bool = True **save_model_func_kwargs )
将模型的当前状态、优化器、缩放器、RNG生成器和注册对象保存到一个文件夹中。
如果在Accelerator
对象传递了带有automatic_checkpoint_naming
启用的ProjectConfiguration
,则检查点将保存到self.project_dir/checkpoints
。如果当前保存的数量超过total_limit
,则删除最早的保存。每个检查点将保存在名为checkpoint_<迭代>
的单独文件夹中。
否则,它们只是保存在output_dir
中。
仅在需要在训练期间保存检查点并在同一环境中恢复状态时使用。
将当前进程的内部触发张量设置为1。后来应使用此方法进行检查,该检查将跨越所有进程进行。
注意:不需要 wait_for_everyone()
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
... accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_breakpoint():
... break
跳过第一批次
< source >( dataloader num_batches: int = 0 )
创建一个新的 torch.utils.data.DataLoader
,该加载器将高效地跳过前 num_batches
批次。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> skipped_dataloader = accelerator.skip_first_batches(dataloader, num_batches=2)
>>> # for the first epoch only
>>> for input, target in skipped_dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... optimizer.step()
>>> # subsequent epochs
>>> for input, target in dataloader:
... optimizer.zero_grad()
... ...
split_between_processes
< 来源 >( inputs: list | tuple | dict | torch.Tensor apply_padding: bool = False )
快速在 self.num_processes
之间分割 input
,然后可以在该进程上使用。在进行分布式推理时很有用,例如使用不同的提示。
注意,当使用 dict
时,所有键都需要有相同数量的元素。
示例
# Assume there are two processes
from accelerate import Accelerator
accelerator = Accelerator()
with accelerator.split_between_processes(["A", "B", "C"]) as inputs:
print(inputs)
# Process 0
["A", "B"]
# Process 1
["C"]
with accelerator.split_between_processes(["A", "B", "C"], apply_padding=True) as inputs:
print(inputs)
# Process 0
["A", "B"]
# Process 1
["C", "C"]
触发模型在多个前进传递并应用 Accelerator.no_sync
后的下一个反向传递中的梯度同步(仅适用于多GPU场景)。
如果脚本不是在分布式模式下启动,则此上下文管理器什么都不做。
示例
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> with accelerator.no_sync():
... loss_a = loss_func(model(input_a)) # first forward pass
... loss_b = loss_func(model(input_b)) # second forward pass
>>> accelerator.backward(loss_a) # No synchronization across processes, only accumulate gradients
>>> with accelerator.trigger_sync_in_backward(model):
... accelerator.backward(loss_b) # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()
unscale_gradients
( optimizer = None )
参数
- optimizer (
torch.optim.Optimizer
或list[torch.optim.Optimizer]
, 可选) — 要解除梯度失真的优化器。如果没有设置,将在传递给 prepare() 的所有优化器上解除梯度失真。
在混合精度训练中解除梯度失真。在其他设置中该操作为无操作。
通常应通过 Accelerator.clipgrad_norm() 或 Accelerator.clipgrad_value() 调用。
unwrap_model
< source >( model keep_fp32_wrapper: bool = True ) → torch.nn.Module
从 prepare() 可能添加的额外层中解除 model
的包裹。在保存模型之前使用很有用。
示例
>>> # Assuming two GPU processes
>>> from torch.nn.parallel import DistributedDataParallel
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> model = accelerator.prepare(MyModel())
>>> print(model.__class__.__name__)
DistributedDataParallel
>>> model = accelerator.unwrap_model(model)
>>> print(model.__class__.__name__)
MyModel
验证模型没有用类似auto的设备图进行大模型推理而准备。
将停止当前进程的执行,直到每个其他进程都达到该点(因此当脚本只在一个进程中运行时,这个什么也不做)。在保存模型之前做这个很有用。
示例
>>> # Assuming two GPU processes
>>> import time
>>> from accelerate import Accelerator
>>> accelerator = Accelerator()
>>> if accelerator.is_main_process:
... time.sleep(2)
>>> else:
... print("I'm waiting for the main process to finish its sleep...")
>>> accelerator.wait_for_everyone()
>>> # Should print on every process at the same time
>>> print("Everyone is here")
实用工具
accelerate.utils.gather_object
< 源 >( object: Any )
递归地从所有设备收集嵌套列表/元组/字典中的对象。