Nano-vLLM 遇见推理端点
最近,我致力于一个概念验证,旨在简化各种引擎的集成,以提供推理端点服务。这促使我创建了 hfendpoint-draft,这是一个用于实验将暴露的 API 逻辑与推理引擎尽可能隔离的项目。主要思想是避免强制使用原生 Python 绑定(大多数项目都这样做,原因充分),而是构建一个不受引擎所用语言或构建环境影响的系统。
在测试与 transformers 和 llama.cpp 的集成时,我添加了图像生成、嵌入和支持流式传输的聊天完成 API。我对初步结果和基准测试非常满意。
那么,有什么新内容呢?最近出现了一个令人兴奋的项目:Nano-vLLM,一个从头开始构建的轻量级 vLLM 实现,正是我喜欢的那种极简主义项目。这也是挑战我新项目的完美方式!绑定这个新引擎会有多难?让我们一探究竟!
理解架构
该项目提供了一个非常简单明了的示例,精确展示了如何使用该引擎,并使用 Qwen/Qwen3-0.6B
,这是 Nano-vLLM 目前设计的模型。
import os
from nanovllm import LLM, SamplingParams
from transformers import AutoTokenizer
def main():
path = os.path.expanduser("~/huggingface/Qwen3-0.6B/")
tokenizer = AutoTokenizer.from_pretrained(path)
llm = LLM(path, enforce_eager=True, tensor_parallel_size=1)
sampling_params = SamplingParams(temperature=0.6, max_tokens=256)
prompts = [
"introduce yourself",
"list all prime numbers within 100",
]
prompts = [
tokenizer.apply_chat_template(
[{"role": "user", "content": prompt}],
tokenize=False,
add_generation_prompt=True,
enable_thinking=True
)
for prompt in prompts
]
outputs = llm.generate(prompts, sampling_params)
for prompt, output in zip(prompts, outputs):
print("\n")
print(f"Prompt: {prompt!r}")
print(f"Completion: {output['text']!r}")
if __name__ == "__main__":
main()
这段代码非常适合离线生成,但我们不能将其用于并发 Web 服务。我们需要对请求生命周期进行更多控制。
因此,在编写一行代码之前,让我们先分解 Nano-vLLM 的构建块。快速查看源代码会发现其模块化且设计精良的架构:
LLMEngine
:示例中使用的类,我们需要重写它。Scheduler
:它管理运行中和等待中的序列队列,决定在下一个批次中处理哪些序列,并处理预填充(摄取提示)和解码(生成令牌)的逻辑。ModelRunner
:这是奇迹发生的地方。它加载模型权重,管理 GPU 内存和 KV 缓存,并执行模型的实际前向传递。Sequence
:一个简单的数据类,表示单个请求,包含其令牌 ID、状态和采样参数。
我们的目标是构建一个能够处理多个并发请求而不会阻塞的服务。为此,我们将直接使用 Scheduler
和 ModelRunner
创建我们自己的引擎循环。
构建自定义引擎服务
Worker 类
为了实现这一目标,我们将构建一个 Worker
,它将封装核心 Nano-vLLM 组件并在专用线程中管理它们。我们 Worker
的构造函数是 LLMEngine
的 __init__
方法的直接改编,但我们引入了两个关键修改:
异步钩子:我们添加了一个
queue.Queue
和一个threading.Condition
。它们将作为从异步端点向我们的同步引擎提交请求的接口。自动化模型获取:我们将路径参数替换为对
huggingface_hub.snapshot_download
的调用。这使得服务自包含并简化了部署,确保模型在未存在时自动下载到缓存。
这很简单,到目前为止我们主要是在“偷”一些代码。
class Worker:
def __init__(self):
model_path = snapshot_download(repo_id="Qwen/Qwen3-0.6B")
self.config = Config(model_path, **CONFIG)
self.tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
self.config.eos = self.tokenizer.eos_token_id
self.requests = queue.Queue()
self.notifier = threading.Condition()
self.loop = None
self.engine = None
self.processes = []
self.events = []
if self.config.tensor_parallel_size > 1:
ctx = mp.get_context("spawn")
for i in range(1, self.config.tensor_parallel_size):
event = ctx.Event()
process = ctx.Process(target=ModelRunner, args=(self.config, i, event))
process.start()
self.processes.append(process)
self.events.append(event)
self.model_runner = ModelRunner(self.config, 0, self.events)
self.scheduler = Scheduler(self.config)
atexit.register(self.stop)
运行循环
为了构建一个响应式服务,我们需要一个独立运行的处理循环。这就是 _run
方法的工作。这个方法是我们的 Worker 的核心。它连接了两个世界:Scheduler
和 ModelRunner
的同步、批处理导向的逻辑,以及我们 Web 服务的异步、事件驱动的世界。它的任务很简单:不断检查工作,处理一个批次,并将结果发送回等待的代码。
def _run(self):
while True:
try:
with self.notifier:
self.notifier.wait_for(lambda: not self.requests.empty() or not self.scheduler.is_finished())
while not self.requests.empty():
seq = self.requests.get_nowait()
self.scheduler.add(seq)
sequences, is_prefill = self.scheduler.schedule()
if not sequences:
continue
new_token_ids = self.model_runner.call("run", sequences, is_prefill)
self.scheduler.postprocess(sequences, new_token_ids)
for seq, token_id in zip(sequences, new_token_ids):
response_queue = getattr(seq, 'response_queue', None)
if not response_queue:
continue
self.loop.call_soon_threadsafe(response_queue.put_nowait, token_id)
if seq.is_finished:
self.loop.call_soon_threadsafe(response_queue.put_nowait, None)
except Exception as e:
hfendpoint.error(f"worker loop: {e}")
让我们分解一下:
循环以
self.notifier.wait_for(...)
开始,这会使线程进入休眠状态,直到新请求到达或调度程序仍有工作要做。当它醒来时,它会清空请求队列并将任何新序列添加到调度程序中。
然后调度程序批量处理序列。
接下来是计算:
self.model_runner.call("run", ...)
为每个序列生成下一个令牌。self.scheduler.postprocess(...)
更新状态,附加新令牌,并标记已完成的序列。最后,我们将令牌发送回正确的队列。当一个序列完成时,我们还会发送一个
None
来表示流的结束。
就是这样!
提交请求
在引擎循环运行后,我们需要一个线程安全的方法来从我们的 Web 服务提交请求。这就是 submit
方法的作用,它是 LLMEngine
类中 add_request
方法的精神继承者。
def submit(self, prompt_token_ids: list[int], sampling_params: SamplingParams) -> asyncio.Queue:
seq = Sequence(prompt_token_ids, sampling_params)
seq.response_queue = asyncio.Queue()
self.requests.put(seq)
with self.notifier:
self.notifier.notify()
return seq.response_queue
我们也来分解一下:
首先,我们创建一个 Sequence,就像在
LLMEngine
中一样。但这次,我们向其附加一个asyncio.Queue
。这为每个请求提供了自己的私有通道,用于将流式令牌传回给发出调用的处理程序。我们将序列放入
self.requests
并调用self.notifier.notify()
,如果引擎线程正在等待,则将其唤醒。最后,也是最重要的一点,我们返回
response_queue
。调用方可以立即开始等待令牌,而不会在排队或推理过程中被阻塞。
这甚至更简单!
创建处理程序
最后一步是创建 hfendpoint
处理程序,它将我们的服务暴露给守护进程。
worker = Worker()
@hfendpoint.handler("chat_completions")
async def chat(request_data: Dict[str, Any]):
prompt_text = worker.tokenizer.apply_chat_template(
request_data["messages"],
tokenize=False,
add_generation_prompt=True,
enable_thinking=True
)
prompt_token_ids = worker.tokenizer.encode(prompt_text)
sampling_params = SamplingParams(
temperature=request_data.get("temperature", 0.7),
max_tokens=request_data.get("max_tokens", 2048),
)
response = worker.submit(prompt_token_ids, sampling_params)
decoder = DecodeStream(skip_special_tokens=True)
while True:
token_id = await response.get()
if token_id is None:
break
output = decoder.step(worker.tokenizer._tokenizer, token_id)
if output:
yield {"content": output}
yield {"content":"", "finish_reason": "stop"}
if __name__ == "__main__":
asyncio.run(worker.start())
实例化 Worker
后,chat
处理程序负责处理每个传入的 chat_completions
请求。它对输入数据进行标记化,创建适当的 SamplingParams
,然后调用 worker.submit
将任务分派给引擎。
然后,处理程序可以 await
此队列中的令牌。我们使用 tokenizers
库中方便的 DecodeStream
工具将令牌流转换回有效文本。一旦接收到 None
,循环终止,并将最终消息发送给客户端。
至此,实现已完成。服务的完整源代码可在此处获取:https://github.com/angt/hfendpoint-draft-nanovllm。
部署到推理端点
现在您已准备好进行推理了。您按下 Enter
键,然后……一片寂静。什么都没有。这是因为 Nano-vLLM 需要现代 GPU 架构。这就是 Hugging Face 推理端点特别出彩的地方。当我们只是想做一些测试时,它能免费提供最好的硬件,并已完全配置好。
首先,前往 此页面 并点击 New endpoint
。然后,在 Hardware Configuration
中,选择 GPU
并选择带有 1 个 GPU 的 Nvidia L4
实例。
最后,在 Container Configuration
中,选择 Custom
类型,并使用此 URL 作为容器:
ghcr.io/angt/hfendpoint-draft-nanovllm
当端点准备就绪后,其状态将变为 Running
。将有一个专用 URL 可用于访问您的容器,其形式类似于:https://<您的端点名称>.endpoints.huggingface.cloud
。
默认情况下,端点是 Protected
,因此您需要访问令牌才能发送请求。幸运的是,获取令牌非常容易。前往您的 Access Tokens
部分并创建一个令牌(Read
角色就足够了)。
为了方便起见,让我们设置 HF_ENDPOINT_URL
和 HF_TOKEN
:
HF_ENDPOINT_URL=https://<your-endpoint-name>.endpoints.huggingface.cloud
HF_TOKEN=hf_xxxx
并运行以下 curl
命令:
curl "$HF_ENDPOINT_URL/v1/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $HF_TOKEN" -d'{
"messages": [
{ "role": "system", "content": "You are a helpful assistant." },
{ "role": "user", "content": "Hello!" }
]
}'
就这样!希望这能启发您在自己的端点上尝试 Nano-vLLM。祝您编程愉快!