Transformers 文档

张量并行

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

张量并行

张量并行 将模型层切片,以便多个硬件加速器同时对其进行操作。这使您可以运行超出单个 GPU 内存容量的模型并获得更高的吞吐量。您需要快速的节点内通信,因为 GPU 在每一层都会交换部分结果。

下表列出了支持原生张量并行性的模型。请在 GitHub 上提交 issue 或 pull request 来为模型添加支持。

显示支持的模型

本指南介绍了如何在 Transformers 中启用张量并行性以及可用的分区策略。

对模型进行分区

当模型具有 tp_plan 时,Transformers 会启用张量并行。可从两种分区方法中选择。

  • tp_plan="auto" 设置为基于模型的预定义配置的自动计划。
  • 定义并传递手动 tp_plan
自动计划
手动计划
import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# model_id = "meta-llama/Llama-4-Scout-17B-16E-Instruct" # better to visualize all the possible strategies
model = AutoModelForCausalLM.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct" , dtype=torch.bfloat16, tp_plan="auto")
print(model._tp_plan)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct")
prompt = "Can I help"
inputs = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)

# distributed run
outputs = model(inputs)

使用 torchrun 启动推理脚本。每个 GPU 使用 4 个进程。

torchrun --nproc-per-node 4 demo.py

分区策略

ParallelInterface 类定义了所有分区策略。它将字符串映射到策略实现。您无需直接与此类交互,因为您在 from_pretrained() 中使用 tp_plan 来设置策略。检查可用策略很有用。

class ParallelInterface(MutableMapping):
    """
    Dict-like object keeping track of allowed attention functions. You can easily add a new attention function
    with a call to `register()`. If a model needs to locally overwrite an existing attention function, say `sdpa`,
    it needs to declare a new instance of this class inside the `modeling_<model>.py`, and declare it on that instance.
    """
    _global_mapping = {
        "colwise": ColwiseParallel(),
        "rowwise": RowwiseParallel(),
        "colwise_rep": ColwiseParallel(output_layouts=Replicate()),
        "rowwise_rep": RowwiseParallel(input_layouts=Replicate()),
        "local_colwise": ColwiseParallel(use_dtensor=False),
        "local_rowwise": RowwiseParallel(use_dtensor=False),
        "local": IsolatedParallel(),
        "moe_tp_experts": MoeTensorParalellExperts(),
        "local_packed_rowwise": PackedRowwiseParallel(use_dtensor=False),
        "sequence_parallel": SequenceParallel(),
        "replicate": ReplicateParallel(),
    }

下表描述了每种策略。

策略 描述
ColwiseParallel 按列分割权重和偏置。
RowwiseParallel 按行分割权重和偏置。支持 nn.Embedding 模块的分区。
SequenceParallel 序列并行实现,支持 LayerNormDropout 层。支持 Python 实现的 RMSNorm
PackedColwiseParallel ColwiseParallel 的一个变体,支持打包权重(例如,将 up_projgate_proj 打包在一起)。有关更多详细信息,请参阅 代码
PackedRowwiseParallel RowwiseParallel 的一个变体,支持打包权重(有关更多详细信息,请参阅 代码)。
GatherParallel 跨设备收集模块输出。
IsolatedParallel 将模块隔离在其他设备之外。用于混合专家 (MoE) 层中的专家。
ReplicateParallel 将模块复制到所有设备上。防止部分分片模型导致 torch.distributed API 出现错误。

打包策略

权重打包将多个线性层合并为一个更大的层。PackedColwiseParallelPackedRowwiseParallel 策略会正确地分片打包权重。基本的 ColwiseParallelRowwiseParallel 策略会错误地分片打包权重。

下面的示例将 up_projgate_proj 打包到一个 gate_up_proj 模块中,并需要 PackedRowwiseParallel 策略来分片 gate_up_proj

class Llama4TextExperts(nn.Module):
    ...
    self.gate_up_proj = nn.Parameter(torch.zeros(self.num_experts, self.hidden_size, 2 * self.expert_dim))

forward 传递中使用批矩阵乘法来计算 gate_up_proj 模块的输出。

def forward(self, hidden_states):
    ...
    gate_up = torch.bmm(hidden_states, self.gate_up_proj) # Compute the output of the gate_up_proj module
    gate, up = gate_up.chunk(2, dim=-1) # Split the output into gate and up

有关 Packed* 为什么需要使用的视觉表示,请参阅 此注释

本地策略

本地策略 (local_colwiselocal_rowwiselocal_packed_rowwise) 不使用 DTensor,因为它缺乏对某些操作(如 torch.chunk)的支持。相反,本地策略使用基本的 torch.Tensor 并手动执行分布式逻辑。

自定义分区策略

继承自 TensorParallelLayer 来创建自定义分区策略。实现 partition_tensor_prepare_input_fn_prepare_output_fn

ParallelInterface 映射中注册策略,以便在 tp_plan 中指定时,分发逻辑能够找到它。

下面的示例展示了如何通过此工作流程实现 ColwiseParallel

  1. 继承自 TensorParallelLayer。在 __init__ 方法中,定义 input_layoutsoutput_layouts 来描述输入和输出张量如何在设备上放置。desired_input_layouts 属性用于指定输入应 *如何* 放置在设备上。

    class ColwiseParallel(TensorParallelLayer):
        def __init__(
            self,
            *,
            input_layouts: Optional[Placement] = None, # The input layout coming from the previous layer
            output_layouts: Optional[Placement] = None, # The output layout we want to achieve
            use_local_output: bool = True, # Whether to use local output or not
            use_dtensor=True, # Whether to use DTensor or not
        ):
            self.input_layouts = (input_layouts or Replicate(),) # The input sharding coming from the previous layer
            self.output_layouts = (output_layouts or Shard(-1),) # Desired output sharding
            self.desired_input_layouts = (Replicate(),) # Desired input sharding, inputs should be replicated across GPUs
            self.use_local_output = use_local_output
            self.use_dtensor = use_dtensor
  2. 实现 partition_tensor_prepare_input_fn_prepare_output_fn 方法。

    partition_tensor 方法对张量进行分区,并用分区后的张量填充 empty_param。使用实用函数 get_tensor_shard 来帮助您获取给定 rank 的原始参数的正确分片,并使用 get_packed_weights 来处理打包权重。

    def partition_tensor(
        self,
        param, # Full tensor of the parameter
        empty_param, # Empty tensor of the parameter, will be filled with the partitioned tensor
        param_type, # Type of the parameter, `bias` or `weight`
        param_casting_dtype, # The type to cast the parameter to
        to_contiguous, # Whether to convert the tensor to a contiguous memory layout
        rank, # The rank of the current device
        device_mesh, # The device mesh
    ) -> nn.Parameter: # Return the partitioned parameter
        ...

    _prepare_input_fn_prepare_output_fn 方法用于 预前向后向 钩子。它们会根据 __init__ 中指定的期望布局重新分发输入和输出。

    def _prepare_input_fn(input_layouts, desired_input_layouts, mod, inputs, device_mesh):
        ...
        # Do some custom logic, cast to DTensor etc.
        ...
        return inputs.redistribute(placements=desired_input_layouts, device_mesh=device_mesh)
    def _prepare_output_fn(output_layouts, use_local_output, mod, outputs, device_mesh):
        ...
        # Do some custom logic, cast to DTensor etc.
        ...
        return outputs.redistribute(placements=output_layouts, device_mesh=device_mesh)
  3. 将策略注册到 ParallelInterface 以便在 tp_plan 中使用它。

    from transformers.integrations.tensor_parallel import ParallelInterface
    
    ParallelInterface.register_strategy("colwise_custom", ColwiseParallel)
    tp_plan = {
        "model.layers.*.self_attn.q_proj": "colwise_custom",
        ...
    }
    model = AutoModelForCausalLM.from_pretrained(model_id, dtype=torch.bfloat16, tp_plan=tp_plan)

基准测试

张量并行性显著加快了推理速度,尤其是在处理大型批量或长序列时。

此图表显示了在 Llama 上进行一次前向传递(序列长度为 512)的预期加速效果。

设计实现

Transformers 以一种框架无关的方式实现张量并行。它依赖于 DeviceMeshDTensor(来自 torch.distributed),提供了一个简单且可扩展的接口。

DeviceMesh

DeviceMesh 创建一个通信的设备多维网格。不同的并行化策略需要不同的通信模式。创建带有多个子网格的 DeviceMesh 来处理这些模式。

from torch.distributed.device_mesh import init_device_mesh

# Create a 1D mesh of 4 GPUs
device_mesh = init_device_mesh("cuda", (4,), mesh_dim_names=["tp"])

大多数 torch.distributed 并行化策略都适用于网格本身或其子网格。网格会自动处理通信模式。

DTensor

DTensor (分布式张量) 在常规张量操作之上处理分布式逻辑。张量并行中的大多数模型权重都存储为 DTensor

placement 属性告诉 PyTorch 如何在 DeviceMesh 中的设备上放置张量。它接受以下值:

  • Shard(dimension) 会在 DeviceMesh 上沿着给定的维度分片 DTensor。下面的示例展示了如何对列式分区的权重进行跨不同维度的分片。

    weight = ...
    weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(0)]) # Shard across the 1st (column-wise) dimension
    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Shard(-1)]) # Shard across the ONLY dimension

    此示例展示了如何对行式分区的权重进行跨不同维度的分片。

    weight = ...
    weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(1)]) # Shard across the 2nd (row-wise) dimension
    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUs
  • Replicate() 会在整个 DeviceMesh 上复制 DTensor。它会在每个设备上创建张量的完整副本。

    bias = ...
    bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUs
  • Partial() 表示一个张量正在等待规约操作(在 Transformers 中通常不适用)。

资源

在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.