MCP 课程文档
MCP 客户端
并获得增强的文档体验
开始使用
MCP 客户端
现在我们有了带有标签工具的 MCP 服务器,我们需要创建一个可以与这些工具交互的客户端。MCP 客户端充当我们的 webhook 处理程序和 MCP 服务器之间的桥梁,使我们的代理能够使用 Hub 标签功能。
出于本项目的目的,我们将构建一个 API 和一个 Gradio 应用程序。API 将用于测试 MCP 服务器和 webhook 监听器,Gradio 应用程序将用于通过模拟的 webhook 事件测试 MCP 客户端。
出于教育目的,我们将在同一个仓库中构建 MCP 服务器和 MCP 客户端。在实际应用中,您可能会为 MCP 服务器和 MCP 客户端分别建立一个仓库。实际上,您可能只构建其中一个组件。
理解 MCP 客户端架构
在我们的应用程序中,MCP 客户端集成到主 FastAPI 应用程序 (app.py
) 中。它创建并管理与 MCP 服务器的连接,为工具执行提供无缝接口。
基于代理的 MCP 客户端
我们使用内置 MCP 支持的 huggingface_hub
Agent 类。这在一个组件中提供了语言模型功能和 MCP 工具集成。
1. 代理配置
让我们从设置代理配置并理解每个组件开始
from huggingface_hub.inference._mcp.agent import Agent
from typing import Optional, Literal
# Configuration
HF_TOKEN = os.getenv("HF_TOKEN")
HF_MODEL = os.getenv("HF_MODEL", "microsoft/DialoGPT-medium")
DEFAULT_PROVIDER: Literal["hf-inference"] = "hf-inference"
# Global agent instance
agent_instance: Optional[Agent] = None
我们从必要的导入和配置开始。全局 agent_instance
变量确保我们只创建一次代理并在多个请求中重用它。这对于性能很重要,因为代理初始化可能很耗时。
现在让我们实现创建和管理代理的函数
async def get_agent():
"""Get or create Agent instance"""
print("🤖 get_agent() called...")
global agent_instance
if agent_instance is None and HF_TOKEN:
print("🔧 Creating new Agent instance...")
print(f"🔑 HF_TOKEN present: {bool(HF_TOKEN)}")
print(f"🤖 Model: {HF_MODEL}")
print(f"🔗 Provider: {DEFAULT_PROVIDER}")
该函数首先检查是否已经存在代理实例。这种单例模式可以防止不必要的重新创建并确保一致的状态。
让我们继续代理创建
try:
agent_instance = Agent(
model=HF_MODEL,
provider=DEFAULT_PROVIDER,
api_key=HF_TOKEN,
servers=[
{
"type": "stdio",
"config": {
"command": "python",
"args": ["mcp_server.py"],
"cwd": ".",
"env": {"HF_TOKEN": HF_TOKEN} if HF_TOKEN else {},
},
}
],
)
print("✅ Agent instance created successfully")
print("🔧 Loading tools...")
await agent_instance.load_tools()
print("✅ Tools loaded successfully")
except Exception as e:
print(f"❌ Error creating/loading agent: {str(e)}")
agent_instance = None
这是重要部分!让我们分解代理配置
代理参数
model
:将对工具使用进行推理的语言模型provider
:如何访问模型(Hugging Face 推理提供者)api_key
:Hugging Face API 密钥
MCP 服务器连接
type: "stdio"
:通过标准输入/输出连接到 MCP 服务器command: "python"
:将我们的 MCP 服务器作为 Python 子进程运行args: ["mcp_server.py"]
:要执行的脚本文件env
:将 HF_TOKEN 传递给服务器进程
stdio
连接类型意味着代理将您的 MCP 服务器作为子进程启动,并通过标准输入/输出与其通信。这非常适合开发和单机部署。
load_tools()
调用至关重要 - 它发现 MCP 服务器提供了哪些工具,并使它们可供代理的推理引擎访问。
这完成了我们带有适当错误处理和日志记录的代理管理函数。
工具发现与使用
一旦创建了代理并加载了工具,它就可以自动发现和使用 MCP 工具。这就是代理方法的真正强大之处。
可用工具
代理自动发现我们的 MCP 工具
get_current_tags(repo_id: str)
- 检索现有存储库标签add_new_tag(repo_id: str, new_tag: str)
- 通过拉取请求添加新标签
代理不仅仅是盲目地调用这些工具——它会根据您给出的提示来推理何时以及如何使用它们。
工具执行示例
代理如何智能地使用工具:
# Example of how the agent would use tools
async def example_tool_usage():
agent = await get_agent()
if agent:
# The agent can reason about which tools to use
response = await agent.run(
"Check the current tags for microsoft/DialoGPT-medium and add the tag 'conversational-ai' if it's not already present"
)
print(response)
请注意,我们如何给代理一个自然语言指令,它会找出
- 首先调用
get_current_tags
查看现有标签 - 检查
conversational-ai
是否已存在 - 如果不存在,则调用
add_new_tag
添加它 - 提供其操作的摘要
这比直接调用工具智能得多!
与 Webhook 处理集成
现在让我们看看 MCP 客户端如何集成到我们的 webhook 处理管道中。这就是一切的结合点。
1. 标签提取和处理
这是处理 webhook 事件并使用我们的 MCP 代理的主要函数
async def process_webhook_comment(webhook_data: Dict[str, Any]):
"""Process webhook to detect and add tags"""
print("🏷️ Starting process_webhook_comment...")
try:
comment_content = webhook_data["comment"]["content"]
discussion_title = webhook_data["discussion"]["title"]
repo_name = webhook_data["repo"]["name"]
# Extract potential tags from the comment and discussion title
comment_tags = extract_tags_from_text(comment_content)
title_tags = extract_tags_from_text(discussion_title)
all_tags = list(set(comment_tags + title_tags))
print(f"🔍 All unique tags: {all_tags}")
if not all_tags:
return ["No recognizable tags found in the discussion."]
第一部分从评论内容和讨论标题中提取并组合标签。我们使用集合来去除任何在两个地方都出现的重复标签。
同时处理评论和讨论标题增加了我们捕获相关标签的机会。用户可能会在标题中提及标签,例如“缺少 pytorch 标签”,或者在评论中提及“这需要 #transformers”。
接下来,我们获取代理并处理每个标签
# Get agent instance
agent = await get_agent()
if not agent:
return ["Error: Agent not configured (missing HF_TOKEN)"]
# Process each tag
result_messages = []
for tag in all_tags:
try:
# Use agent to process the tag
prompt = f"""
For the repository '{repo_name}', check if the tag '{tag}' already exists.
If it doesn't exist, add it via a pull request.
Repository: {repo_name}
Tag to check/add: {tag}
"""
print(f"🤖 Processing tag '{tag}' for repo '{repo_name}'")
response = await agent.run(prompt)
# Parse agent response for success/failure
if "success" in response.lower():
result_messages.append(f"✅ Tag '{tag}' processed successfully")
else:
result_messages.append(f"⚠️ Issue with tag '{tag}': {response}")
except Exception as e:
error_msg = f"❌ Error processing tag '{tag}': {str(e)}"
print(error_msg)
result_messages.append(error_msg)
return result_messages
这里的关键在于,我们为每个标签给代理一个清晰、结构化的提示。然后代理会
- 了解它需要首先检查当前标签
- 与我们要添加的新标签进行比较
- 如果需要,创建拉取请求
- 返回其操作的摘要
这种方法自动处理工具编排的复杂性。
2. 标签提取逻辑
让我们检查一下馈入 MCP 处理的标签提取逻辑
import re
from typing import List
# Recognized ML/AI tags for validation
RECOGNIZED_TAGS = {
"pytorch", "tensorflow", "jax", "transformers", "diffusers",
"text-generation", "text-classification", "question-answering",
"text-to-image", "image-classification", "object-detection",
"fill-mask", "token-classification", "translation", "summarization",
"feature-extraction", "sentence-similarity", "zero-shot-classification",
"image-to-text", "automatic-speech-recognition", "audio-classification",
"voice-activity-detection", "depth-estimation", "image-segmentation",
"video-classification", "reinforcement-learning", "tabular-classification",
"tabular-regression", "time-series-forecasting", "graph-ml", "robotics",
"computer-vision", "nlp", "cv", "multimodal",
}
这份精选的已识别标签列表有助于我们专注于相关的 ML/AI 标签,并避免向存储库添加不适当的标签。
现在是提取函数本身
def extract_tags_from_text(text: str) -> List[str]:
"""Extract potential tags from discussion text"""
text_lower = text.lower()
explicit_tags = []
# Pattern 1: "tag: something" or "tags: something"
tag_pattern = r"tags?:\s*([a-zA-Z0-9-_,\s]+)"
matches = re.findall(tag_pattern, text_lower)
for match in matches:
tags = [tag.strip() for tag in match.split(",")]
explicit_tags.extend(tags)
# Pattern 2: "#hashtag" style
hashtag_pattern = r"#([a-zA-Z0-9-_]+)"
hashtag_matches = re.findall(hashtag_pattern, text_lower)
explicit_tags.extend(hashtag_matches)
# Pattern 3: Look for recognized tags mentioned in natural text
mentioned_tags = []
for tag in RECOGNIZED_TAGS:
if tag in text_lower:
mentioned_tags.append(tag)
# Combine and deduplicate
all_tags = list(set(explicit_tags + mentioned_tags))
# Filter to only include recognized tags or explicitly mentioned ones
valid_tags = []
for tag in all_tags:
if tag in RECOGNIZED_TAGS or tag in explicit_tags:
valid_tags.append(tag)
return valid_tags
此函数使用多种策略提取标签
- 显式模式:“tags: pytorch, transformers” 或 “tag: nlp”
- 话题标签:“#pytorch #nlp”
- 自然提及:“This transformers model does text-generation”
验证步骤确保我们只建议适当的标签,防止添加垃圾邮件或不相关的标签。
性能考量
在构建生产 MCP 客户端时,性能对于保持响应式 webhook 处理至关重要。让我们看看我们做出的一些考虑。
1. 代理单例模式
代理一次创建并重复使用,以避免
- 重复的 MCP 服务器启动开销
- 工具加载延迟
- 连接建立成本
这种模式对于需要快速响应的 webhook 处理程序至关重要。
2. 异步处理
所有 MCP 操作都是异步的,以便
- 并发处理多个 webhook 请求
- 避免阻塞主 FastAPI 线程
- 提供响应迅速的 webhook 响应
异步性质允许您的 webhook 处理程序在后台处理标签时接受新请求。
3. 后台任务处理
FastAPI 内置了 BackgroundTasks
类,可用于在后台运行任务。这对于运行长时间运行的任务而不会阻塞主线程非常有用。
from fastapi import BackgroundTasks
@app.post("/webhook")
async def webhook_handler(request: Request, background_tasks: BackgroundTasks):
"""Handle webhook and process in background"""
# Validate webhook quickly
if request.headers.get("X-Webhook-Secret") != WEBHOOK_SECRET:
return {"error": "Invalid secret"}
webhook_data = await request.json()
# Process in background to return quickly
background_tasks.add_task(process_webhook_comment, webhook_data)
return {"status": "accepted"}
此模式可确保 webhook 响应速度快(不到 1 秒),同时允许在后台进行复杂的标签处理。
Webhook 端点应在 10 秒内响应,否则平台可能会将其视为超时。使用后台任务可确保您始终能够快速响应,同时异步处理复杂任务。
后续步骤
实现了 MCP 客户端后,我们现在可以
- 实现 Webhook 监听器 - 创建接收 Hub 事件的 FastAPI 端点
- 集成所有组件 - 将 webhooks、客户端和服务器连接成一个完整的系统
- 添加测试接口 - 创建用于开发和监控的 Gradio 接口
- 部署和测试 - 在生产环境中验证完整系统
在下一节中,我们将实现 webhook 监听器,它将触发我们的 MCP 支持的标签代理。
huggingface_hub
中的 Agent 类同时提供了 MCP 工具集成和语言模型推理,使其非常适合构建像我们的 PR 代理这样的智能自动化工作流。