Agent2Agent 和 MCP:一个完整 Agentic Pipeline 的端到端教程

社区文章 发布于 2025 年 4 月 29 日

image/png

引言

在快速发展的代理人工智能世界中,两个开放协议悄然解决了曾困扰多代理项目无法走出实验室的难题。

几个月前,Anthropic 推出了 模型上下文协议 (MCP):一旦对话开始,代理即可可靠地访问其所需的数据和工具。Anthropic 将 MCP 描述为语言模型的 USB-C 接口——一个单一的、定义明确的连接器,允许您将相同的模型插入 GitHub、Postgres 数据库或自定义知识库,而无需每次都重写集成。通过标准化主机、客户端和服务器之间交换“工具”、“资源”和“提示”的方式,MCP 将上下文变为模型可以信赖的东西,而不是开发人员每次都临时拼凑的东西。

Agent2Agent (A2A) 是 Google 推出的一个新协议,它解决了另一个障碍:让自主代理(通常基于不同的框架并在不同位置托管)相互理解。A2A 没有采用脆弱的一次性桥梁,而是为每个代理提供一张小小的“代理卡”,宣传其技能,并提供一个 HTTP 接口,允许其他代理协商任务、流式传输中间结果和移交工件。Google 启动该项目是为了让代理拥有通用的语言,无论供应商如何,开源规范已经展示了发现、任务生命周期更新和安全推送通知如何开箱即用。

通过 A2A 处理代理之间的通信,MCP 处理代理如何访问外部世界,您最终将拥有小型、专注的代理,它们可以流畅地协作并仍然看到大局——这种架构感觉不像脚本的集合,而更像一个协作的劳动力。

image/png

本教程的结构

本教程将使用 Agent2Agent 和 MCP 构建一个完整的代理流水线。首先,我们将创建并测试几个简单的 MCP 服务器。然后,我们将创建一个使用 MCP 获取信息的简单代理。最后,我们将拥有一个完整的代理组,它们使用 Agent2Agent 相互协调,同时使用 MCP 获取信息。

为了简单起见,我们将使用仅能访问单个 MCP 的基本代理,以及执行简单操作(如从 API 获取数据或搜索网络)的 MCP。

我们设想我们要创建一个能够制作关于美国公司简单报告的代理团队;特别是,我们希望能够提出诸如“标准普尔 500 指数中排名前 10 的公司股票价格是多少?”或“美国排名前 5 的木材生产商是哪些?”之类的问题。

关于代码的说明

本教程的代码可在 GitHub 上找到。您应该克隆存储库,在本地运行代码,并进行尝试甚至修改。本教程由以下两个来源创建:

MCP 文档很棒,但 Google Agent2Agent 存储库存在多个问题,并且未按预期工作。因此,我大量修改了提供的代码。

如何运行

uv by Astral 被用作 Python 包和项目管理器。您可以克隆仓库,运行 uv sync,然后运行您需要的任何东西。

MCP 服务器

鉴于我们的目标,我们首先定义两个服务

  • 一个能够谷歌搜索和阅读网页的爬虫
  • 一个股票检索器,能够获取特定时间的股票价格和其他有用信息

股票检索服务

我们使用 FinHub API 定义了一个简单的股票检索服务。检索器将通过一个端点返回股票的当前价格、最低价、最高开盘价和收盘价(重要提示:如果使用免费版 API,则只能查询美国市场),通过另一个端点返回股票代码。

class FinHubService:
    def __init__(self):
        """
 Initializes the finhubservice.
 """
        self.client = finnhub.Client(api_key=os.getenv("FINNHUB_API_KEY"))

    def get_symbol_from_query(self, query: str) -> Dict[str,Any]:
        """
 Given a query (e.g. name of a company) returns a dictionary with info. Use only if you have no idea about the symbol.
 :param query: name of company
 :return: dictionary with the response to the query
 """

        return self.client.symbol_lookup(
            query=query,
 )

    def get_price_of_stock(self, symbol: str) -> Dict[str,Any]:
        """
 Given the symbol of a certain strock, returns the live info about it.
 :param symbol: The symbol of a stock, e.g. AAPL
 :return: a dictionary containing the current_price, change, %change, day high, low, opening and previous closing price
 """
 resp = self.client.quote(symbol)
        return {
             'current_price': resp['c'],
             'change': resp['d'],
             'percentage_change': resp['dp'],
             'day_high': resp['h'],
             'day_low': resp['l'],
             'day_open_price': resp['o'],
             'previous_close_price': resp['pc'],
 }

如果我们运行以下代码

 service = FinHubService()
    print(service.get_price_of_stock("AAPL"))

我们会得到类似这样的结果

 {'current_price': 199.74, 'change': 6.58, 'percentage_change': 3.4065, 'day_high': 201.59, 'day_low': 195.97, 'day_open_price': 196.12, 'previous_close_price': 193.16}

爬虫服务

现在我们有了检索器,我们想要一个非常简单的爬虫来在 Google 上搜索信息并阅读返回的网页结果。

    class SerperDevService:
    def __init__(self):
        self.__api_key__=os.getenv("SERPER_DEV_API_KEY")
        self.search_url = "https://google.serper.dev/search"
        self.scraper_url = "https://scrape.serper.dev"

    def search_google(
            self,
            query: str,
            n_results: int = 10,
            page: int = 1,
 ) -> List[Dict[str, Any]]:
        """
 Search Google using the Serper.dev API.
 :param query: the query to search on google
 :param n_results: number of results to return per page
 :param page: page number to return
 :return: a list of dictionaries containing the search results
 """
 payload = json.dumps(
 {
                "q": query,
                "num": n_results,
                "page": page
 },
 )
 headers = {
            'X-API-KEY': self.__api_key__,
            'Content-Type': 'application/json'
 }

 response = requests.request(
            method="POST",
            url=self.search_url,
            headers=headers,
            data=payload,
 )

        return response.json()['organic']


    def get_text_from_page(self, url_to_scrape: str) -> str:
        """
 Get text from a page using the Serper.dev API.
 :param url_to_scrape: the url of the page to scrape
 :return: the text content of the page
 """
 payload = json.dumps(
 {
                "url": url_to_scrape,
 }
 )
 headers = {
            'X-API-KEY': self.__api_key__,
            'Content-Type': 'application/json'
 }

 response = requests.request(
            method="POST",
            url=self.scraper_url,
            headers=headers,
            data=payload,
 )

        return response.text

从服务到服务器

现在是时候将这两个服务转换为 MCP 服务器了。鉴于本教程中代理的功能有限,我们可以创建一个单一的 MCP 服务器,提供代理所需的所有工具。尽管如此,这里的目标不是提供最佳的生产就绪解决方案,恰恰相反,而是为了实验:因此,我们将创建两个服务器,每个服务一个。

在模型上下文协议中,服务器有两种类型,通过其传输层区分。STDIO MCP 服务器作为本地子进程运行,并通过其 stdin/stdout 流传输 JSON-RPC 消息,提供最小延迟、全双工消息传递和零网络依赖:它非常适合命令行工具或同机集成。服务器发送事件 (SSE) MCP 服务器则公开一个 HTTP 端点:客户端通过轻量级 POST 发送请求,而服务器在单个 SSE 流上将结果推送回客户端,使其自然地适用于 Web,并且可以在网络或从浏览器访问。实际上,当所有内容都位于一台主机上时,stdio 是精简、无附加功能的选项,而 SSE 则牺牲了一点 HTTP 开销和单向流语义,以换取防火墙遍历、浏览器兼容性和远程可达性。

鉴于我们的用例,使用第一种解决方案似乎很自然(但我们会发现它并非那么简单)。

mcp = FastMCP("Search Engine Server")

search_service = SerperDevService()

@mcp.tool()
def search_google(
    query: str,
    n_results: int = 10,
    page: int = 1,
) -> list:
    """
 Search Google using the Serper.dev API.
 :param query: the query to search on google
 :param n_results: number of results to return per page
 :param page: page number to return
 :return: a list of dictionaries containing the search results
 """
    return search_service.search_google(query, n_results, page)

@mcp.tool()
def get_text_from_page(url_to_scrape: str) -> str:
    """
 Get text from a page using the Serper.dev API.
 :param url_to_scrape: the url of the page to scrape
 :return: the text content of the page
 """
    return search_service.get_text_from_page(url_to_scrape)

if __name__ == "__main__":
 mcp.run(transport='stdio')

代码很简单(而且与 FastAPI 非常相似,鉴于它用于实现此 MCP 库,这并不奇怪)。

我们首先使用 mcp = FastMCP("<服务器名称>") 初始化 MCP 服务器,然后,使用装饰器 @mcp.tool() 定义端点/工具。

我们以同样的方式定义股票爬虫 MCP 服务器。

让我们使用 MCP 服务器

所以,至此我们已经有了可用的 MCP 服务器,现在只需使用它。鉴于本教程也涉及 A2A 框架,并且 A2A 由 Google 创建,我们将使用新的 Google ADK 来创建我们的代理。首先,我们创建一个简单的代理,它使用我们的 MCP 服务器来搜索网络信息。

我们首先创建一个函数,该函数将启动我们的 MCP 服务器并将其“转换”为 ADK 代理的工具。

async def get_tools_async():
  """Gets tools from the Search MCP Server."""
  print("Attempting to connect to MCP Filesystem server...")
  tools, exit_stack = await MCPToolset.from_server(
      connection_params=StdioServerParameters(
          command="/opt/homebrew/bin/uv", # on macos you need to use this path
          args=[
              "--directory",
              "/root/path/to/mcp_server",
              "run",
              "search_server.py"
          ],
          env={
              "PYTHONPATH": <YOUR_PYTHONPATH_IF_NEEDED>
          },
      )
  )
  print("MCP Toolset created successfully.")
  return tools, exit_stack

此函数将启动我们的 MCP 服务器并返回工具和退出堆栈。

现在,我们可以以类似的方式创建我们的代理。

async def get_agent_async():
  """Creates an ADK Agent equipped with tools from the MCP Server."""
  tools, exit_stack = await get_tools_async()
  print(f"Fetched {len(tools)} tools from MCP server.")
  root_agent = LlmAgent(
      model='gemini-2.5-pro-exp-03-25',
      name='search_agent',
      description="Agent to answer questions using Google Search.",
      instruction="You are an expert researcher. When someone asks you something you always double check online. You always stick to the facts.",
      tools=tools, # 
  )
  return root_agent, exit_stack

此函数将创建一个配备有 MCP 服务器工具的 ADK 代理。

现在,我们可以将所有内容整合起来,创建我们的代理管道。


async def async_main():
    session_service = InMemorySessionService()
    artifacts_service = InMemoryArtifactService()
    print("Creating session...")
    session = session_service.create_session(
        state={}, app_name='mcp_search_app', user_id='searcher_usr', session_id='searcher_session'
    )
    print(f"Session created with ID: {session.id}")

    query = "What are the most tipical sports of the Aosta Valley? Answer with a lot of details."
    print(f"User Query: '{query}'")
    content = types.Content(role='user', parts=[types.Part(text=query)])
    root_agent, exit_stack = await get_agent_async()

    runner = Runner(
        app_name='mcp_search_app',
        agent=root_agent,
        artifact_service=artifacts_service,
        session_service=session_service,
    )

    print("Running agent...")
    events_async = runner.run_async(
        session_id=session.id, user_id=session.user_id, new_message=content
    )

    async for event in events_async:
        if event.is_final_response():
            if event.content and event.content.parts:
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:  # Handle potential errors/escalations
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
            print(f"############# Final Response #############\n\n{final_response_text}")
            break


    print("Closing MCP server connection...")
    await exit_stack.aclose()
    print("Cleanup complete.")

此功能将创建一个会话,运行代理,并打印最终响应(如果您对奥斯塔山谷的问题感兴趣,请参见下文)。

如果您对奥斯塔山谷问题的答案感兴趣,请点击此处好的,以下是奥斯塔山谷特有的传统体育项目,并附有详细的比赛说明

奥斯塔山谷拥有几种独特的传统体育项目,深深植根于其乡村历史和文化,主要在春季和秋季进行。主要项目有 TsanRebattaFioletPalet。此外,Morra 游戏也是一种流行的传统消遣。

  • Tsan:Tsan(在当地 dialect 中意为“田地”)通常被模糊地比作棒球或罗马尼亚的 Oina,它是一种在大型草地上进行的团队运动,理想情况下至少有 135 米长。两队各 12 名球员进行比赛。游戏涉及两个主要阶段。在第一阶段('battià' 或 'tsachà'),一方的球员轮流击打一个小球('tsan'),该球放置在长木或塑料杆('percha' 或 'pertse')的凹槽('t aspot')中,该杆倾斜地放置在支架('bes')上。击球手使用木棍('baquet')击打 'percha' 的末端,将 'tsan' 弹射到空中,然后用 'baquet' 击打空中的 'tsan',将其打向远方。对方球队则散布在场地上,试图在 'tsan' 落地之前将其截住,主要使用木制球拍('pilon' 或 'boquet'),甚至可以抛出球拍来阻挡球。如果击球落在指定场地边界内且未被截住,则视为“好球”('bon')。击球手继续击球,直到其击球被截住,或连续三次或总共四次将 'tsan' 击出界外。每位击球手积累的“好球”数量将计入团队总分。当第一队所有球员都击球完毕后,两队交换角色。在第二阶段('servià' 或 'paletà'),每位球员必须将其积累的“好球”转化为距离。对方球队的一名球员将 'tsan' 高高抛向原始击球区。轮到击球的球员试图用不同的木制球拍('paleta' 或 'piota')击打这个被抛出的 'tsan',目标是尽可能远的距离。此击球的距离以米为单位测量。最后,将每队所有球员达到的总距离相加。累积领先对手至少 40 米的球队赢得比赛;如果差距小于 40 米,则为平局(尽管在决赛或附加赛中,领先一米就足以获胜)。由于木制 'tsan' 坚硬,球员通常会佩戴头盔以确保安全。

  • Rebatta:这项运动属于球棒类运动,如高尔夫或棒球。它涉及将一个小球(“rebatta”,由木头制成,镶有钉子或金属,直径约 30 毫米)尽可能远地击出。为此,球员使用两种特殊的木制工具。首先是“fioletta”,一根约 20 厘米长的管状杠杆。“rebatta”放置在“fioletta”略微凹陷的一端,该端放在地上。然后,球员用一根长木棍(“masetta”,100-140 厘米长,带有一个独特的头部,称为“maciocca”)击打“fioletta”的另一端,使“fioletta”充当杠杆,将“rebatta”弹出空中(此动作称为“levoù”)。紧接着,球员再次挥动“masetta”,用力击打空中的“rebatta”,将其击向远方。比赛场地通常是草地上划出的等腰三角形,顶点位于击球点(“place”)。长度因类别而异,但最长可达 240 米。场地上每 15 米划一条线,将“rebatta”击过一条线即可获得相应的分数(通过第一条线得 1 分,第二条线得 2 分,依此类推)。比赛通常在团队之间进行(例如,每队五名球员),在设定的轮次(“bars”)中竞争,并举行团队和个人锦标赛。

  • Fiolet:与 Rebatta 的目标相似(将物体击出最大距离),Fiolet 使用略有不同的设备和技术。击打的物体是“fiolet”本身,它呈卵形,有一部分略微扁平。与 Rebatta 使用单独的杠杆“fioletta”不同,“fiolet”直接平衡在光滑的石头(“pira”)或专门制造的支架上。玩家使用球棒(也常称为“masetta”或“baton”)巧妙地击打平衡的“fiolet”的边缘。这种最初的冲击使“fiolet”垂直跳到空中。然后,玩家用球棒完整挥杆,击打空中的“fiolet”,并将其尽可能远地击向场地,可能超过 150 米。与 Rebatta 一样,得分基于达到的距离,通常以区域或与设定距离相对应的点数来衡量。比赛包括团队赛(通常每队 5 名球员)和一项著名的个人锦标赛,获胜者将获得当年的“Baton d'Or”(金球棒)奖杯。

  • Palet:这项运动类似于马蹄铁或法国的“jeu de palets”。玩家将沉重、圆形、扁平的金属盘(称为“palets”)抛向一个较小的目标钉,称为“bolin”或“billet”,目标钉放置在一定距离(通常约为 10-20 米)的指定场地(通常由粘土或压实土制成)上。目标是将您的“palet”尽可能靠近“bolin”。玩家通常每轮抛掷固定数量的“palets”。得分在所有“palets”在一轮中都抛掷完毕后进行。得分授予“palet(s)”最靠近“bolin”的玩家或团队。通常,只有比对手最靠近的“palet”更近的“palet”才能得分。它要求在抛掷沉重的盘子时具有精确性和策略性。它既可以单独进行,也可以团队进行,通常在山谷中随处可见的专用场地进行。

  • Morra (或 Moura):虽然比需要专用场地的体育运动更简单,并且通常被认为是游戏而非运动,但 Morra 是奥斯塔山谷非常传统且生动的消遣方式。它通常在两个人之间进行。双方玩家同时伸出一只手,显示 1 到 5 根手指(或代表零的握紧拳头)。在伸出手的同一瞬间,每位玩家大声喊出他们猜测的双方玩家显示手指的总和(因此猜测将在 0 到 10 之间)。正确猜出总和的玩家得分。游戏节奏快,通常以固定分数进行回合制,并以其生动的喊叫和手势而闻名。

多个本地 MCP 服务器:STDIO 不足时

现在,我们有一个可供代理使用的单个 MCP 服务器。然而,特别是如果我们打算与多个代理一起使用,我们希望拥有多个 MCP 服务器,每个服务器都有特定的用途。例如,我们希望一个 MCP 服务器能够搜索网络,另一个能够检索股票,等等。

如果您尝试启动多个基于 STDIO 的 MCP 服务器,您会发现它们无法同时工作。为了解决这个问题,我们可以使用一些技巧来使用基于 SSE 的 MCP 服务器。

这很简单:我们像以前一样声明“端点”,

@mcp.tool()
def search_google(
    query: str,
    n_results: int = 10,
    page: int = 1,
) -> list:
    """
    Search Google using the Serper.dev API.
    :param query: the query to search on google
    :param n_results: number of results to return per page
    :param page: page number to return
    :return: a list of dictionaries containing the search results
    """
    return search_service.search_google(query, n_results, page)

@mcp.tool()
def get_text_from_page(url_to_scrape: str) -> str:
    """
    Get text from a page using the Serper.dev API.
    :param url_to_scrape: the url of the page to scrape
    :return: the text content of the page
    """
    return search_service.get_text_from_page(url_to_scrape)

然后,我们将服务器封装在 Startlette 应用程序中

def create_starlette_app(
        mcp_server: Server,
        *,
        debug: bool = False,
) -> Starlette:
    """
    Create a Starlette application that can server the provied mcp server with SSE.
    :param mcp_server: the mcp server to serve
    :param debug: whether to enable debug mode
    :return: a Starlette application
    """

    sse = SseServerTransport("/messages/")

    async def handle_sse(request: Request) -> None:
        async with sse.connect_sse(
                request.scope,
                request.receive,
                request._send,
        ) as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options(),
            )

    return Starlette(
        debug=debug,
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )

然后,我们可以通过不同的端口运行每个服务器,如下所示:

mcp_server = mcp._mcp_server  # noqa: WPS437
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=8080, help='Port to listen on')
args = parser.parse_args()
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)

要将此类服务器作为工具传递给 ADK 代理,我们需要类似以下代码:

async def return_sse_mcp_tools_search():
    print("Attempting to connect to MCP server for search and page read...")
    server_params = SseServerParams(
        url="https://:<CHOSEN_PORT>/sse",
    )
    tools, exit_stack = await MCPToolset.from_server(connection_params=server_params)
    print("MCP Toolset created successfully.")
    return tools, exit_stack

多个代理

现在是时候创建更复杂的包含多个代理的流水线了。首先,我们将使用 Google 的 ADK 库创建一个分层代理组,然后,在确认我们的解决方案有效后,我们将使用 Agent2Agent 框架来创建能够相互协调的对等代理。

分层代理组

现在我们已经设置好 MCP 服务器,我们可以使用 ADK 库创建分层代理组。在此示例中,我们将创建一个能够分析公司及其股票的代理团队,其中一个主协调代理将任务委托给专门的子代理。

设置代理组

首先,我们为应用程序定义一些常量

MODEL = 'gemini-2.5-pro-exp-03-25'
APP_NAME = 'company_analysis_app'
USER_ID = 'searcher_usr'
SESSION_ID = 'searcher_session'

这些常量定义了我们将使用的模型(Gemini 2.5 Pro)、应用程序名称和会话标识符。

创建专业代理

我们创建了两个专门的代理,每个代理都有自己的一套工具

  1. 股票分析代理:该代理负责分析股票数据并提供有关股票价格和市场表现的见解。
stock_analysis_agent = Agent(
    model=MODEL,
    name="stock_analysis_agent",
    instruction="Analyze stock data and provide insights.",
    description="Handles stock analysis and provides insights, in particular, can get the latest stock price.",
    tools=stocks_tools,
)
  1. 搜索代理:该代理专门从事网络搜索和在线内容阅读。
search_agent = Agent(
    model=MODEL,
    name="search_agent",
    instruction="Expert googler. Can search anything on google and read pages online.",
    description="Handles search queries and can read pages online.",
    tools=search_tools,
)

根代理

根代理充当协调者,根据用户的查询将任务委托给专门的子代理

root_agent = Agent(
    name="company_analysis_assistant",
    model=MODEL,
    description="Main assistant: Handles requests about stocks and information of companies.",
    instruction=(
        "You are the main Assistant coordinating a team. Your primary responsibilities are providing company and stocks reports and delegating other tasks.\n"
        "1. If the user asks about a company, provide a detailed report.\n"
        "2. If you need any information about the current stock price, delegate to the stock_analysis_agent.\n"
        "3. If you need to search for information, delegate to the search_agent.\n"
        "Analyze the user's query and delegate or handle it appropriately. If unsure, ask for clarification. Only use tools or delegate as described."
    ),
    sub_agents=[search_agent, stock_analysis_agent],
    output_key="last_assistant_response",
)

根代理的指令清晰地定义了其角色以及如何将其任务委托给其子代理。它的设计目的是:

  • 提供详细的公司报告
  • 将股票价格查询委托给股票分析代理
  • 将搜索查询委托给搜索代理
  • 在需要时请求澄清

运行代理组

主函数设置会话并运行代理流水线

async def async_main():
    # Initialize services
    session_service = InMemorySessionService()
    artifacts_service = InMemoryArtifactService()
    
    # Create session
    session = session_service.create_session(
        state={},
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )

    # Get user query
    query = input("Enter your query:\n")
    content = types.Content(role='user', parts=[types.Part(text=query)])
    
    # Initialize tools from MCP servers
    search_tools, search_exit_stack = await return_sse_mcp_tools_search()
    stocks_tools, stocks_exit_stack = await return_sse_mcp_tools_stocks()

    # Create and run the agent pipeline
    runner = Runner(
        app_name=APP_NAME,
        agent=root_agent,
        artifact_service=artifacts_service,
        session_service=session_service,
    )

    # Process events and get final response
    events_async = runner.run_async(
        session_id=session.id, 
        user_id=session.user_id, 
        new_message=content
    )

    async for event in events_async:
        if event.is_final_response():
            if event.content and event.content.parts:
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
            print(colored(text=f"############# Final Response #############\n\n{final_response_text}", color='green'))
            break
        else:
            print(event)

    # Cleanup
    await stocks_exit_stack.aclose()
    await search_exit_stack.aclose()

此实现创建了一个分层代理组,其中

  1. 根代理接收用户的查询
  2. 它分析查询并决定是否
    • 直接处理
    • 委托给股票分析代理
    • 委托给搜索代理
  3. 专业代理使用各自的 MCP 工具收集信息
  4. 根代理协调响应并向用户提供最终答案

代理组将自动将任务委托给相应的代理,并提供结合了股市和网络搜索信息的全面响应。

Agent2Agent 代理组

现在,是时候最终使用 Agent2Agent 框架了。正如我们之前所说,A2A 是一个允许代理通过标准化接口以点对点方式相互协调的框架。

所述代理可以使用支持 A2A 接口的任何语言或框架来实现。在本示例中,为简单起见,我们将使用 Google 的 ADK 库来创建代理。

进一步的免责声明:在撰写本文时,我在线找到的 A2A 实现(包括 Google 的实现)都相当原始,因此在此示例中,我大量借鉴并修改了官方存储库中A2A 教程的代码。

主要概念组件

Agent2Agent (A2A) 协议实现了自主 AI 代理之间的无缝交互。以下是其基本构建块:

代理卡:一种标准化元数据文档(通常位于 /.well-known/agent.json),作为代理的数字身份卡。它详细说明了代理的功能、可用技能、服务端点和安全要求,使其他代理能够发现并了解如何与其交互。

A2A 服务器:通过公开 HTTP 端点实现 A2A 协议的代理。它根据协议规范处理传入请求、管理任务执行并维护与客户端的通信。

A2A 客户端:任何使用 A2A 服务的应用程序或代理。它通过向 A2A 服务器的端点发送请求(例如 tasks/sendtasks/sendSubscribe)来启动交互。

任务:A2A 中的主要工作单元。当客户端启动任务时,它会创建一个唯一的对话线程,该线程可以经历各种状态:已提交、正在工作、需要输入、已完成、失败或已取消。每个任务都维护自己的上下文和历史记录。

消息:客户端和代理之间通信的基本单元。消息包含一个角色(“用户”或“代理”),并由一个或多个部分组成,形成对话线程。

部分:消息或工件中的原子内容单元。部分可以是不同类型:

  • 文本部分:用于纯文本内容
  • 文件部分:用于文件数据(内联或通过 URI 引用)
  • 数据部分:用于结构化 JSON 数据(通常用于表单或结构化响应)

工件:表示代理在任务执行期间生成的任何输出。这可能包括生成的文件、结构化数据或其他资源。与消息一样,工件也由部分组成。

流式传输:对于需要较长处理时间的任务,服务器可以通过 tasks/sendSubscribe 实现流式传输功能。这允许客户端通过服务器发送事件 (SSE) 接收实时更新,包括任务状态更改和新工件。

推送通知:支持此功能的服务器可以通过 webhook URL 主动通知客户端任务更新。客户端使用 tasks/pushNotification/set 配置其通知端点。

典型交互流程

  1. 发现:客户端从服务器的众所周知 URL 检索代理卡,以了解代理的功能和要求。

  2. 启动:客户端通过发送 tasks/sendtasks/sendSubscribe 请求来启动新任务,其中包含初始消息和唯一的任务 ID。

  3. 处理中:

    • 在流模式下:服务器通过 SSE 发送实时更新,包括状态更改和新工件
    • 在非流模式下:服务器同步处理任务并返回最终结果
  4. 交互:如果任务需要额外输入,客户端可以使用相同的任务 ID 发送后续消息。

  5. 完成:任务最终达到最终状态(完成、失败或取消),从而结束交互。

ADKAgent 类

现在我们了解了 A2A 的概念组件,接下来看看如何使用 ADK 库实现它。以下代码展示了如何创建一个既可以作为独立代理,又可以与其他网络中的代理协调的代理。

ADKAgent 类是我们 A2A 实现的基础。它将 Google ADK 框架与 A2A 协议连接起来,允许代理通信和协调任务。以下是其关键组件的概述:

class ADKAgent:
    """An agent that handles stock report requests."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(
            self,
            model: str,
            name: str,
            description: str,
            instructions: str,
            tools: List[Any],
            is_host_agent: bool = False,
            remote_agent_addresses: List[str] = None,
            task_callback: TaskUpdateCallback | None = None
    ):
        # ... initialization code ...

该类可以配置为两种模式

  1. 独立代理:当 is_host_agent=False 时,它充当具有自己的工具和功能的常规 ADK 代理。
  2. 主机代理:当 is_host_agent=True 时,它成为一个可以向其他代理委派任务的协调器。

主要特点

  1. 远程代理管理:
def register_agent_card(self, card: AgentCard):
    remote_connection = RemoteAgentConnections(card)
    self.remote_agent_connections[card.name] = remote_connection
    self.cards[card.name] = card

此方法允许代理发现并连接到网络中的其他代理。每个代理的功能都在其 AgentCard 中描述,该文件存储在 .well-known/agent.json 文件中。

  1. 任务委托:
async def send_task(
    self,
    agent_name: str,
    message: str,
    tool_context: ToolContext
):
    # ... task delegation code ...

send_task 方法处理核心 A2A 协议交互。它

  • 创建一个带有唯一 ID 的新任务
  • 将任务发送到指定的远程代理
  • 管理任务生命周期(已提交、正在工作、已完成等)
  • 处理来自远程代理的响应和工件
  1. 状态管理:
def check_state(self, context: ReadonlyContext):
    state = context.state
    if ('session_id' in state and
            'session_active' in state and
            state['session_active'] and
            'agent' in state):
        return {"active_agent": f'{state["agent"]}'}
    return {"active_agent": "None"}

代理维护以下状态信息:

  • 活动会话
  • 当前任务
  • 连接的代理
  • 会话上下文
  1. 响应处理:
def convert_parts(parts: list[Part], tool_context: ToolContext)
def convert_part(part: Part, tool_context: ToolContext)

这些方法处理不同响应格式之间的转换,支持

  • 文本响应
  • 结构化数据
  • 文件附件
  • 工件

使用示例

以下是如何使用 ADKAgent 创建一个将任务委托给专业代理的协调代理:

# Create a host agent
host_agent = ADKAgent(
    model="gemini-2.5-pro",
    name="coordinator",
    description="Main coordinator agent",
    instructions="Coordinate tasks between agents",
    tools=[],
    is_host_agent=True,
    remote_agent_addresses=["http://agent1:8080", "http://agent2:8080"]
)

Agent Card 类

在前面的代码片段中,我们看到 AgentCard 对象在许多地方被使用。

AgentCard 类是 Agent2Agent (A2A) 协议的关键组件,它作为描述代理能力和需求的标准化方式。它本质上是代理的数字身份卡,其他代理可以使用它来发现并了解如何与它进行交互。

让我们分解一下 generate_agent_card 函数的关键组件:

  1. 基本代理信息:

    • agent_name:代理的唯一标识符
    • agent_description:代理功能的易读描述
    • agent_url:代理可访问的端点
    • agent_version:代理实现的版本
  2. 功能:

    • can_stream:指示代理是否支持通过服务器发送事件 (SSE) 进行流式响应。
    • can_push_notifications:指定代理是否可以发送推送通知。
    • can_state_transition_history:确定代理是否维护状态转换历史。
  3. 通信设置:

    • authentication:定义与代理交互所需的身份验证方法
    • default_input_modes:列出支持的输入格式(例如,“text”、“json”)
    • default_output_modes:列出支持的输出格式
  4. 技能:

    • skills:一个 AgentSkill 对象列表,描述代理的特定能力

以下是使用 generate_agent_card 函数为股票分析代理创建卡片的示例:

stock_agent_card = generate_agent_card(
    agent_name="stock_analyzer",
    agent_description="Analyzes stock market data and provides insights",
    agent_url="https://:8080",
    agent_version="1.0.0",
    can_stream=True,
    can_push_notifications=True,
    skills=[
        AgentSkill(
            name="stock_analysis",
            description="Analyzes stock prices and market trends",
            input_schema={
                "type": "object",
                "properties": {
                    "symbol": {"type": "string"},
                    "timeframe": {"type": "string"}
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "price": {"type": "number"},
                    "trend": {"type": "string"}
                }
            }
        )
    ]
)

生成的代理卡通常在代理 URL 的 /.well-known/agent.json 端点提供。这个标准化位置允许其他代理发现并了解如何与代理交互,而无需事先了解其实现细节。

当另一个代理想要与此代理交互时,它可以

  1. /.well-known/agent.json 获取代理卡片
  2. 验证代理是否支持所需功能
  3. 检查代理是否具备必要的技能
  4. 使用提供的 URL 和身份验证方法建立通信

A2A 服务器

为了使代理“可联系”,我们需要实现 A2A 服务器。服务器负责处理来自其他代理的传入请求并管理通信协议。以下是关键组件的概述:

  1. 服务器初始化:
class A2AServer:
    def __init__(
        self,
        host="0.0.0.0",
        port=5000,
        endpoint="/",
        agent_card: AgentCard = None,
        task_manager: TaskManager = None,
    ):
        self.host = host
        self.port = port
        self.endpoint = endpoint
        self.task_manager = task_manager
        self.agent_card = agent_card
        self.app = Starlette()
        self.app.add_route(self.endpoint, self._process_request, methods=["POST"])
        self.app.add_route(
            "/.well-known/agent.json", self._get_agent_card, methods=["GET"]
        )

服务器使用以下内容初始化:

  • 主机和端口配置
  • 用于处理请求的端点
  • 描述代理功能的代理卡片
  • 用于处理任务执行的任务管理器
  1. 请求处理:
async def _process_request(self, request: Request):
    try:
        body = await request.json()
        json_rpc_request = A2ARequest.validate_python(body)
        if isinstance(json_rpc_request, GetTaskRequest):
            result = await self.task_manager.on_get_task(json_rpc_request)
        elif isinstance(json_rpc_request, SendTaskRequest):
            result = await self.task_manager.on_send_task(json_rpc_request)
        # ... handle other request types ...

服务器处理不同类型的请求

  • 任务创建与管理
  • 流式任务更新
  • 推送通知
  • 任务取消
  1. 异步服务器启动:
async def astart(self):
    if self.agent_card is None:
        raise ValueError("agent_card is not defined")

    if self.task_manager is None:
        raise ValueError("request_handler is not defined")

    config = uvicorn.Config(self.app, host=self.host, port=self.port, loop="asyncio")
    server = uvicorn.Server(config)

    # start in the background
    server_task = asyncio.create_task(server.serve())

    # wait for startup
    while not server.started:
        await asyncio.sleep(0.1)
    print("Server is up – press Ctrl+C to shut it down manually")

    try:
        await server_task
    except KeyboardInterrupt:
        server.should_exit = True
        await server_task

astart 方法是原始 Google 实现中的一个关键修改。原因如下:

  1. 原始实现:
def start(self):
    if self.agent_card is None:
        raise ValueError("agent_card is not defined")
    if self.task_manager is None:
        raise ValueError("request_handler is not defined")
    uvicorn.run(self.app, host=self.host, port=self.port)

原始实现使用同步 start 方法,该方法会阻塞主线程。这带来了问题,因为

  • 它无法集成到现有异步事件循环中
  • 这使得与其它异步组件(如 MCP 工具)的协调变得困难
  1. 新实现:新的 astart 方法
  • 创建带异步事件循环的 Uvicorn 服务器
  • 使用 asyncio.create_task 在后台启动服务器
  • 等待服务器完全启动
  • 处理键盘中断时的优雅关闭
  • 可以集成到现有异步应用程序中

此更改是必要的,因为

  • MCP 工具本质上是异步的
  • 服务器需要能够与其他异步组件并行运行
  • 我们需要对服务器生命周期进行适当的控制
  1. 响应处理:
def _create_response(self, result: Any) -> JSONResponse | EventSourceResponse:
    if isinstance(result, AsyncIterable):
        async def event_generator(result) -> AsyncIterable[dict[str, str]]:
            async for item in result:
                yield {"data": item.model_dump_json(exclude_none=True)}
        return EventSourceResponse(event_generator(result))
    elif isinstance(result, JSONRPCResponse):
        return JSONResponse(result.model_dump(exclude_none=True))

服务器支持两种类型的响应

  • 用于即时结果的常规 JSON 响应
  • 用于流式更新的服务器发送事件 (SSE)

此实现允许 A2A 服务器

  • 处理多个并发请求
  • 支持流式响应
  • 与异步组件集成
  • 提供适当的错误处理
  • 支持优雅关闭

服务器现在可以在异步上下文中像这样使用

async def main():
    server = A2AServer(
        host="0.0.0.0",
        port=5000,
        agent_card=my_agent_card,
        task_manager=my_task_manager
    )
    await server.astart()

这使得将 A2A 服务器与其他异步组件(例如 MCP 工具或其他代理)集成成为可能,同时保持对服务器生命周期的适当控制。

A2A 卡片解析器

A2A 卡片解析器是一个用于解析代理卡片的类。它用于在代理卡片未知时查找代理卡片。代码非常简单。

class A2ACardResolver:
    def __init__(self, base_url, agent_card_path="/.well-known/agent.json"):
        self.base_url = base_url
        self.agent_card_path = agent_card_path.lstrip("/")

    def get_agent_card(self) -> AgentCard:
        with httpx.Client() as client:
            url = re.match(r'(https?://[^/]+)', self.base_url).group(1).rstrip("/")
            response = client.get(url + "/" + self.agent_card_path)
            response.raise_for_status()
            try:
                resp_dict = response.json()
                resp_dict['url'] = self.base_url
                return AgentCard(**resp_dict)
            except json.JSONDecodeError as e:
                raise A2AClientJSONError(str(e)) from e

出于某种原因,谷歌提供的原始示例无法正常工作,因为它在附加卡片路径时“破坏”了 URL。

通用资源、实用工具等

除了这些主要组件,在本教程中我们还使用了许多其他资源,其中大部分与原始教程保持不变:绝大多数可在此处找到。

将所有内容整合在一起

所以,现在我们拥有了将所有内容整合在一起所需的所有组件。

我们创建主机代理

async def run_agent():
    AGENT_NAME = "host_agent"
    AGENT_DESCRIPTION = "An agent orchestrates the decomposition of the user request into tasks that can be performed by the child agents."
    PORT = 12000
    HOST = "0.0.0.0"
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="COORDINATE_AGENT_TASKS",
            name="coordinate_tasks",
            description="coordinate tasks between agents.",
        ),
    ]

    list_urls = [
        "https://:11000/google_search_agent",
        "https://:10000/stock_agent",
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    host_agent = ADKAgent(
        model=MODEL,
        name="host_agent",
        description="",
        tools=[],
        instructions="",
        is_host_agent=True,
        remote_agent_addresses=list_urls,
    )

    task_manager = generate_agent_task_manager(
        agent=host_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/host_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()

搜索代理


import asyncio
from typing import List, Any

from dotenv import load_dotenv, find_dotenv
from google.adk import Agent
from google.adk.tools import google_search

from a2a_servers.agent_servers.utils import generate_agent_task_manager, generate_agent_card
from a2a_servers.agents.adk_agent import ADKAgent
from a2a_servers.common.agent_task_manager import AgentTaskManager
from a2a_servers.common.server.server import A2AServer
from a2a_servers.common.types import (
    AgentCard,
    AgentCapabilities,
    AgentSkill,
)
from adk_agents_testing.mcp_tools.mcp_tool_search import return_sse_mcp_tools_search

load_dotenv(find_dotenv())

async def run_agent():
    AGENT_NAME = "google_search_agent"
    AGENT_DESCRIPTION = "An agent that handles search queries and can read pages online."
    HOST = "0.0.0.0"
    PORT = 11000
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="GOOGLE_SEARCH",
            name="google_search",
            description="Handles search queries and can read pages online.",
        ),
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    gsearch_tools, g_search_exit_stack = await return_sse_mcp_tools_search()

    google_search_agent = ADKAgent(
        model=MODEL,
        name="google_search_agent",
        description="Handles search queries and can read pages online.",
        tools=gsearch_tools,
        instructions=(
            "You are an expert googler. Can search anything on google and read pages online."
        ),
    )

    task_manager = generate_agent_task_manager(
        agent=google_search_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/google_search_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()


if __name__ == "__main__":
    asyncio.run(
        run_agent()
    )

以及股票代理

async def run_agent():
    AGENT_NAME = "stock_report_agent"
    AGENT_DESCRIPTION = "An agent that provides US stock prices and info."
    PORT = 10000
    HOST = "0.0.0.0"
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="SKILL_STOCK_REPORT",
            name="stock_report",
            description="Provides stock prices and info.",
        ),
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    stocks_tools, stocks_exit_stack = await return_sse_mcp_tools_stocks()

    stock_analysis_agent = ADKAgent(
        model=MODEL,
        name="stock_analysis_agent",
        description="Handles stock analysis and provides insights, in particular, can get the latest stock price.",
        tools=stocks_tools,
        instructions=(
            "Analyze stock data and provide insights. You can also get the latest stock price."
            "If the user asks about a company, the stock prices for the said company."
            "If the user asks about a stock, provide the latest stock price and any other relevant information."
            "You can get only the latest stock price for US companies."
        ),
    )

    task_manager = generate_agent_task_manager(
        agent=stock_analysis_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/stock_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()

然后,运行所有程序。

首先是 MCP 服务器

uv run mcp server/sse/search_server.py
uv run mcp server/sse/stocks_server.py

然后是 A2A 服务器

uv run a2a_servers/agent_servers/stock_report_agent_server.py
uv run a2a_servers/agent_servers/google_search_agent_server.py

最后,是主机代理

uv run a2a_servers/host_agent_server.py

然后我们可以联系主机代理(例如,使用 a2a_servers/run_from_local_client.py 脚本)。

在这里您可以看到整个过程的视频

<video controls autoplay src="

">

社区

注册登录 发表评论