Transformers 文档
张量并行
并获得增强的文档体验
开始使用
张量并行
张量并行将模型层切分成若干片段,以便多个硬件加速器同时处理。这使您能够运行超出单块 GPU 内存容量的模型,并实现更高的吞吐量。由于 GPU 在每一层都需要交换部分结果,因此您需要快速的节点内通信。
下表列出了原生支持张量并行的模型。您可以开启 GitHub Issue 或提交 Pull Request 来添加对其他模型的支持。
显示支持的模型
本指南介绍了如何在 Transformers 中启用张量并行以及可用的分区策略。
模型分区
当模型拥有 tp_plan 时,Transformers 会启用张量并行。您可以从两种分区方法中进行选择。
- 设置
tp_plan="auto"以基于模型的预定义配置获取自动规划。 - 定义并传入手动的
tp_plan。
import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
# model_id = "meta-llama/Llama-4-Scout-17B-16E-Instruct" # better to visualize all the possible strategies
model = AutoModelForCausalLM.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct" , dtype=torch.bfloat16, tp_plan="auto")
print(model._tp_plan)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct")
prompt = "Can I help"
inputs = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
# distributed run
outputs = model(inputs)使用 torchrun 启动推理脚本。每 GPU 使用 4 个进程。
torchrun --nproc-per-node 4 demo.py
分区策略
ParallelInterface 类定义了所有分区策略,它将字符串映射到策略实现。您无需直接与该类交互,因为您在 from_pretrained() 中通过 tp_plan 设置策略。它主要用于查看可用策略。
class ParallelInterface(MutableMapping):
"""
Dict-like object keeping track of allowed attention functions. You can easily add a new attention function
with a call to `register()`. If a model needs to locally overwrite an existing attention function, say `sdpa`,
it needs to declare a new instance of this class inside the `modeling_<model>.py`, and declare it on that instance.
"""
_global_mapping = {
"colwise": ColwiseParallel(),
"rowwise": RowwiseParallel(),
"colwise_rep": ColwiseParallel(output_layouts=Replicate()),
"rowwise_rep": RowwiseParallel(input_layouts=Replicate()),
"local_colwise": ColwiseParallel(use_dtensor=False),
"local_rowwise": RowwiseParallel(use_dtensor=False),
"local": IsolatedParallel(),
"moe_tp_experts": MoeTensorParalellExperts(),
"local_packed_rowwise": PackedRowwiseParallel(use_dtensor=False),
"sequence_parallel": SequenceParallel(),
"replicate": ReplicateParallel(),
}下表描述了每种策略。
| 策略 | 描述 |
|---|---|
ColwiseParallel | 按列对权重和偏置进行分区。 |
RowwiseParallel | 按行对权重和偏置进行分区。支持 nn.Embedding 模块分区。 |
SequenceParallel | 支持 LayerNorm 和 Dropout 层的序列并行实现。支持 RMSNorm 的 Python 实现。 |
PackedColwiseParallel | ColwiseParallel 的变体,支持打包权重(例如,将 up_proj 和 gate_proj 打包在一起)。详情请参阅代码。 |
PackedRowwiseParallel | RowwiseParallel 的变体,支持打包权重(详情请参阅代码)。 |
GatherParallel | 跨设备收集模块输出。 |
IsolatedParallel | 将模块与其他设备隔离。用于专家混合(MoE)层中的专家。 |
ReplicateParallel | 在所有设备上复制模块。防止 torch.distributed API 因模型部分分片而报错。 |
打包策略
权重打包将多个线性层合并为一个更大的层。PackedColwiseParallel 和 PackedRowwiseParallel 策略能正确分片打包后的权重,而基础的 ColwiseParallel 或 RowwiseParallel 策略会导致分片错误。
下面的示例将 up_proj 和 gate_proj 打包进一个 gate_up_proj 模块,并需要使用 PackedRowwiseParallel 策略来对 gate_up_proj 进行分片。
class Llama4TextExperts(nn.Module):
...
self.gate_up_proj = nn.Parameter(torch.zeros(self.num_experts, self.hidden_size, 2 * self.expert_dim))在 forward 传递中使用批矩阵乘法来计算 gate_up_proj 模块的输出。
def forward(self, hidden_states):
...
gate_up = torch.bmm(hidden_states, self.gate_up_proj) # Compute the output of the gate_up_proj module
gate, up = gate_up.chunk(2, dim=-1) # Split the output into gate and up查看此注释,了解为什么必须使用
Packed*策略的图示。
本地策略
本地策略(local_colwise、local_rowwise、local_packed_rowwise)不使用 DTensor,因为后者不支持某些操作(如 torch.chunk)。相反,本地策略使用基础的 torch.Tensor 并手动执行分布式逻辑。
自定义分区策略
继承自 TensorParallelLayer 来创建自定义分区策略。实现 partition_tensor、_prepare_input_fn 和 _prepare_output_fn。
在 ParallelInterface 映射中注册该策略,以便在 tp_plan 中指定时,调度逻辑可以找到它。
下面的示例展示了如何使用此工作流实现 ColwiseParallel。
继承
TensorParallelLayer。在__init__方法中,定义input_layouts和output_layouts来描述输入和输出张量应如何放置在设备上。desired_input_layouts属性用于指定输入应如何放置在设备上。class ColwiseParallel(TensorParallelLayer): def __init__( self, *, input_layouts: Optional[Placement] = None, # The input layout coming from the previous layer output_layouts: Optional[Placement] = None, # The output layout we want to achieve use_local_output: bool = True, # Whether to use local output or not use_dtensor=True, # Whether to use DTensor or not ): self.input_layouts = (input_layouts or Replicate(),) # The input sharding coming from the previous layer self.output_layouts = (output_layouts or Shard(-1),) # Desired output sharding self.desired_input_layouts = (Replicate(),) # Desired input sharding, inputs should be replicated across GPUs self.use_local_output = use_local_output self.use_dtensor = use_dtensor实现
partition_tensor、_prepare_input_fn和_prepare_output_fn方法。partition_tensor方法对张量进行分区,并使用分区后的张量填充empty_param。使用辅助函数get_tensor_shard获取给定 rank 下原始参数的正确分片,并使用get_packed_weights处理打包权重。def partition_tensor( self, param, # Full tensor of the parameter empty_param, # Empty tensor of the parameter, will be filled with the partitioned tensor param_type, # Type of the parameter, `bias` or `weight` param_casting_dtype, # The type to cast the parameter to to_contiguous, # Whether to convert the tensor to a contiguous memory layout rank, # The rank of the current device device_mesh, # The device mesh ) -> nn.Parameter: # Return the partitioned parameter ..._prepare_input_fn和_prepare_output_fn方法用于 pre-forward 和 forward 钩子中。它们将输入和输出重新分配到__init__中指定的所需布局。def _prepare_input_fn(input_layouts, desired_input_layouts, mod, inputs, device_mesh): ... # Do some custom logic, cast to DTensor etc. ... return inputs.redistribute(placements=desired_input_layouts, device_mesh=device_mesh) def _prepare_output_fn(output_layouts, use_local_output, mod, outputs, device_mesh): ... # Do some custom logic, cast to DTensor etc. ... return outputs.redistribute(placements=output_layouts, device_mesh=device_mesh)向
ParallelInterface注册策略,以便在tp_plan中启用它。from transformers.integrations.tensor_parallel import ParallelInterface ParallelInterface.register_strategy("colwise_custom", ColwiseParallel) tp_plan = { "model.layers.*.self_attn.q_proj": "colwise_custom", ... } model = AutoModelForCausalLM.from_pretrained(model_id, dtype=torch.bfloat16, tp_plan=tp_plan)
基准测试
张量并行可以显著加速推理,特别是在大批次或长序列的情况下。
此图表展示了在序列长度为 512 的 Llama 模型上进行单次前向传递的预期加速比。

设计实现
Transformers 以框架无关的方式实现了张量并行。它依赖于 DeviceMesh 和来自 torch.distributed 的 DTensor 来提供简单、可扩展的接口。
DeviceMesh
DeviceMesh 创建了一个多维的设备网格进行通信。不同的并行策略需要不同的通信模式。您可以创建一个带有多个子网格的 DeviceMesh 来处理这些模式。
from torch.distributed.device_mesh import init_device_mesh
# Create a 1D mesh of 4 GPUs
device_mesh = init_device_mesh("cuda", (4,), mesh_dim_names=["tp"])大多数 torch.distributed 并行策略适用于网格本身或其子网格。网格会自动处理通信模式。
DTensor
DTensor(分布式张量)在常规张量操作之上处理分布式逻辑。张量并行中的大多数模型权重都存储为 DTensor。
placement 属性告诉 PyTorch 如何在 DeviceMesh 中的设备上放置张量。它接受以下值:
Shard(dimension):在构建DTensor时,跨给定维度在DeviceMesh上对张量进行分片。下例展示了如何对列并行进行跨维权重分片。weight = ... weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(0)]) # Shard across the 1st (column-wise) dimension bias = ... bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Shard(-1)]) # Shard across the ONLY dimension下例展示了如何对行并行进行跨维权重分片。
weight = ... weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(1)]) # Shard across the 2nd (row-wise) dimension bias = ... bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUsReplicate():在DeviceMesh上复制DTensor。它在每个设备上创建张量的完整副本。bias = ... bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUsPartial():表示张量正在等待归约(Reduction)操作(在 Transformers 的使用中通常不相关)。
资源
Ultra-Scale Playbook 中关于张量并行的部分提供了更多细节。
如果您正在使用专家混合(MoE)模型,请查看专家并行指南。这些模型支持张量并行和专家并行。
阅读博文《Transformers 中的张量并行 (TP):5 分钟入门》,快速了解张量并行,并学习列并行和行并行设置的区别。
请参阅张量并行训练指南,了解如何在训练设置中使用它。