MCP 课程文档

Webhook 监听器

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Webhook 监听器

Webhook 监听器是我们 Pull Request 代理的入口点。当讨论被创建或更新时,它会从 Hugging Face Hub 接收实时事件,从而触发我们由 MCP 提供支持的标记工作流程。在本节中,我们将使用 FastAPI 实现一个 webhook 处理程序。

理解 Webhook 集成

遵循 Hugging Face Webhook 指南,我们的 webhook 监听器将验证传入的请求并实时处理讨论事件。

Webhook Creation

Webhook 事件流

理解 webhook 流程对于构建可靠的监听器至关重要

  1. 用户操作:某人在模型仓库讨论中创建评论
  2. Hub 事件:Hugging Face 生成一个 webhook 事件
  3. Webhook 交付:Hub 向我们的端点发送 POST 请求
  4. 身份验证:我们验证 webhook 密钥
  5. 处理:从评论内容中提取标签
  6. 操作:使用 MCP 工具为新标签创建 Pull Request

Webhooks 是推送通知——Hugging Face Hub 会主动向您的应用程序发送事件,而不是您轮询更改。这使得能够实时响应讨论和评论。

FastAPI Webhook 应用程序

让我们一步步构建 webhook 监听器,从基础开始,逐步构建完整的处理逻辑。

1. 应用程序设置

首先,让我们设置基本的 FastAPI 应用程序,包括所有必要的导入和配置

import os
import json
from datetime import datetime
from typing import List, Dict, Any, Optional

from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

这些导入提供了构建健壮的 webhook 处理程序所需的一切。`FastAPI` 提供 Web 框架,`BackgroundTasks` 启用异步处理,而类型导入有助于数据验证。

现在让我们配置我们的应用程序

# Configuration
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
HF_TOKEN = os.getenv("HF_TOKEN")

# Simple storage for processed operations
tag_operations_store: List[Dict[str, Any]] = []

app = FastAPI(title="HF Tagging Bot")
app.add_middleware(CORSMiddleware, allow_origins=["*"])

此配置设置了

  • Webhook 密钥:用于验证传入的 webhook
  • HF 令牌:用于向 Hub API 进行身份验证
  • 操作存储:用于监控已处理操作的内存存储
  • CORS 中间件:允许 Web 界面的跨域请求
`tag_operations_store` 列表跟踪最近的 webhook 处理操作。这对于调试和监控很有用,但在生产环境中,您可能希望使用数据库或限制此列表的大小。

2. Webhook 数据模型

根据 Hugging Face webhook 文档,我们需要理解 webhook 数据结构

class WebhookEvent(BaseModel):
    event: Dict[str, str]          # Contains action and scope information
    comment: Dict[str, Any]        # Comment content and metadata
    discussion: Dict[str, Any]     # Discussion information
    repo: Dict[str, str]           # Repository details

此 Pydantic 模型帮助我们理解 webhook 结构。

我们关心的关键字段是

  • event.action:对于新评论通常是“create”
  • event.scope:对于评论事件通常是“discussion.comment”
  • comment.content:实际评论文本
  • repo.name:评论所在的仓库

3. 核心 Webhook 处理程序

现在是主要的 webhook 处理程序——这是重要部分发生的地方。让我们把它分解成易于理解的部分

@app.post("/webhook")
async def webhook_handler(request: Request, background_tasks: BackgroundTasks):
    """
    Handle incoming webhooks from Hugging Face Hub
    Following the pattern from: https://raw.githubusercontent.com/huggingface/hub-docs/refs/heads/main/docs/hub/webhooks-guide-discussion-bot.md
    """
    print("🔔 Webhook received!")
    
    # Step 1: Validate webhook secret (security)
    webhook_secret = request.headers.get("X-Webhook-Secret")
    if webhook_secret != WEBHOOK_SECRET:
        print("❌ Invalid webhook secret")
        return {"error": "incorrect secret"}, 400

第一步是安全验证。我们对照配置的密钥检查 `X-Webhook-Secret` 标头,以确保 webhook 合法。

始终验证 webhook 密钥!如果没有此检查,任何人都可以向您的应用程序发送虚假的 webhook 请求。密钥充当 Hugging Face 和您的应用程序之间的共享密码。

接下来,让我们解析和验证 webhook 数据

    # Step 2: Parse webhook data
    try:
        webhook_data = await request.json()
        print(f"📥 Webhook data: {json.dumps(webhook_data, indent=2)}")
    except Exception as e:
        print(f"❌ Error parsing webhook data: {str(e)}")
        return {"error": "invalid JSON"}, 400
    
    # Step 3: Validate event structure
    event = webhook_data.get("event", {})
    if not event:
        print("❌ No event data in webhook")
        return {"error": "missing event data"}, 400

此解析步骤优雅地处理潜在的 JSON 错误,并验证我们是否具有预期的事件结构。

现在是事件过滤逻辑

    # Step 4: Check if this is a discussion comment creation
    # Following the webhook guide pattern:
    if (
        event.get("action") == "create" and 
        event.get("scope") == "discussion.comment"
    ):
        print("✅ Valid discussion comment creation event")
        
        # Process in background to return quickly to Hub
        background_tasks.add_task(process_webhook_comment, webhook_data)
        
        return {
            "status": "accepted",
            "message": "Comment processing started",
            "timestamp": datetime.now().isoformat()
        }
    else:
        print(f"ℹ️ Ignoring event: action={event.get('action')}, scope={event.get('scope')}")
        return {
            "status": "ignored",
            "reason": "Not a discussion comment creation"
        }

此过滤可确保我们只处理我们关心的事件——新的讨论评论。我们忽略其他事件,例如仓库创建、模型上传等。

我们使用 FastAPI 的 `background_tasks.add_task()` 异步处理 webhook。这使我们能够快速(在几秒钟内)返回响应,而实际的标签处理则在后台进行。

Webhook 端点应在 10 秒内响应,否则发送平台可能会认为它们失败。使用后台任务可确保快速响应,同时允许复杂的处理异步进行。

4. 评论处理逻辑

现在让我们实现核心评论处理函数,它执行实际的标签提取和 MCP 工具使用

async def process_webhook_comment(webhook_data: Dict[str, Any]):
    """
    Process webhook comment to detect and add tags
    Integrates with our MCP client for Hub interactions
    """
    print("🏷️ Starting process_webhook_comment...")
    
    try:
        # Extract comment and repository information
        comment_content = webhook_data["comment"]["content"]
        discussion_title = webhook_data["discussion"]["title"]
        repo_name = webhook_data["repo"]["name"]
        discussion_num = webhook_data["discussion"]["num"]
        comment_author = webhook_data["comment"]["author"].get("id", "unknown")
        
        print(f"📝 Comment from {comment_author}: {comment_content}")
        print(f"📰 Discussion: {discussion_title}")
        print(f"📦 Repository: {repo_name}")

此初始部分从 webhook 数据中提取所有相关信息。我们同时获取评论内容和讨论标题,因为标签可能在两者中的任何一个位置被提及。

接下来,我们提取并处理标签

        # Extract potential tags from comment and title
        comment_tags = extract_tags_from_text(comment_content)
        title_tags = extract_tags_from_text(discussion_title)
        all_tags = list(set(comment_tags + title_tags))
        
        print(f"🔍 Found tags: {all_tags}")
        
        # Store operation for monitoring
        operation = {
            "timestamp": datetime.now().isoformat(),
            "repo_name": repo_name,
            "discussion_num": discussion_num,
            "comment_author": comment_author,
            "extracted_tags": all_tags,
            "comment_preview": comment_content[:100] + "..." if len(comment_content) > 100 else comment_content,
            "status": "processing"
        }
        tag_operations_store.append(operation)

我们结合了来自两个来源的标签,并创建了一个操作记录以供监控。此记录跟踪每个 webhook 处理操作的进度。

存储操作记录对于调试和监控至关重要。当出现问题时,您可以查看最近的操作以了解发生了什么以及为什么。

现在是 MCP 代理集成

        if not all_tags:
            operation["status"] = "no_tags"
            operation["message"] = "No recognizable tags found"
            print("❌ No tags found to process")
            return
        
        # Get MCP agent for tag processing
        agent = await get_agent()
        if not agent:
            operation["status"] = "error"
            operation["message"] = "Agent not configured (missing HF_TOKEN)"
            print("❌ No agent available")
            return
        
        # Process each extracted tag
        operation["results"] = []
        for tag in all_tags:
            try:
                print(f"🤖 Processing tag '{tag}' for repo '{repo_name}'")
                
                # Create prompt for agent to handle tag processing
                prompt = f"""
                Analyze the repository '{repo_name}' and determine if the tag '{tag}' should be added.
                
                First, check the current tags using get_current_tags.
                If '{tag}' is not already present and it's a valid tag, add it using add_new_tag.
                
                Repository: {repo_name}
                Tag to process: {tag}
                
                Provide a clear summary of what was done.
                """
                
                response = await agent.run(prompt)
                print(f"🤖 Agent response for '{tag}': {response}")
                
                # Parse response and store result
                tag_result = {
                    "tag": tag,
                    "response": response,
                    "timestamp": datetime.now().isoformat()
                }
                operation["results"].append(tag_result)
                
            except Exception as e:
                error_msg = f"❌ Error processing tag '{tag}': {str(e)}"
                print(error_msg)
                operation["results"].append({
                    "tag": tag,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })
        
        operation["status"] = "completed"
        print(f"✅ Completed processing {len(all_tags)} tags")

此部分处理核心业务逻辑

  1. 验证:确保我们有要处理的标签和可用的代理
  2. 处理:对于每个标签,为代理创建自然语言提示
  3. 记录:存储所有结果以供监控和调试
  4. 错误处理:优雅地处理单个标签的错误

代理提示经过精心设计,以指示 AI 确切地采取哪些步骤:首先检查当前标签,然后在适当的情况下添加新标签。

5. 健康和监控端点

除了 webhook 处理程序之外,我们还需要用于监控和调试的端点。让我们添加这些基本端点

@app.get("/")
async def root():
    """Root endpoint with basic information"""
    return {
        "name": "HF Tagging Bot",
        "status": "running",
        "description": "Webhook listener for automatic model tagging",
        "endpoints": {
            "webhook": "/webhook",
            "health": "/health",
            "operations": "/operations"
        }
    }

根端点提供有关您的服务及其可用端点的基本信息。

@app.get("/health")
async def health_check():
    """Health check endpoint for monitoring"""
    agent = await get_agent()
    
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "components": {
            "webhook_secret": "configured" if WEBHOOK_SECRET else "missing",
            "hf_token": "configured" if HF_TOKEN else "missing",
            "mcp_agent": "ready" if agent else "not_ready"
        }
    }

健康检查端点验证您的所有组件是否都已正确配置。这对于生产监控至关重要。

@app.get("/operations")
async def get_operations():
    """Get recent tag operations for monitoring"""
    # Return last 50 operations
    recent_ops = tag_operations_store[-50:] if tag_operations_store else []
    return {
        "total_operations": len(tag_operations_store),
        "recent_operations": recent_ops
    }

操作端点让您可以查看最近的 webhook 处理活动,这对于调试和监控非常宝贵。

健康和监控端点对于生产部署至关重要。它们可帮助您快速识别配置问题并监控应用程序活动,而无需深入研究日志。

Hugging Face Hub 上的 Webhook 配置

现在我们的 webhook 监听器已准备就绪,让我们在 Hugging Face Hub 上配置它。在这里,我们将我们的应用程序连接到真实的仓库事件。

1. 在设置中创建 Webhook

遵循 webhook 设置指南

Webhook Settings

导航到您的 Hugging Face 设置 并配置

  1. 目标仓库:指定要监控的仓库
  2. Webhook URL:您的已部署应用程序端点(例如,`https://your-space.hf.space/webhook`)
  3. 密钥:使用与您的 `WEBHOOK_SECRET` 环境变量相同的密钥
  4. 事件:订阅“社区(PR 和讨论)”事件

在为许多仓库配置 webhook 之前,先从一两个测试仓库开始。这使您可以在扩展之前验证您的应用程序是否正常运行。

2. Space URL 配置

对于 Hugging Face Spaces 部署,您需要获取您的直接 URL

Direct URL

过程是

  1. 在您的 Space 设置中单击“嵌入此 Space”
  2. 复制“直接 URL”
  3. 附加 `/webhook` 以创建您的 webhook 端点
  4. 使用此 URL 更新您的 webhook 配置

例如,如果您的 Space URL 是 `https://username-space-name.hf.space`,您的 webhook 端点将是 `https://username-space-name.hf.space/webhook`。

Space URL

测试 Webhook 监听器

在部署到生产环境之前,测试至关重要。让我们通过不同的测试方法进行演练

1. 本地测试

您可以使用一个简单的脚本在本地测试您的 webhook 处理程序

# test_webhook_local.py
import requests
import json

# Test data matching webhook format
test_webhook_data = {
    "event": {
        "action": "create",
        "scope": "discussion.comment"
    },
    "comment": {
        "content": "This model needs tags: pytorch, transformers",
        "author": {"id": "test-user"}
    },
    "discussion": {
        "title": "Missing tags",
        "num": 1
    },
    "repo": {
        "name": "test-user/test-model"
    }
}

# Send test webhook
response = requests.post(
    "https://:8000/webhook",
    json=test_webhook_data,
    headers={"X-Webhook-Secret": "your-test-secret"}
)

print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")

此脚本模拟真实的 webhook 请求,让您无需等待真实事件即可测试您的处理程序。

2. 开发用的模拟端点

您还可以向 FastAPI 应用程序添加一个模拟端点,以便于测试

@app.post("/simulate_webhook")
async def simulate_webhook(
    repo_name: str, 
    discussion_title: str, 
    comment_content: str
) -> str:
    """Simulate webhook for testing purposes"""
    
    # Create mock webhook data
    mock_webhook_data = {
        "event": {
            "action": "create",
            "scope": "discussion.comment"
        },
        "comment": {
            "content": comment_content,
            "author": {"id": "test-user"}
        },
        "discussion": {
            "title": discussion_title,
            "num": 999
        },
        "repo": {
            "name": repo_name
        }
    }
    
    # Process the simulated webhook
    await process_webhook_comment(mock_webhook_data)
    
    return f"Simulated webhook processed for {repo_name}"

此端点使通过应用程序界面测试不同场景变得容易。

模拟端点在开发过程中非常有用。它们让您可以在不创建实际仓库讨论的情况下测试不同的标签组合和边缘情况。

预期的 Webhook 结果

当一切正常时,您应该看到类似讨论机器人示例的结果

Discussion Result

此屏幕截图显示了成功的 webhook 处理,其中机器人响应讨论评论创建了一个拉取请求。

下一步

通过实现 webhook 监听器,我们现在拥有

  1. 安全的 webhook 验证,遵循 Hugging Face 最佳实践
  2. 实时事件处理,带有后台任务处理
  3. MCP 集成,用于智能标签管理
  4. 监控和调试功能

在下一节中,我们将把所有内容集成到一个完整的 Pull Request 代理中,该代理演示从 webhook 到 PR 创建的完整工作流程。

始终快速(在 10 秒内)返回 webhook 响应,以避免超时。对于 MCP 工具执行和拉取请求创建等较长的处理操作,请使用后台任务。

< > 在 GitHub 上更新