Transformers 文档

张量并行

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

张量并行

张量并行将模型层切分成若干片段,以便多个硬件加速器同时处理。这使您能够运行超出单块 GPU 内存容量的模型,并实现更高的吞吐量。由于 GPU 在每一层都需要交换部分结果,因此您需要快速的节点内通信。

下表列出了原生支持张量并行的模型。您可以开启 GitHub Issue 或提交 Pull Request 来添加对其他模型的支持。

显示支持的模型

本指南介绍了如何在 Transformers 中启用张量并行以及可用的分区策略。

模型分区

当模型拥有 tp_plan 时,Transformers 会启用张量并行。您可以从两种分区方法中进行选择。

  • 设置 tp_plan="auto" 以基于模型的预定义配置获取自动规划。
  • 定义并传入手动的 tp_plan
自动规划
手动规划
import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# model_id = "meta-llama/Llama-4-Scout-17B-16E-Instruct" # better to visualize all the possible strategies
model = AutoModelForCausalLM.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct" , dtype=torch.bfloat16, tp_plan="auto")
print(model._tp_plan)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct")
prompt = "Can I help"
inputs = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)

# distributed run
outputs = model(inputs)

使用 torchrun 启动推理脚本。每 GPU 使用 4 个进程。

torchrun --nproc-per-node 4 demo.py

分区策略

ParallelInterface 类定义了所有分区策略,它将字符串映射到策略实现。您无需直接与该类交互,因为您在 from_pretrained() 中通过 tp_plan 设置策略。它主要用于查看可用策略。

class ParallelInterface(MutableMapping):
    """
    Dict-like object keeping track of allowed attention functions. You can easily add a new attention function
    with a call to `register()`. If a model needs to locally overwrite an existing attention function, say `sdpa`,
    it needs to declare a new instance of this class inside the `modeling_<model>.py`, and declare it on that instance.
    """
    _global_mapping = {
        "colwise": ColwiseParallel(),
        "rowwise": RowwiseParallel(),
        "colwise_rep": ColwiseParallel(output_layouts=Replicate()),
        "rowwise_rep": RowwiseParallel(input_layouts=Replicate()),
        "local_colwise": ColwiseParallel(use_dtensor=False),
        "local_rowwise": RowwiseParallel(use_dtensor=False),
        "local": IsolatedParallel(),
        "moe_tp_experts": MoeTensorParalellExperts(),
        "local_packed_rowwise": PackedRowwiseParallel(use_dtensor=False),
        "sequence_parallel": SequenceParallel(),
        "replicate": ReplicateParallel(),
    }

下表描述了每种策略。

策略 描述
ColwiseParallel 按列对权重和偏置进行分区。
RowwiseParallel 按行对权重和偏置进行分区。支持 nn.Embedding 模块分区。
SequenceParallel 支持 LayerNormDropout 层的序列并行实现。支持 RMSNorm 的 Python 实现。
PackedColwiseParallel ColwiseParallel 的变体,支持打包权重(例如,将 up_projgate_proj 打包在一起)。详情请参阅代码
PackedRowwiseParallel RowwiseParallel 的变体,支持打包权重(详情请参阅代码)。
GatherParallel 跨设备收集模块输出。
IsolatedParallel 将模块与其他设备隔离。用于专家混合(MoE)层中的专家。
ReplicateParallel 在所有设备上复制模块。防止 torch.distributed API 因模型部分分片而报错。

打包策略

权重打包将多个线性层合并为一个更大的层。PackedColwiseParallelPackedRowwiseParallel 策略能正确分片打包后的权重,而基础的 ColwiseParallelRowwiseParallel 策略会导致分片错误。

下面的示例将 up_projgate_proj 打包进一个 gate_up_proj 模块,并需要使用 PackedRowwiseParallel 策略来对 gate_up_proj 进行分片。

class Llama4TextExperts(nn.Module):
    ...
    self.gate_up_proj = nn.Parameter(torch.zeros(self.num_experts, self.hidden_size, 2 * self.expert_dim))

forward 传递中使用批矩阵乘法来计算 gate_up_proj 模块的输出。

def forward(self, hidden_states):
    ...
    gate_up = torch.bmm(hidden_states, self.gate_up_proj) # Compute the output of the gate_up_proj module
    gate, up = gate_up.chunk(2, dim=-1) # Split the output into gate and up

查看此注释,了解为什么必须使用 Packed* 策略的图示。

本地策略

本地策略(local_colwiselocal_rowwiselocal_packed_rowwise)不使用 DTensor,因为后者不支持某些操作(如 torch.chunk)。相反,本地策略使用基础的 torch.Tensor 并手动执行分布式逻辑。

自定义分区策略

继承自 TensorParallelLayer 来创建自定义分区策略。实现 partition_tensor_prepare_input_fn_prepare_output_fn

ParallelInterface 映射中注册该策略,以便在 tp_plan 中指定时,调度逻辑可以找到它。

下面的示例展示了如何使用此工作流实现 ColwiseParallel

  1. 继承 TensorParallelLayer。在 __init__ 方法中,定义 input_layoutsoutput_layouts 来描述输入和输出张量应如何放置在设备上。desired_input_layouts 属性用于指定输入应如何放置在设备上。

    class ColwiseParallel(TensorParallelLayer):
        def __init__(
            self,
            *,
            input_layouts: Optional[Placement] = None, # The input layout coming from the previous layer
            output_layouts: Optional[Placement] = None, # The output layout we want to achieve
            use_local_output: bool = True, # Whether to use local output or not
            use_dtensor=True, # Whether to use DTensor or not
        ):
            self.input_layouts = (input_layouts or Replicate(),) # The input sharding coming from the previous layer
            self.output_layouts = (output_layouts or Shard(-1),) # Desired output sharding
            self.desired_input_layouts = (Replicate(),) # Desired input sharding, inputs should be replicated across GPUs
            self.use_local_output = use_local_output
            self.use_dtensor = use_dtensor
  2. 实现 partition_tensor_prepare_input_fn_prepare_output_fn 方法。

    partition_tensor 方法对张量进行分区,并使用分区后的张量填充 empty_param。使用辅助函数 get_tensor_shard 获取给定 rank 下原始参数的正确分片,并使用 get_packed_weights 处理打包权重。

    def partition_tensor(
        self,
        param, # Full tensor of the parameter
        empty_param, # Empty tensor of the parameter, will be filled with the partitioned tensor
        param_type, # Type of the parameter, `bias` or `weight`
        param_casting_dtype, # The type to cast the parameter to
        to_contiguous, # Whether to convert the tensor to a contiguous memory layout
        rank, # The rank of the current device
        device_mesh, # The device mesh
    ) -> nn.Parameter: # Return the partitioned parameter
        ...

    _prepare_input_fn_prepare_output_fn 方法用于 pre-forwardforward 钩子中。它们将输入和输出重新分配到 __init__ 中指定的所需布局。

    def _prepare_input_fn(input_layouts, desired_input_layouts, mod, inputs, device_mesh):
        ...
        # Do some custom logic, cast to DTensor etc.
        ...
        return inputs.redistribute(placements=desired_input_layouts, device_mesh=device_mesh)
    def _prepare_output_fn(output_layouts, use_local_output, mod, outputs, device_mesh):
        ...
        # Do some custom logic, cast to DTensor etc.
        ...
        return outputs.redistribute(placements=output_layouts, device_mesh=device_mesh)
  3. ParallelInterface 注册策略,以便在 tp_plan 中启用它。

    from transformers.integrations.tensor_parallel import ParallelInterface
    
    ParallelInterface.register_strategy("colwise_custom", ColwiseParallel)
    tp_plan = {
        "model.layers.*.self_attn.q_proj": "colwise_custom",
        ...
    }
    model = AutoModelForCausalLM.from_pretrained(model_id, dtype=torch.bfloat16, tp_plan=tp_plan)

基准测试

张量并行可以显著加速推理,特别是在大批次或长序列的情况下。

此图表展示了在序列长度为 512 的 Llama 模型上进行单次前向传递的预期加速比。

设计实现

Transformers 以框架无关的方式实现了张量并行。它依赖于 DeviceMesh 和来自 torch.distributedDTensor 来提供简单、可扩展的接口。

DeviceMesh

DeviceMesh 创建了一个多维的设备网格进行通信。不同的并行策略需要不同的通信模式。您可以创建一个带有多个子网格的 DeviceMesh 来处理这些模式。

from torch.distributed.device_mesh import init_device_mesh

# Create a 1D mesh of 4 GPUs
device_mesh = init_device_mesh("cuda", (4,), mesh_dim_names=["tp"])

大多数 torch.distributed 并行策略适用于网格本身或其子网格。网格会自动处理通信模式。

DTensor

DTensor(分布式张量)在常规张量操作之上处理分布式逻辑。张量并行中的大多数模型权重都存储为 DTensor

placement 属性告诉 PyTorch 如何在 DeviceMesh 中的设备上放置张量。它接受以下值:

  • Shard(dimension):在构建 DTensor 时,跨给定维度在 DeviceMesh 上对张量进行分片。下例展示了如何对列并行进行跨维权重分片。

    weight = ...
    weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(0)]) # Shard across the 1st (column-wise) dimension
    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Shard(-1)]) # Shard across the ONLY dimension

    下例展示了如何对行并行进行跨维权重分片。

    weight = ...
    weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(1)]) # Shard across the 2nd (row-wise) dimension
    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUs
  • Replicate():在 DeviceMesh 上复制 DTensor。它在每个设备上创建张量的完整副本。

    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUs
  • Partial():表示张量正在等待归约(Reduction)操作(在 Transformers 的使用中通常不相关)。

资源

在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.