张量并行
深度学习,尤其是语言模型,正变得越来越庞大,性能与规模之间的关系已在 kaplan2020scalinglawsneurallanguage 中解释,
计算量、数据量、参数量越多,困惑度方面的性能就越好。
当 GPT-3 发布时,其拥有 175B 参数,它改变了世界。根据论文 brown2020languagemodelsfewshotlearners,基本上,如果参数量足够大且数据集量适当,预训练语言模型就能完成任何 NLP 任务,只要你提供少量示例,或者技术术语叫少样本学习,无需进行训练会话(2024 年的训练会话有多个阶段,如预训练、持续预训练、预微调、中微调、后微调)。
现在 175B 很大,这篇论文发布于 2020 年,即使到现在,175B 仍然被认为是超大的。GPT-3 在 V100 上进行训练,论文第 2.3 节有提及,
V100 最适合单精度(32 位),假设模型保存为 float32(4 字节),175B * 4 字节 ≈ 652 GB!
对于 V100,最大的 GPU 内存是 32GB,652GB / 32 = 21 个 GPU!所以,仅仅为了将模型存储在内存中,你需要至少 21 个 V100 32GB 显存的单元,这还不包括前向传播!
那么 OpenAI 是如何将模型加载到多个 GPU 中的呢?**张量并行!**
如你所见,模型无法放入单个 GPU,因此我们必须对模型进行分片。深度学习中有两种分片方法:1. 张量并行,2. 流水线并行。
假设我有一个包含 2 个隐藏层(4x4 和 4x2)和 2 个 GPU 的模型,
张量并行将隐藏层分片到多个 GPU 中,但所有 GPU 都获得所有隐藏层。而流水线并行则将隐藏层分割到多个 GPU 中。每种方法都有其优缺点,但在这篇博客中,我们将重点关注使用 PyTorch 的张量并行。张量并行本身可以分为两种不同的方法:1. 行向并行和 2. 列向并行。
行向并行是指我们按行方式分片隐藏层,而列向并行则是按列方式分片隐藏层。
行向并行
使用与上述相同的隐藏层大小,
i. 对于第一个隐藏层,我们将 4x4 分成两行,每个 GPU 存储权重,GPU 0 存储 2x4,GPU 1 存储 2x4。
ii. 对于第二个隐藏层,我们将 4x2 分成两行,每个 GPU 存储权重,GPU 0 存储 2x2,GPU 1 存储 2x2。
iii. 输入为 1x4 -> 分成两列并分散到 GPU,1x2 到 GPU 0,1x2 到 GPU 1,每个 GPU 将执行矩阵乘法,GPU 0 1x2 乘以 2x4 = 1x4,GPU 1 1x2 乘以 2x4 = 1x4,之后进行聚合求和。在矩阵乘法坐标方面,
- iv. 第一个隐藏层的输出现在变为输入,1x4 -> 分成两列并分散到 GPU,1x2 到 GPU 0,1x2 到 GPU 1,每个 GPU 将执行矩阵乘法,GPU 0 1x2 乘以 2x2 = 1x2,GPU 1 1x2 乘以 2x2 = 1x2,之后进行聚合求和。在矩阵乘法坐标方面,
列向并行
使用与行向并行相同的隐藏层大小,
i. 对于第一个隐藏层,我们将 4x4 分成两列,每个 GPU 存储权重,GPU 0 存储 4x2,GPU 1 存储 4x2。
ii. 对于第二个隐藏层,我们将 4x2 分成两列,每个 GPU 存储权重,GPU 0 存储 4x1,GPU 1 存储 4x1。
iii. 输入为 1x4 -> 复制到与 GPU 数量相同,并分散到 GPU,1x4 到 GPU 0,1x4 到 GPU 1,每个 GPU 将执行矩阵乘法,GPU 0 1x4 乘以 4x2 = 1x2,GPU 1 1x4 乘以 4x2 = 1x2,之后进行聚合连接。在矩阵乘法坐标方面,
- iv. 第一个隐藏层的输出现在变为输入,1x4 -> 复制到与 GPU 数量相同,并分散到 GPU,1x4 到 GPU 0,1x4 到 GPU 1,每个 GPU 将执行矩阵乘法,GPU 0 1x4 乘以 4x1 = 1x2,GPU 1 1x4 乘以 4x1 = 1x1,之后进行聚合连接。在矩阵乘法坐标方面,
因为你将权重分片到 N 个设备上,所以每个设备的内存也减少了 N 倍!设备越多,可以容纳的模型就越大。
那么现在,让我们用 PyTorch 实现张量并行行向并行!
为什么是行向?因为它看起来更难,难的才好。
如上所述,要实现张量并行,必须使用多 GPU,而多 GPU 需要特定的分布式通信。幸运的是,在 PyTorch 中,有一个原生接口用于分布式通信,称为 Torch Distributed Elastic。
Torch Distributed Elastic 所做的是,每个 GPU 获得自己的进程,
- 假设我有 2 个 GPU,Torch Distributed Elastic 将派生 2 个进程,PID 0 用于 GPU 0,PID 1 用于 GPU 1。
- 这些进程如何相互通信?通过开放端口进行进程间通信。但对于深度学习模型的数据传输,如果你使用的是 Nvidia,默认情况下会使用 NCCL(发音为 nickel) 来进行梯度和权重同步。
- 在深度学习框架或 PyTorch 中谈论分布式系统时,有 3 个重要术语:`RANK`、`WORLD_SIZE` 和 `LOCAL_WORLD_SIZE`。`RANK` 是 GPU 的秩,`WORLD_SIZE` 是你初始化的 GPU 总数,`LOCAL_WORLD_SIZE` 是如果你使用多节点时每个节点的 GPU 总数。但如果你使用单个节点,`WORLD_SIZE` 和 `LOCAL_WORLD_SIZE` 是相同的。
- GPU 0 是 `RANK` 0,GPU 1 是 `RANK` 1,`WORLD_SIZE` 是 2。`RANK` 和 `WORLD_SIZE` 都可以通过操作系统环境变量获取,这些变量由 Torch Distributed Elastic 自动设置。
- 假设 `RANK` 0 开放端口 29950,`RANK` 1 开放端口 29951,
NCCL 使用其自己的通信方式,称为 CUDA IPC(进程间通信),这是一种设备到设备的点对点通信。并非所有 GPU 都支持 P2P,因此如果不支持,NCCL 将使用替代方案,例如位于 `/dev/shm` 的共享内存。
为什么多进程(即 Torch Distributed Elastic)和 GPU(即 NCCL)需要不同的通信方式?套接字或开放端口用于检查心跳并在多进程之间进行简单的字符串通信,而 NCCL 仅为 Nvidia 对等多 GPU 通信设计。
简单分散
在进行张量并行之前,我们先尝试简单的分散和收集,以便熟悉 PyTorch Distributed Elastic,
import torch
import torch.nn as nn
import torch.distributed as dist
import os
def main():
world_size = torch.cuda.device_count()
local_rank = int(os.environ["LOCAL_RANK"])
device = f'cuda:{local_rank}'
dist.init_process_group(backend='nccl')
tensor_size = 2
output_tensor = torch.zeros(tensor_size, device=device)
if dist.get_rank() == 0:
t_ones = torch.ones(tensor_size, device=device)
t_fives = torch.ones(tensor_size, device=device) * 5
scatter_list = [t_ones, t_fives]
else:
scatter_list = None
dist.scatter(output_tensor, scatter_list, src=0)
print(f'local rank: {local_rank}', output_tensor)
output_tensor += 1
if dist.get_rank() == 0:
t_ones1 = torch.ones(tensor_size, device=device)
t_ones2 = torch.ones(tensor_size, device=device)
scatter_list = [t_ones1, t_ones2]
else:
scatter_list = None
dist.gather(output_tensor, scatter_list, dst=0)
if dist.get_rank() == 0:
print(scatter_list)
if __name__ == "__main__":
main()
将其保存为 `simple-scatter-gather.py`,此示例最初来自 https://pytorch.ac.cn/docs/stable/distributed.html#torch.distributed.scatter,我们只是使其完整。此示例需要两个 GPU,并使用 `torchrun` 执行:
torchrun \
--nproc-per-node=2 \
simple-scatter-gather.py
有关此 CLI 定义的更多信息,请参阅 https://pytorch.ac.cn/docs/stable/elastic/run.html#stacked-single-node-multi-worker
torchrun \
--nproc-per-node=$NUM_TRAINERS \
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
--nproc-per-node
是你想要运行的 GPU 数量,如果设置为--nproc-per-node=2
,它将生成 2 个进程,每个进程拥有自己的 GPU。YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
是你的 Python 脚本及其参数。
输出,
local rank: 0 tensor([1., 1.], device='cuda:0')
local rank: 1 tensor([5., 5.], device='cuda:1')
[tensor([2., 2.], device='cuda:0'), tensor([6., 6.], device='cuda:0')]
dist.scatter
用于将张量列表分散到 N 个 GPU 中,列表的长度必须与 N 个 GPU 的数量相同。必须为每个 GPU 初始化一个输出张量,`output_tensor = torch.zeros(tensor_size, device=device)`。因此,这个输出张量是一个临时张量,它将在 `dist.scatter` 期间被替换。
if dist.get_rank() == 0:
如果RANK
是 0,我们将其作为列表,否则作为 None。之后,我们将所有 GPU 的值加一,如果 `RANK` 为 0,我们创建 2 个临时张量,分别用于 GPU 0 和 GPU 1。
我们在 `RANK` 为 0 的节点上收集并打印结果。如你所见,我们得到 [2, 2](来自 GPU 0)和 [6, 6](来自 GPU 1)。
数据移动如下:
现在让我们看看张量并行线性层,
import torch
import torch.nn as nn
import torch.distributed as dist
import os
class Linear(nn.Module):
def __init__(self, in_features, out_features):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.rank = dist.get_rank()
self.world_size = dist.get_world_size()
self.device = f'cuda:{self.rank}'
self.local_in_features = in_features // self.world_size
self.local_out_features = out_features
self.linear = nn.Linear(self.local_in_features, self.local_out_features)
def forward(self, x, batch_size):
local_input = torch.zeros(batch_size, self.local_in_features, device=self.device)
dist.scatter(local_input, list(x.chunk(self.world_size, dim=1)) if self.rank == 0 else None, src=0)
local_output = self.linear(local_input)
dist.reduce(local_output, dst=0, op=dist.ReduceOp.SUM)
return local_output
def main():
world_size = torch.cuda.device_count()
local_rank = int(os.environ["LOCAL_RANK"])
device = f'cuda:{local_rank}'
dist.init_process_group(backend='nccl')
model = Linear(100, 50).to(device)
batch_size = 32
if dist.get_rank() == 0:
input_tensor = torch.randn(batch_size, 100, device=device)
else:
input_tensor = None
output = model(input_tensor, batch_size)
if dist.get_rank() == 0:
print(output, output.shape)
if __name__ == "__main__":
main()
将其保存为 `tp-linear.py` 并运行:
torchrun --nproc-per-node=2 tp-linear.py
输出,
tensor([[ 0.3327, 0.5701, 1.2123, ..., -0.2698, 0.1395, -0.3736],
[ 1.8301, 0.1318, 0.1468, ..., 2.5036, -1.4445, -0.4215],
[-0.2827, 1.5337, 0.7688, ..., 1.8233, -1.2817, 0.7063],
...,
[-1.0496, 0.3786, -0.7972, ..., -0.1917, -1.0284, 0.4730],
[-0.1051, 0.6323, 0.3016, ..., 1.1792, 0.7384, -0.1869],
[-1.3593, -0.8120, 0.9141, ..., -0.4090, 0.5709, -0.5926]],
device='cuda:0', grad_fn=) torch.Size([32, 50])
输出大小为 32x50,这是正确的,32x100 矩阵乘法 100x50 输出 32x50。
local_in_features = in_features // self.world_size
我们将行大小除以世界大小,即 2。之后我们初始化线性层 `nn.Linear(self.local_in_features, self.local_out_features)`,每个 GPU 将拥有 50x50 的矩阵。
如前所述,必须为每个 GPU 初始化一个输出张量,即 `local_input = torch.zeros(batch_size, self.local_in_features, device=self.device)`。
如果 `RANK` 为 0,则将输入分片并分散到 GPU,`dist.scatter(local_input, list(x.chunk(self.world_size, dim=1)) if self.rank == 0 else None, src=0)`。
计算每个 GPU 的矩阵乘法,`local_output = self.linear(local_input)`。
PyTorch 本身具有 reduce 函数,`dist.reduce(local_output, dst=0, op=dist.ReduceOp.SUM)`,因此我们希望通过 sum 操作对所有 GPU 上的变量 `local_output` 进行规约,并将最终结果放在 GPU 0 上。
数据移动如下:
生产API
大多数张量并行线性层实际上都是从 https://github.com/facebookresearch/fairscale 演变而来的,例如,如果您查看 vLLM 张量并行源代码,https://github.com/vllm-project/vllm/blob/main/vllm/model_executor/layers/linear.py,它来自,
- 列向,https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/model_parallel/layers.py#L218
- 行向,https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/model_parallel/layers.py#L299
如您所见,行向前向方法 https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/model_parallel/layers.py#L373,
def forward(self, input_: torch.Tensor) -> torch.Tensor: # type:ignore
# Set up backprop all-reduce.
if self.input_is_parallel:
input_parallel = input_
else:
input_parallel = scatter_to_model_parallel_region(input_)
# Matrix multiply.
output_parallel = F.linear(input_parallel, self.weight)
# All-reduce across all the partitions.
output_ = reduce_from_model_parallel_region(output_parallel)
if self.bias is not None:
output = output_ + self.bias
else:
output = output_
return output
- 将输入分散到 GPU,
input_parallel = scatter_to_model_parallel_region(input_)
- 并行矩阵乘法,
output_parallel = F.linear(input_parallel, self.weight)
- 通过组合进行约化,
output_ = reduce_from_model_parallel_region(output_parallel)
是的,就这些,没那么复杂。