🏎️ 使用 Hugging Face Kernel Hub 在 5 分钟内提升模型性能

发布于 2025 年 6 月 12 日
在 GitHub 上更新

使用预优化内核提升模型性能,可轻松从 Hub 加载。

今天,我们将探索 Hugging Face 的一项激动人心的新进展:**Kernel Hub**!作为机器学习从业者,我们知道要最大化性能,通常需要深入研究优化代码、自定义 CUDA 内核或复杂的构建系统。Kernel Hub 极大地简化了这一过程!

下面是如何在代码中使用内核的简短示例。

import torch

from kernels import get_kernel

# Download optimized kernels from the Hugging Face hub
activation = get_kernel("kernels-community/activation")

# Random tensor
x = torch.randn((10, 10), dtype=torch.float16, device="cuda")

# Run the kernel
y = torch.empty_like(x)
activation.gelu_fast(y, x)

print(y)

在接下来的章节中,我们将涵盖以下主题:

  1. **Kernel Hub 是什么?**——理解核心概念。
  2. **如何使用 Kernel Hub**——一个快速代码示例。
  3. **为简单模型添加 Kernel**——使用 RMSNorm 的实际集成。
  4. **审查性能影响**——对 RMSNorm 差异进行基准测试。
  5. **真实世界用例**——内核库在其他项目中的使用示例。

我们将快速介绍这些概念——核心思想可以在大约 5 分钟内掌握(尽管实验和基准测试可能需要更长时间!)。

1. Kernel Hub 是什么?

Kernel Hub(👈 查看它!)允许 Python 库和应用程序**直接从 Hugging Face Hub 加载优化的计算内核**。可以把它想象成模型中心,但是是用于加速特定操作(通常在 GPU 上)的低级、高性能代码片段(内核)。

示例包括高级注意力机制(如 FlashAttention,可显著提高速度并节省内存)。自定义量化内核(实现使用 INT8 或 INT4 等低精度数据类型的有效计算)。复杂架构(如 Mixture of Experts (MoE) 层)所需的专用内核,这些层涉及复杂的路由和计算模式。以及 激活函数归一化层(如 LayerNorm 或 RMSNorm)

您无需手动管理复杂的依赖项,无需处理编译标志,也无需从源代码构建 Triton 或 CUTLASS 等库,您可以使用 `kernels` 库立即获取并运行预编译的优化内核。

例如,要启用 **FlashAttention**,您只需一行代码——无需构建,无需标志

from kernels import get_kernel

flash_attention = get_kernel("kernels-community/flash-attn")

`kernels` 会检测您精确的 Python、PyTorch 和 CUDA 版本,然后下载匹配的预编译二进制文件——通常只需几秒钟(在连接较慢的情况下可能需要一两分钟)。

相比之下,自行编译 FlashAttention 需要:

  • 克隆仓库并安装所有依赖项。
  • 配置构建标志和环境变量。
  • 预留 **~96 GB 内存** 和大量 CPU 核心。
  • 等待 **10 分钟到数小时**,具体取决于您的硬件。(有关详细信息,请参阅该项目自己的安装指南。)

Kernel Hub 消除了所有这些麻烦:一个函数调用,即刻加速。

Kernel Hub 的优势:

  • **即时访问优化内核**:加载并运行针对各种硬件(从 NVIDIA 和 AMD GPU 开始)优化的内核,无需本地编译的麻烦。
  • **共享和重用**:在不同项目和社区中发现、共享和重用内核。
  • **轻松更新**:只需从 Hub 拉取最新版本,即可随时获取最新的内核改进。
  • **加速开发**:专注于模型架构和逻辑,而不是内核编译和部署的复杂性。
  • **提高性能**:利用专家优化的内核,有可能加速训练和推理。
  • **简化部署**:通过按需获取内核,降低部署环境的复杂性。
  • **开发和分享您自己的内核**:如果您创建了优化的内核,可以轻松将其分享到 Hub 上供他人使用。这鼓励了社区内的协作和知识共享。

正如许多机器学习开发者所知,管理依赖项和从源代码构建低级代码可能是一个耗时且容易出错的过程。Kernel Hub 旨在通过提供一个集中的优化计算内核仓库来简化这一点,这些内核可以轻松加载和运行。

将更多时间花在构建出色的模型上,减少与构建系统斗争的时间!

2. 如何使用 Kernel Hub(基本示例)

使用 Kernel Hub 的设计旨在直接了当。`kernels` 库提供了主要的接口。这是一个加载优化 GELU 激活函数内核的快速示例。(稍后,我们将看到另一个关于如何将内核集成到模型中的示例)。

文件:`activation_validation_example.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///

import torch
import torch.nn.functional as F
from kernels import get_kernel

DEVICE = "cuda"

# Make reproducible
torch.manual_seed(42)

# Download optimized activation kernels from the Hub
activation_kernels = get_kernel("kernels-community/activation")

# Create a random tensor on the GPU
x = torch.randn((4, 4), dtype=torch.float16, device=DEVICE)

# Prepare an output tensor
y = torch.empty_like(x)

# Run the fast GELU kernel
activation_kernels.gelu_fast(y, x)

# Get expected output using PyTorch's built-in GELU
expected = F.gelu(x)

# Compare the kernel output with PyTorch's result
torch.testing.assert_close(y, expected, rtol=1e-2, atol=1e-2)

print("✅ Kernel output matches PyTorch GELU!")

# Optional: print both tensors for inspection
print("\nInput tensor:")
print(x)
print("\nFast GELU kernel output:")
print(y)
print("\nPyTorch GELU output:")
print(expected)

# List available functions in the loaded kernel module
print("\nAvailable functions in 'kernels-community/activation':")
print(dir(activation_kernels))

**(注意:** 如果您安装了 `uv`,您可以将此脚本保存为 `script.py` 并运行 `uv run script.py`,它将自动处理依赖项。)

这里发生了什么?

  1. **导入 `get_kernel`**:此函数是通过 `kernels` 库访问 Kernel Hub 的入口点。
  2. **`get_kernel("kernels-community/activation")`**:这行代码在 `kernels-community` 组织下寻找 `activation` 内核仓库。它会下载、缓存并加载相应的预编译内核二进制文件。
  3. **准备张量**:我们在 GPU 上创建输入 (`x`) 和输出 (`y`) 张量。
  4. **`activation_kernels.gelu_fast(y, x)`**:我们调用加载的内核模块提供的特定优化函数 (`gelu_fast`)。
  5. **验证**:我们检查输出。

这个简单的示例展示了您如何轻松地获取和执行高度优化的代码。现在,让我们看看使用 RMS 归一化的更实际的集成。

3. 为简单模型添加 Kernel

让我们将一个优化的 **RMS 归一化**内核集成到一个基本模型中。我们将使用 `kernels-community/triton-layer-norm` 仓库中提供的 `LlamaRMSNorm` 实现(注意:此仓库包含各种归一化内核),并将其与基线 PyTorch RMSNorm 实现进行比较。

首先,在 PyTorch 中定义一个简单的 RMSNorm 模块和使用它的基线模型

文件:`rmsnorm_baseline.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch
import torch.nn as nn

DEVICE = "cuda"

DTYPE = torch.float16  # Use float16 for better kernel performance potential


# Simple PyTorch implementation of RMSNorm for baseline comparison
class RMSNorm(nn.Module):
    def __init__(self, hidden_size, variance_epsilon=1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = variance_epsilon
        self.hidden_size = hidden_size

    def forward(self, x):
        # Assumes x is (batch_size, ..., hidden_size)
        input_dtype = x.dtype
        # Calculate variance in float32 for stability
        variance = x.to(torch.float32).pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)

        # Apply weight and convert back to original dtype
        return (self.weight * x).to(input_dtype)


class BaselineModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, eps=1e-5):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.norm = RMSNorm(hidden_size, variance_epsilon=eps)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(hidden_size, output_size)

        # ensure all linear layers weights are 1 for testing
        with torch.no_grad():
            self.linear1.weight.fill_(1)
            self.linear1.bias.fill_(0)
            self.linear2.weight.fill_(1)
            self.linear2.bias.fill_(0)
            self.norm.weight.fill_(1)

    def forward(self, x):
        x = self.linear1(x)
        x = self.norm(x)  # Apply RMSNorm
        x = self.activation(x)
        x = self.linear2(x)
        return x


# Example usage
input_size = 128
hidden_size = 256
output_size = 10
eps_val = 1e-5

baseline_model = (
    BaselineModel(input_size, hidden_size, output_size, eps=eps_val)
    .to(DEVICE)
    .to(DTYPE)
)
dummy_input = torch.randn(32, input_size, device=DEVICE, dtype=DTYPE)  # Batch of 32
output = baseline_model(dummy_input)
print("Baseline RMSNorm model output shape:", output.shape)

现在,让我们创建一个使用通过 `kernels` 加载的 `LlamaRMSNorm` 内核的版本。

文件:`rmsnorm_kernel.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch
import torch.nn as nn
from kernels import get_kernel, use_kernel_forward_from_hub

# reuse the model from the previous snippet or copy the class
# definition here to run this script independently
from rmsnorm_baseline import BaselineModel

DEVICE = "cuda"
DTYPE = torch.float16  # Use float16 for better kernel performance potential


layer_norm_kernel_module = get_kernel("kernels-community/triton-layer-norm")

# Simply add the decorator to the LlamaRMSNorm class to automatically replace the forward function
# with the optimized kernel version
# 
# Note: not all kernels ship with layers already mapped, and would require calling the function directly
# However in this case, the LlamaRMSNorm class is already mapped to the kernel function. Otherwise we'd need to
# call the function directly like this:
# ```python
# layer_norm_kernel_module.rms_norm_fn(
#     hidden_states,
#     self.weight,
#     bias=None,
#     residual=None,
#     eps=self.variance_epsilon,
#     dropout_p=0.0,
#     prenorm=False,
#     residual_in_fp32=False,
# )
# ```
@use_kernel_forward_from_hub("LlamaRMSNorm")
class OriginalRMSNorm(nn.Module):
    def __init__(self, hidden_size, variance_epsilon=1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = variance_epsilon
        self.hidden_size = hidden_size

    def forward(self, x):
        # Assumes x is (batch_size, ..., hidden_size)
        input_dtype = x.dtype
        # Calculate variance in float32 for stability
        variance = x.to(torch.float32).pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)

        # Apply weight and convert back to original dtype
        return (self.weight * x).to(input_dtype)


class KernelModel(nn.Module):
    def __init__(
        self,
        input_size,
        hidden_size,
        output_size,
        device="cuda",
        dtype=torch.float16,
        eps=1e-5,
    ):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        # OriginalRMSNorm will be replaced with the optimized kernel layer
        # when the model is loaded
        self.norm = OriginalRMSNorm(hidden_size, variance_epsilon=eps)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(hidden_size, output_size)

        # ensure all linear layers weights are 1 for testing
        with torch.no_grad():
            self.linear1.weight.fill_(1)
            self.linear1.bias.fill_(0)
            self.linear2.weight.fill_(1)
            self.linear2.bias.fill_(0)
            self.norm.weight.fill_(1)

    def forward(self, x):
        x = self.linear1(x)
        x = self.norm(x)
        x = self.activation(x)
        x = self.linear2(x)
        return x


# Example usage
input_size = 128
hidden_size = 256
output_size = 10
eps_val = 1e-5

kernel_model = (
    KernelModel(
        input_size, hidden_size, output_size, device=DEVICE, dtype=DTYPE, eps=eps_val
    )
    .to(DEVICE)
    .to(DTYPE)
)

baseline_model = (
    BaselineModel(input_size, hidden_size, output_size, eps=eps_val)
    .to(DEVICE)
    .to(DTYPE)
)

dummy_input = torch.randn(32, input_size, device=DEVICE, dtype=DTYPE)  # Batch of 32

output = baseline_model(dummy_input)
output_kernel = kernel_model(dummy_input)
print("Kernel RMSNorm model output shape:", output_kernel.shape)

# Verify outputs are close (RMSNorm implementations should be numerically close)
try:
    torch.testing.assert_close(output, output_kernel, rtol=1e-2, atol=1e-2)
    print("\nBaseline and Kernel RMSNorm model outputs match!")
except AssertionError as e:
    print("\nBaseline and Kernel RMSNorm model outputs differ slightly:")
    print(e)
except NameError:
    print("\nSkipping output comparison as kernel model output was not generated.")

关于 `KernelModel` 的重要注意事项

  • **内核继承:** `KernelRMSNorm` 类继承自内核中的 RMSNorm 实现 `layer_norm_kernel_module.layers.LlamaRMSNorm`。这允许我们直接使用优化过的内核。
  • **访问函数:** 访问 RMSNorm 函数的确切方式(`layer_norm_kernel_module.layers.LlamaRMSNorm.forward`、`layer_norm_kernel_module.rms_norm_forward` 或其他)**完全取决于内核创建者在 Hub 上组织仓库的方式。** 您可能需要检查加载的 `layer_norm_kernel_module` 对象(例如,使用 `dir()`)或查看 Hub 上内核的文档以找到正确的函数/方法及其签名。我已使用 `rms_norm_forward` 作为合理的占位符并添加了错误处理。
  • **参数:** 现在我们只定义 `rms_norm_weight`(无偏置),这与 RMSNorm 一致。

4. 基准测试性能影响

与标准 PyTorch 版本相比,优化的 Triton RMSNorm 内核速度快了多少?让我们对前向传播进行基准测试,找出答案。

文件:`rmsnorm_benchmark.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch

# reuse the models from the previous snippets or copy the class
# definitions here to run this script independently
from rmsnorm_baseline import BaselineModel
from rmsnorm_kernel import KernelModel

DEVICE = "cuda"
DTYPE = torch.float16  # Use float16 for better kernel performance potential


# Use torch.cuda.Event for accurate GPU timing (ensure function is defined)
def benchmark_model(model, input_tensor, num_runs=100, warmup_runs=10):
    model.eval()  # Set model to evaluation mode
    dtype = input_tensor.dtype
    model = model.to(input_tensor.device).to(dtype)

    # Warmup runs
    for _ in range(warmup_runs):
        _ = model(input_tensor)
    torch.cuda.synchronize()

    # Timed runs
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)
    start_event.record()
    for _ in range(num_runs):
        _ = model(input_tensor)
    end_event.record()
    torch.cuda.synchronize()
    elapsed_time_ms = start_event.elapsed_time(end_event)
    avg_time_ms = elapsed_time_ms / num_runs
    return avg_time_ms


input_size_bench = 4096
hidden_size_bench = 4096  # RMSNorm performance is sensitive to this dimension
output_size_bench = 10
eps_val_bench = 1e-5

# Create larger models and input for benchmark
# Ensure both models are fully converted to the target DEVICE and DTYPE
baseline_model_bench = (
    BaselineModel(
        input_size_bench, hidden_size_bench, output_size_bench, eps=eps_val_bench
    )
    .to(DEVICE)
    .to(DTYPE)
)
kernel_model_bench = (
    KernelModel(
        input_size_bench,
        hidden_size_bench,
        output_size_bench,
        device=DEVICE,
        dtype=DTYPE,
        eps=eps_val_bench,
    )
    .to(DEVICE)
    .to(DTYPE)
)

# call both with larger batch sizes to warm up the GPU
# and ensure the models are loaded
warmup_input = torch.randn(4096, input_size_bench, device=DEVICE, dtype=DTYPE)
_ = kernel_model_bench(warmup_input)
_ = baseline_model_bench(warmup_input)

batch_sizes = [
    256,
    512,
    1024,
    2048,
    4096,
    8192,
    16384,
    32768,
]

print(
    f"{'Batch Size':<12} | {'Baseline Time (ms)':<18} | {'Kernel Time (ms)':<18} | {'Speedup'}"
)
print("-" * 74)

for batch_size in batch_sizes:
    # Call cuda synchronize to ensure all previous GPU operations are complete
    torch.cuda.synchronize()

    # Create random input tensor
    # Ensure the input tensor is on the correct device and dtype
    bench_input = torch.randn(batch_size, input_size_bench, device=DEVICE, dtype=DTYPE)

    # Run benchmarks only if kernel was loaded successfully
    baseline_time = benchmark_model(baseline_model_bench, bench_input)

    kernel_time = -1  # Sentinel value

    kernel_time = benchmark_model(kernel_model_bench, bench_input)

    baseline_time = round(baseline_time, 4)
    kernel_time = round(kernel_time, 4)
    speedup = round(baseline_time / kernel_time, 2) if kernel_time > 0 else "N/A"
    if kernel_time < baseline_time:
        speedup = f"{speedup:.2f}x"
    elif kernel_time == baseline_time:
        speedup = "1.00x (identical)"
    else:
        speedup = f"{kernel_time / baseline_time:.2f}x slower"
    print(f"{batch_size:<12} | {baseline_time:<18} | {kernel_time:<18} | {speedup}")

**预期结果:** 与 LayerNorm 一样,使用 Triton 的良好调优 RMSNorm 实现可以比 PyTorch 的默认版本带来显著的加速——特别是对于兼容硬件(例如,NVIDIA Ampere 或 Hopper GPU)上的内存密集型工作负载和低精度类型(如 `float16` 或 `bfloat16`)。

请记住

  • 结果可能因您的 GPU、输入大小和数据类型而异。
  • 微基准测试可能无法真实反映实际性能。
  • 性能取决于内核实现的质量。
  • 由于开销,优化内核可能对小批量大小无益。

实际结果将取决于您的硬件和具体的内核实现。以下是您可能看到的一个示例(在 L4 GPU 上)

批量大小 基线时间 (ms) 内核时间 (ms) 加速比
256 0.2122 0.2911 0.72x
512 0.4748 0.3312 1.43x
1024 0.8946 0.6864 1.30x
2048 2.0289 1.3889 1.46x
4096 4.4318 2.2467 1.97x
8192 9.2438 4.8497 1.91x
16384 18.6992 9.8805 1.89x
32768 37.079 19.9461 1.86x
65536 73.588 39.593 1.86x

5. 真实世界用例

`kernels` 库仍在发展中,但已在各种实际项目中得到应用,包括:

  • Text Generation Inference:TGI 项目使用 `kernels` 库加载用于文本生成任务的优化内核,从而提高性能和效率。
  • Transformers:Transformers 库已集成了 `kernels` 库,无需对模型代码进行任何更改即可使用优化层。这允许用户在标准实现和优化实现之间轻松切换。

开始和下一步!

您已经了解了使用 Hugging Face Kernel Hub 获取和使用优化内核是多么容易。准备好自己尝试了吗?

  1. 安装库

    pip install kernels torch numpy
    

    确保您已安装兼容的 PyTorch 版本和 GPU 驱动程序。

  2. **浏览 Hub:** 在 Hugging Face Hub 上,在 `kernels` 标签下或在 `kernels-community` 等组织中探索可用的内核。寻找与您的操作相关的内核(激活函数、注意力机制、LayerNorm/RMSNorm 等归一化)。

  3. **实验:** 尝试替换您自己模型中的组件。使用 `get_kernel("user-or-org/kernel-name")`。 **至关重要的是,检查加载的内核对象**(例如,使用 `print(dir(loaded_kernel))`)或查看其 Hub 仓库文档,以了解如何正确调用其函数/方法以及它期望哪些参数(权重、偏置、输入、epsilon)。

  4. **基准测试:** 衡量其对您特定硬件和工作负载的性能影响。不要忘记检查数值正确性 (`torch.testing.assert_close`)。

  5. **(高级)贡献:** 如果您开发了优化的内核,请考虑在 Hub 上分享它们!

结论

Hugging Face Kernel Hub 提供了一种强大而简单的方式来访问和利用优化的计算内核。通过将标准 PyTorch 组件替换为针对 RMS 归一化等操作优化的版本,您可以在不增加传统自定义构建复杂性的情况下,潜在地实现显著的性能提升。请记住查看 Hub 上每个内核的详细信息,以确保正确使用。快来尝试一下,看看它如何加速您的工作流程!

社区

这是一篇好文章 #

我的实现不完整,输出结果只是噪音,但即使在 Zero GPU 空间中,SDXL 管线仍然可以工作到输出阶段。替换似乎相对稳定。
此外,kernels 模块的主体是一个 C++ 编译的二进制文件,因此无需 CUDA Toolkit 即可安装!这在 Zero GPU 环境中是一个很有用的功能。
如果 Hugging Face 的主要库能够逐步在内部采用 kernels 库(通过可选开关),我们可以期待智能的性能提升。
参数和返回值与原始库不匹配,因此可能需要一个包装器。

import torch
from kernels import get_kernel

if torch.cuda.is_available():
    flash_attn = get_kernel("kernels-community/flash-attn")
    try:
        sdpa = torch.nn.functional.scaled_dot_product_attention

        def sdpa_hijack(query, key, value, attn_mask=None, dropout_p=0.0, is_causal=False, scale=None):
            if query.shape[3] <= 128 and attn_mask is None and query.dtype != torch.float32:
                result = flash_attn.mha_fwd(q=query.transpose(1, 2), k=key.transpose(1, 2), v=value.transpose(1, 2),
                    p_dropout=dropout_p, is_causal=is_causal, softmax_scale=1.0 if scale is None else scale)[0]
                hidden_states = result.transpose(1, 2) if result is not None else None
            else:
                hidden_states = sdpa(query=query, key=key, value=value,
                    attn_mask=attn_mask, dropout_p=dropout_p, is_causal=is_causal, scale=scale)
            return hidden_states

        torch.nn.functional.scaled_dot_product_attention = sdpa_hijack
        print("# # #\nHijacked SDPA with kernels Flash Attention\n# # #")
    except ImportError as e:
        print(f"# # #\nCould not load Flash Attention for hijack:\n{e}\n# # #")
else:
    print(f"# # #\nCould not detect GPU\n# # #")

# https://github.com/huggingface/diffusers/discussions/7172
# https://huggingface.co/kernels-community/flash-attn
# https://huggingface.co/blog/hello-hf-kernels

注册登录 以评论