以混合分布式方式微调 Falcon 7b

社区文章 发布于 2024年12月31日

(人工智能难道不是会吞噬我们大脑的僵尸吗!!)

image/webp

大型语言模型 (LLMs) 的微调

微调大型语言模型(LLMs)是使预训练模型适应特定任务或领域的关键一步。预训练的 LLMs,如 Falcon 3,在海量数据集上进行训练,并具备关于语言和上下文的通用知识。然而,微调使这些模型能够在摘要、情感分析或领域特定问答等特定任务中表现出色。

微调过程涉及使用较小的、任务特定的数据集更新模型权重,同时利用预训练知识。与从头开始训练模型相比,微调通常需要较少的计算资源,并专注于提高任务性能,同时最大限度地降低过拟合的风险。常用的技术包括监督微调、从人类反馈中进行强化学习(RLHF)或指令微调。

微调大型模型面临内存和计算需求高昂等挑战,因此分布式训练技术对于优化效率至关重要。

分布式模型微调

随着 LLM 规模的增长,由于内存限制,在单台机器上微调它们变得不可行。分布式模型微调通过将计算分散到多个设备(例如 GPU 或 TPU)来解决这些挑战。这种方法不仅减少了每个设备的内存负担,还通过并行操作加速了训练。

分布式微调的关键方法

  • 数据并行:每个设备处理数据的不同子集,同时保持整个模型的副本。在每次前向和反向传播后,在每个设备上计算的梯度都会同步并平均。
  • 模型并行:模型本身被拆分到多个设备上。例如,不同的层或单个层的不同部分可以位于不同的 GPU 上,从而实现超大型模型的训练。
  • 流水线并行:通过将模型划分为多个段并通过每个段依次处理小批量,结合了模型并行和数据并行的方面。
  • 零冗余优化器 (ZeRO):DeepSpeed 的 ZeRO 是一种流行的分布式微调优化框架,它将优化器状态、梯度和模型权重分配到不同设备上,显著降低了内存需求。

这些方法各有优缺点,选择取决于模型大小、硬件可用性和期望的效率。

分布式张量微调

分布式张量微调是一种先进的方法,旨在优化 LLM 微调时的内存和计算效率。这种方法不是将模型视为单一实体,而是将张量(模型参数)本身在设备之间进行分区和操作。

分布式张量微调的核心概念

  • 分片张量管理:张量(如权重和梯度)被分成较小的分片,并分布到各个设备上。这最大限度地减少了单个设备上的内存使用。
  • 张量并行:大型张量上的操作(如矩阵乘法)被拆分并在多个 GPU 上执行。例如,在多头注意力机制中,不同注意力头的计算可以并行分布。
  • 内存卸载:像 DeepSpeed ZeRO stage 3 这样的技术将优化器状态和激活卸载到 CPU 内存或 NVMe 存储,从而可以微调远大于 GPU 总内存的模型。
  • 带分区的梯度累积:梯度以分区方式计算,并以内存高效的方式累积,然后更新模型参数。

分布式张量微调在处理超大型模型(例如 7B、13B 或 175B 参数模型)时特别有用。通过实现细粒度分区和执行,它使大规模模型微调在不牺牲性能的情况下变得实用。

这种方法还与混合精度训练(例如 FP16 或 BF16)和稀疏技术高度兼容,进一步提高了 NVIDIA A100 GPU 等现代硬件的效率。

废话少说,让我们开始行动,大展身手吧!!!

DeepSpeed 混合并行

结合张量和流水线并行 DeepSpeed 支持混合并行,它结合了张量并行和流水线并行。这对于超大型模型尤其有用。

  • 张量并行将单层内的计算拆分到多个 GPU 上。
  • 流水线并行将模型跨层拆分为多个阶段。

通过结合这些方法,DeepSpeed 可以训练比单个 GPU 能处理的模型大几个数量级的模型。

假设您的模型有 24 层(Transformer 块),并且每层都非常大(单个 GPU 无法处理)。混合并行用于

  • 拆分每层内的操作(张量并行)。
  • 将模型分配到多个 GPU(流水线并行)。如果我们假设有 4 个 GPU,流水线并行:将模型分成 2 个阶段(每个阶段跨越 12 层)。
  • 阶段 1:层 1-12(在 GPU 0 和 1 上)。
  • 阶段 2:层 13-24(在 GPU 2 和 3 上)。

张量并行:在每个阶段内,层被拆分到 2 个 GPU 上。

  • 阶段 1:层 1-12 在 GPU 0 和 1 上进行张量并行。
  • 阶段 2:层 13-24 在 GPU 2 和 3 上进行张量并行。

安装依赖项并准备数据集

在本教程中,我们将使用 DeepSpeed 框架以分布式张量的方式训练/微调模型,我们将使用 四块 A100 GPU,每块 80GB 内存

依赖项

训练模型和准备数据集需要以下依赖项

pip install transformers datasets pandas accelerate tqdm torch deepspeed tqdm wandb dataclasses

其中一些依赖项将用于处理数据集,而另一些则用于微调/训练目的。

数据集和参数准备

在本例中,我将使用我共享的一个包含 40 万个 Ghidra 反编译代码样本的数据集。在本教程中,我们将微调 Falcon3 模型,以从反编译实例中重新创建原始程序。该数据集包含原始源代码程序及其使用 Ghidra(NSA 开发的强大逆向工程工具)获得的反编译函数对。它旨在促进程序分析、逆向工程、反编译改进以及源代码和二进制转换的机器学习应用领域的研究。

让我们从导入一些依赖项开始

import copy
import random
from dataclasses import dataclass, field
from typing import Optional, Dict, Sequence

import torch
import torch.distributed
import transformers
from transformers import Trainer
from datasets import load_dataset
import wandb
import deepspeed

首先,让我们通过定义一些数据类来简化我们的工作,这些数据类用于获取数据集、训练器和模型所需的参数。

@dataclass
class ModelArguments:
    model_name_or_path: Optional[str] = field(
        default="tiiuae/Falcon3-7B-Base"
    )
    use_flash_attention: bool = field(
        default=False, metadata={"help": "Whether to use flash attention."}
    )


@dataclass
class DataArguments:
    data_path: str = field(
        default=None, metadata={"help": "Path to the training data."}
    )


@dataclass
class TrainingArguments(transformers.TrainingArguments):
    cache_dir: Optional[str] = field(default=None)
    optim: str = field(default="adamw_torch")
    model_max_length: int = field(
        default=512,
        metadata={
            "help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
        },
    )

可以使用以下代码片段加载数据集(但暂时忽略此步骤,因为我们将构建更灵活的数据集加载方法)

data_path = Neo111x/decompile_LLM
raw_train_datasets = load_dataset(data_path, split="train")

数据集如下所示

指令 输出
void ioabs_tcp_pre_select(int *param_1,int *param_2,long param_3) { *param_1 = *param_2; *param_2 = *param_2 + 1; *(int *)((long)*param_1 * 8 + param_3 + 4) = param_1[4]; *(uint *)(param_3 + (long)*param_1 * 8) = *(uint *)(param_3 + (long)*param_1 * 8) 1; if (((**(int **)(param_1 + 2) + *(int )((long *)(param_1 + 2) + 4)) - *(int )((long *)(param_1 + 2) + 8)) % *(int )((long *)(param_1 + 2) + 4) != 0) { *(uint *)(param_3 + (long)*param_1 * 8) = *(uint *)(param_3 + (long)*param_1 * 8)
ulong ioabs_tcp_pre_select(int *param_1,int *param_2,long param_3) { uint *puVar1; int iVar2; int *piVar3; int iVar4; ulong uVar5; iVar2 = *param_2; *param_1 = iVar2; *param_2 = iVar2 + 1; *(int *)(param_3 + 4 + (long)*param_1 * 8) = param_1[4]; puVar1 = (uint *)(param_3 + (long)*param_1 * 8); *puVar1 = *puVar1 1; piVar3 = *(int **)(param_1 + 2); iVar2 = piVar3[1]; iVar4 = (iVar2 + *piVar3) - piVar3[2]; uVar5 = (long)iVar4 / (long)iVar2 & 0xffffffff; if (iVar4 % iVar2 != 0) { uVar5 = (ulong)*param_1; puVar1 = (uint *)(param_3 + uVar5 * 8); *puVar1 = *puVar1
......................... ................................

其中指令被认为是模型的输入,输出是模型的预期输出。接下来我们需要根据数据集提供的指令构建提示,我们可以为此目的定义以下函数

def build_instruction_prompt(instruction: str):
    return """# This is the decompiled code:
{}
# What is the source code?
""".format(
        instruction
    )

接下来我们将定义我们的标记化函数,该函数应该接受指令并为模型输入进行标记化。

def _tokenize_fn(
    strings: Sequence[str], tokenizer: transformers.PreTrainedTokenizer
) -> Dict:
    """Tokenize a list of strings."""
    tokenized_list = [
        tokenizer(
            text,
            return_tensors="pt",
            padding="longest",
            max_length=tokenizer.model_max_length,
            truncation=True,
        )
        for text in strings
    ]

    input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
    input_ids_lens = labels_lens = [
        tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item()
        for tokenized in tokenized_list
    ]

    return dict(
        input_ids=input_ids,
        labels=labels,
        input_ids_lens=input_ids_lens,
        labels_lens=labels_lens,
    )

让我们更花哨一点,定义一个预处理函数,它使用标记化函数来预处理我们的数据

IGNORE_INDEX = -100
def preprocess(
    sources: Sequence[str],
    targets: Sequence[str],
    tokenizer: transformers.PreTrainedTokenizer,
) -> Dict:
    """Preprocess the data by tokenizing."""
    examples = [s + t for s, t in zip(sources, targets)]
    examples_tokenized, sources_tokenized = [
        _tokenize_fn(strings, tokenizer) for strings in (examples, sources)
    ]
    input_ids = examples_tokenized["input_ids"]

    labels = copy.deepcopy(input_ids)
    for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
        label[:source_len] = IGNORE_INDEX
    return dict(input_ids=input_ids, labels=labels)

现在在下一步中,我们将更加精细,定义一个数据收集器类,该类可以传递给我们的训练器对象,并使数据预处理和标记化的所有过程变得更容易。

@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning."""

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        input_ids, labels = tuple(
            [instance[key] for instance in instances] for key in ("input_ids", "labels")
        )
        input_ids = [torch.tensor(x) for x in input_ids]
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
        )
        labels = [torch.tensor(x) for x in labels]
        labels = torch.nn.utils.rnn.pad_sequence(
            labels, batch_first=True, padding_value=IGNORE_INDEX
        )

        return dict(
            input_ids=input_ids,
            labels=labels,
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )

现在为了让事情更易于管理,我们将定义一个标记化函数来封装构建提示和标记化过程。该函数将调用构建提示指令来构建适当的提示,然后对函数调用结果进行标记化。

def train_tokenize_function(examples, tokenizer):
    sources = [
        build_instruction_prompt(instruction) for instruction in examples["instruction"]
    ]
    eos_token = tokenizer.eos_token
    targets = [f"{output}\n{eos_token}" for output in examples["output"]]
    data_dict = preprocess(sources, targets, tokenizer)
    return data_dict

到目前为止,我们已经完成了数据准备和预处理定义,我们定义了所有必需的函数来构建我们的训练数据集并将其格式化为适当的模型输入。

准备模型进行训练

至此,我们已经完成了令牌化、预处理和准备数据集所需的所有步骤。现在我们可以专注于实现训练/微调模型所需的代码。在某些时候,我们定义了一些数据类来获取一些参数,因为在使用张量并行模式和管道训练模型时,我们需要对批量大小进行一些实验,以使大型模型和大型数据集适应我们的硬件。通常在使用 DeepSpeed 时,最好将这些参数作为参数传递给我们的训练脚本,我们可以使用以下代码来解析不同的参数并将它们分配给适当的参数类(数据、训练器、模型)

def train():
    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments)
    )
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

您可以在 Hugging Face 文档中找到更多关于 HfArgumentParser 的信息:文档

让我们继续我们的训练函数定义,加载模型参数中定义的模型分词器。

tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=True,
        trust_remote_code=True,
    )

    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.pad_token_id = tokenizer.eos_token_id

    print("PAD Token:", tokenizer.pad_token, tokenizer.pad_token_id)
    print("BOS Token", tokenizer.bos_token, tokenizer.bos_token_id)
    print("EOS Token", tokenizer.eos_token, tokenizer.eos_token_id)

现在让我们加载我们的模型并激活 Flash Attention(如果它与我们的模型兼容)

model_kwargs = {}
    if model_args.use_flash_attention:
        model_kwargs["attn_implementation"] = "flash_attention_2"

    model = transformers.AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path, torch_dtype=torch.bfloat16, trust_remote_code=True, **model_kwargs
    )

这些参数将从 DeepSpeed 调用脚本时用户定义的参数中定义的类中解析。您可以在这里找到更多关于 Flash Attention 的信息。

现在让我们使用本文中定义的预处理函数并加载我们的数据集

raw_train_datasets = load_dataset(data_args.data_path, split="train")
        #"json",
        #data_files=data_args.data_path,
        #split="train",
        #cache_dir=training_args.cache_dir,
    #)
    if training_args.local_rank > 0:
        torch.distributed.barrier()

    train_dataset = raw_train_datasets.map(
        train_tokenize_function,
        batched=True,
        batch_size=64,
        num_proc=8,
        remove_columns=raw_train_datasets.column_names,
        load_from_cache_file=True,  # not args.overwrite_cache
        desc="Running Encoding",
        fn_kwargs={"tokenizer": tokenizer},
    )

    if training_args.local_rank == 0:
        torch.distributed.barrier()

    if training_args.local_rank == 0:
        print("Training dataset samples:", len(train_dataset))
        for index in random.sample(range(len(train_dataset)), 3):
            print(
                f"Sample {index} of the training set: {train_dataset[index]['input_ids']}, {train_dataset[index]['labels']}."
            )
            print(
                f"Sample {index} of the training set: {tokenizer.decode(list(train_dataset[index]['input_ids']))}."
            )

    data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
    data_module = dict(
        train_dataset=train_dataset, eval_dataset=None, data_collator=data_collator
    )

在这里,我们使用 local_rank 参数,这些参数在使用 DeepSpeed 时会自动传递给脚本,以实现数据在 GPU 中的分发。我们还使用一些打印来帮助理解正在发生的事情。

在这一步,我们准备好使用为数据集和模型定义的所有对象和函数来实例化我们的训练器实例

  trainer = Trainer(
      model=model, tokenizer=tokenizer, args=training_args, **data_module
  )
  
  trainer.train()
  trainer.save_model()
  trainer.save_state()

坚持住,我们快到了!我们很快就可以开始训练了!

DeepSpeed 配置

现在最后一步是定义我们的 DeepSpeed 配置。让我们首先定义如何确定我们的批量大小

全局批量大小 = 每设备批量大小 × GPU 数量 × 梯度累积步数

因此,假设我们有 4 个 GPU,每个设备 4 个数据批次,梯度累积步数为 4,那么批量大小将是 64。

我们还将在 DeepSpeed 中使用 ZeRO 优化框架,该框架旨在减少内存消耗,使在内存有限的 GPU 上训练大型模型成为可能。它通过在设备之间分区和卸载模型和优化器的某些组件来实现这一点。

我们还将使用 ZeRO 优化的 Stage 2,它具有以下特点:

  • 分区优化器状态:优化器状态(例如,Adam 的动量、方差)在 GPU 之间分割。
  • 分区梯度:每个 GPU 只保留一部分梯度,而不是存储完整的梯度。

通常,模型参数(权重)在训练期间驻留在 GPU 内存中,但卸载可减少 GPU 内存需求。在配置中,我们指定可以使用 CPU 内存进行卸载。

我们还将启用将优化器状态(例如,Adam 中的动量和方差)卸载到 CPU。这些状态通常很大,会消耗大量内存。将它们卸载到 CPU 将有助于避免这种大型模型出现内存不足的情况。启用 CPU 上的固定(页面锁定)内存,可以实现 CPU 和 GPU 之间更快、更高效的数据传输。

剩下的不言而喻,我们通过流水线将其并行到 4 个阶段来启用模型并行,并通过与可用 GPU 数量相等的世界大小来启用张量并行。

{
  "train_batch_size": 64,
  "gradient_accumulation_steps": 4,
  "zero_optimization": {
    "stage": 2,
    "offload_param": {
      "device": "cpu",
      "pin_memory": true
    },
    "offload_optimizer": {
      "device": "cpu",
      "pin_memory": true
    }
  },
  "bf16": {
    "enabled": true,
    "loss_scale": 0,
    "initial_scale_power": 32
  },
  "deepspeed_transformer_kernel": true,
  "model_parallel": {
    "enabled": true,
    "pipeline": {
      "enabled": true,
      "num_stages": 4
    },
    "tensor_parallel": {
      "enabled": true,
      "world_size": 4
    }
  }
}

我们必须将此配置保存为 JSON 文件,我们将其命名为 ds.json。更多信息可以在 DeepSpeed 文档这里找到。现在我们准备好训练模型了,但让我们做得更花哨一些,定义一个 bash 脚本来传递脚本的参数

WORKSPACE="/data"
DATA_PATH="Neo111x/decompile_LLM"
OUTPUT_PATH="${WORKSPACE}/output_models/falCodecompile-large-7b"
MODEL_PATH="tiiuae/Falcon3-7B-Base"

deepspeed --num_gpus=4 finetune.py \
    --model_name_or_path $MODEL_PATH \
    --data_path $DATA_PATH \
    --output_dir $OUTPUT_PATH \
    --num_train_epochs 2 \
    --model_max_length 1024 \
    --per_device_train_batch_size 4 \
    --gradient_accumulation_steps 4 \
    --evaluation_strategy "no" \
    --save_strategy "steps" \
    --use_flash_attention \
    --save_steps 500 \
    --save_total_limit 100 \
    --learning_rate 2e-5 \
    --max_grad_norm 1.0 \
    --weight_decay 0.1 \
    --warmup_ratio 0.025 \
    --logging_steps 1 \
    --lr_scheduler_type "cosine" \
    --gradient_checkpointing True \
    --report_to "wandb" \
    --bf16 True \
    --deepspeed "ds.json"
#    --master_port 29600 

这里的所有参数都是不言自明的,你可以根据自己的目的进行调整。

完整脚本可在此处找到

社区

注册登录 以评论