Nano-vLLM 遇见推理端点

社区文章 2025年6月25日发布

最近,我致力于一个概念验证,旨在简化各种引擎的集成,以提供推理端点服务。这促使我创建了 hfendpoint-draft,这是一个用于实验将暴露的 API 逻辑与推理引擎尽可能隔离的项目。主要思想是避免强制使用原生 Python 绑定(大多数项目都这样做,原因充分),而是构建一个不受引擎所用语言或构建环境影响的系统。

在测试与 transformersllama.cpp 的集成时,我添加了图像生成、嵌入和支持流式传输的聊天完成 API。我对初步结果和基准测试非常满意。

那么,有什么新内容呢?最近出现了一个令人兴奋的项目:Nano-vLLM,一个从头开始构建的轻量级 vLLM 实现,正是我喜欢的那种极简主义项目。这也是挑战我新项目的完美方式!绑定这个新引擎会有多难?让我们一探究竟!

理解架构

该项目提供了一个非常简单明了的示例,精确展示了如何使用该引擎,并使用 Qwen/Qwen3-0.6B,这是 Nano-vLLM 目前设计的模型。

import os
from nanovllm import LLM, SamplingParams
from transformers import AutoTokenizer

def main():
    path = os.path.expanduser("~/huggingface/Qwen3-0.6B/")
    tokenizer = AutoTokenizer.from_pretrained(path)
    llm = LLM(path, enforce_eager=True, tensor_parallel_size=1)

    sampling_params = SamplingParams(temperature=0.6, max_tokens=256)
    prompts = [
        "introduce yourself",
        "list all prime numbers within 100",
    ]
    prompts = [
        tokenizer.apply_chat_template(
            [{"role": "user", "content": prompt}],
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=True
        )
        for prompt in prompts
    ]
    outputs = llm.generate(prompts, sampling_params)

    for prompt, output in zip(prompts, outputs):
        print("\n")
        print(f"Prompt: {prompt!r}")
        print(f"Completion: {output['text']!r}")

if __name__ == "__main__":
    main()

这段代码非常适合离线生成,但我们不能将其用于并发 Web 服务。我们需要对请求生命周期进行更多控制。

因此,在编写一行代码之前,让我们先分解 Nano-vLLM 的构建块。快速查看源代码会发现其模块化且设计精良的架构:

  • LLMEngine:示例中使用的类,我们需要重写它。

  • Scheduler:它管理运行中和等待中的序列队列,决定在下一个批次中处理哪些序列,并处理预填充(摄取提示)和解码(生成令牌)的逻辑。

  • ModelRunner:这是奇迹发生的地方。它加载模型权重,管理 GPU 内存和 KV 缓存,并执行模型的实际前向传递。

  • Sequence:一个简单的数据类,表示单个请求,包含其令牌 ID、状态和采样参数。

我们的目标是构建一个能够处理多个并发请求而不会阻塞的服务。为此,我们将直接使用 SchedulerModelRunner 创建我们自己的引擎循环。

构建自定义引擎服务

Worker 类

为了实现这一目标,我们将构建一个 Worker,它将封装核心 Nano-vLLM 组件并在专用线程中管理它们。我们 Worker 的构造函数是 LLMEngine__init__ 方法的直接改编,但我们引入了两个关键修改:

  • 异步钩子:我们添加了一个 queue.Queue 和一个 threading.Condition。它们将作为从异步端点向我们的同步引擎提交请求的接口。

  • 自动化模型获取:我们将路径参数替换为对 huggingface_hub.snapshot_download 的调用。这使得服务自包含并简化了部署,确保模型在未存在时自动下载到缓存。

这很简单,到目前为止我们主要是在“偷”一些代码。

class Worker:
    def __init__(self):
        model_path = snapshot_download(repo_id="Qwen/Qwen3-0.6B")
        self.config = Config(model_path, **CONFIG)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
        self.config.eos = self.tokenizer.eos_token_id
        self.requests = queue.Queue()
        self.notifier = threading.Condition()
        self.loop = None
        self.engine = None
        self.processes = []
        self.events = []
        if self.config.tensor_parallel_size > 1:
            ctx = mp.get_context("spawn")
            for i in range(1, self.config.tensor_parallel_size):
                event = ctx.Event()
                process = ctx.Process(target=ModelRunner, args=(self.config, i, event))
                process.start()
                self.processes.append(process)
                self.events.append(event)
        self.model_runner = ModelRunner(self.config, 0, self.events)
        self.scheduler = Scheduler(self.config)
        atexit.register(self.stop)

运行循环

为了构建一个响应式服务,我们需要一个独立运行的处理循环。这就是 _run 方法的工作。这个方法是我们的 Worker 的核心。它连接了两个世界:SchedulerModelRunner 的同步、批处理导向的逻辑,以及我们 Web 服务的异步、事件驱动的世界。它的任务很简单:不断检查工作,处理一个批次,并将结果发送回等待的代码。

def _run(self):
        while True:
            try:
                with self.notifier:
                    self.notifier.wait_for(lambda: not self.requests.empty() or not self.scheduler.is_finished())

                while not self.requests.empty():
                    seq = self.requests.get_nowait()
                    self.scheduler.add(seq)

                sequences, is_prefill = self.scheduler.schedule()
                if not sequences:
                    continue

                new_token_ids = self.model_runner.call("run", sequences, is_prefill)
                self.scheduler.postprocess(sequences, new_token_ids)

                for seq, token_id in zip(sequences, new_token_ids):
                    response_queue = getattr(seq, 'response_queue', None)
                    if not response_queue:
                        continue
                    self.loop.call_soon_threadsafe(response_queue.put_nowait, token_id)
                    if seq.is_finished:
                        self.loop.call_soon_threadsafe(response_queue.put_nowait, None)

            except Exception as e:
                hfendpoint.error(f"worker loop: {e}")

让我们分解一下:

  • 循环以 self.notifier.wait_for(...) 开始,这会使线程进入休眠状态,直到新请求到达或调度程序仍有工作要做。

  • 当它醒来时,它会清空请求队列并将任何新序列添加到调度程序中。

  • 然后调度程序批量处理序列。

  • 接下来是计算:self.model_runner.call("run", ...) 为每个序列生成下一个令牌。self.scheduler.postprocess(...) 更新状态,附加新令牌,并标记已完成的序列。

  • 最后,我们将令牌发送回正确的队列。当一个序列完成时,我们还会发送一个 None 来表示流的结束。

就是这样!

提交请求

在引擎循环运行后,我们需要一个线程安全的方法来从我们的 Web 服务提交请求。这就是 submit 方法的作用,它是 LLMEngine 类中 add_request 方法的精神继承者。

    def submit(self, prompt_token_ids: list[int], sampling_params: SamplingParams) -> asyncio.Queue:
        seq = Sequence(prompt_token_ids, sampling_params)
        seq.response_queue = asyncio.Queue()
        self.requests.put(seq)
        with self.notifier:
            self.notifier.notify()
        return seq.response_queue

我们也来分解一下:

  • 首先,我们创建一个 Sequence,就像在 LLMEngine 中一样。但这次,我们向其附加一个 asyncio.Queue。这为每个请求提供了自己的私有通道,用于将流式令牌传回给发出调用的处理程序。

  • 我们将序列放入 self.requests 并调用 self.notifier.notify(),如果引擎线程正在等待,则将其唤醒。

  • 最后,也是最重要的一点,我们返回 response_queue。调用方可以立即开始等待令牌,而不会在排队或推理过程中被阻塞。

这甚至更简单!

创建处理程序

最后一步是创建 hfendpoint 处理程序,它将我们的服务暴露给守护进程。

worker = Worker()

@hfendpoint.handler("chat_completions")
async def chat(request_data: Dict[str, Any]):
    prompt_text = worker.tokenizer.apply_chat_template(
        request_data["messages"],
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=True
    )
    prompt_token_ids = worker.tokenizer.encode(prompt_text)

    sampling_params = SamplingParams(
        temperature=request_data.get("temperature", 0.7),
        max_tokens=request_data.get("max_tokens", 2048),
    )
    response = worker.submit(prompt_token_ids, sampling_params)
    decoder = DecodeStream(skip_special_tokens=True)

    while True:
        token_id = await response.get()
        if token_id is None:
            break
        output = decoder.step(worker.tokenizer._tokenizer, token_id)
        if output:
            yield {"content": output}

    yield {"content":"", "finish_reason": "stop"}

if __name__ == "__main__":
    asyncio.run(worker.start())

实例化 Worker 后,chat 处理程序负责处理每个传入的 chat_completions 请求。它对输入数据进行标记化,创建适当的 SamplingParams,然后调用 worker.submit 将任务分派给引擎。

然后,处理程序可以 await 此队列中的令牌。我们使用 tokenizers 库中方便的 DecodeStream 工具将令牌流转换回有效文本。一旦接收到 None,循环终止,并将最终消息发送给客户端。

至此,实现已完成。服务的完整源代码可在此处获取:https://github.com/angt/hfendpoint-draft-nanovllm

部署到推理端点

现在您已准备好进行推理了。您按下 Enter 键,然后……一片寂静。什么都没有。这是因为 Nano-vLLM 需要现代 GPU 架构。这就是 Hugging Face 推理端点特别出彩的地方。当我们只是想做一些测试时,它能免费提供最好的硬件,并已完全配置好。

首先,前往 此页面 并点击 New endpoint。然后,在 Hardware Configuration 中,选择 GPU 并选择带有 1 个 GPU 的 Nvidia L4 实例。

Hardware Configuration

最后,在 Container Configuration 中,选择 Custom 类型,并使用此 URL 作为容器:

ghcr.io/angt/hfendpoint-draft-nanovllm

Container Configuration

当端点准备就绪后,其状态将变为 Running。将有一个专用 URL 可用于访问您的容器,其形式类似于:https://<您的端点名称>.endpoints.huggingface.cloud

默认情况下,端点是 Protected,因此您需要访问令牌才能发送请求。幸运的是,获取令牌非常容易。前往您的 Access Tokens 部分并创建一个令牌(Read 角色就足够了)。

Access Tokens

为了方便起见,让我们设置 HF_ENDPOINT_URLHF_TOKEN

HF_ENDPOINT_URL=https://<your-endpoint-name>.endpoints.huggingface.cloud
HF_TOKEN=hf_xxxx

并运行以下 curl 命令:

curl "$HF_ENDPOINT_URL/v1/chat/completions" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $HF_TOKEN" -d'{
    "messages": [
        { "role": "system", "content": "You are a helpful assistant." },
        { "role": "user", "content": "Hello!" }
    ]
}'

就这样!希望这能启发您在自己的端点上尝试 Nano-vLLM。祝您编程愉快!

社区

注册登录 以评论