Agentic 任务委托——让智能体再次完整
想象一下,你让你的 AI 智能体规划你的假期,但当你提到需要预订航班、酒店和当地景点列表时,它却卡住了。令人沮丧,不是吗?欢迎来到单一任务 AI 智能体的世界——它们样样涉猎,却一无所精。
当前的 AI 系统虽然强大,但由于工具集成薄弱、成本高昂和带宽不足等诸多问题,它们无法成为出色的可操作智能体。我们 @ Capx AI 探索了由去中心化基础设施提供支持的多智能体生态系统,以将可操作任务卸载到网络中的其他智能体服务。
单一智能体系统的局限性
概览:
带宽:同时处理多个任务的能力有限。
处理能力:处理各种任务时的计算限制。
内存限制:难以处理不同任务的上下文切换。
任务复杂性:难以应对需要多种技能的复杂多步任务。
带宽是指计算机上一个进程可以使用的计算资源。在 AI 智能体系统的情况下,带宽是指智能体处理和通信信息的能力。当 AI 被要求同时处理多个复杂任务时,这就像试图将过多的数据挤过狭窄的管道。智能体的通信通道会变得拥堵,导致响应时间变慢,并可能丢失或损坏信息。这种瓶颈会严重损害智能体在多个任务中处理输入和生成连贯输出的能力。
处理能力是另一个关键限制。虽然现代 AI 系统拥有令人印象深刻的计算能力,但它们仍然有上限。在单个系统上处理不同任务是计算密集型的,因为每个任务可能需要不同的算法、模型或处理技术。这就像要求一位厨师同时准备五道菜的餐点、平衡餐厅账目并管理顾客预订一样。
内存限制进一步加剧了这些问题。AI 智能体,特别是那些处理语言和上下文密集型任务的智能体,需要处理大量信息。每个任务都需要自己的上下文数据、相关知识和操作参数集。当一个智能体尝试处理多个不同的任务时,它被迫在不同的内存上下文之间快速切换。这种不断的上下文切换可能导致信息丢失、任务之间的混淆,或者无法为任何单一任务维护长期上下文。
任务复杂性:当面对复杂的、多步骤的任务时,单一任务 AI 智能体的局限性变得非常明显。一个单一任务 AI 可能擅长某个方面,例如数据分析,但却难以处理书面部分所需的细致语言。智能体需要不断地切换任务,这可能会降低效率和质量。
任务委托
智能体任务委托代表了我们处理 AI 问题解决方式的范式转变。其核心是将复杂的、多方面的任务分解成更小、更易于管理的组件,并将其分配给运行专用智能体的主机网络,每个主机在完成任务时都会获得奖励。
这个概念基于智能任务**分解和分发**的原则运作。当引入一个复杂任务时,系统首先对其进行分析并将其分解为更小、更易于管理的子任务。然后,每个子任务被分配给最适合该特定类型工作的专用智能体。例如,像“抓取 StackOverflow”这样的任务可能被分解为导航网站、搜索相关主题、提取信息和格式化数据。类似地,“在 HN 上撰写评论”可能涉及分析帖子、研究主题、形成观点、起草评论和发布。在“对文档进行 OCR”的情况下,过程可能包括预处理图像、应用 OCR 算法、后处理提取的文本和验证结果。
我们提出了一种架构
去中心化智能体网络
智能体任务委托系统在去中心化网络上运行,利用区块链技术确保透明度、安全性和高效协调。网络中的每个节点都代表一个专用智能体或服务,能够执行特定任务。区块链充当分布式账本,记录任务分配、完成和奖励。“任务委托合约”部署在链上,以管理智能体之间的交互,托管任务委托的逻辑,并与持有奖励的“奖励池”托管合约通信,奖励将根据任务分配和任务复杂性分配给智能体节点。
这从而带来了链上可验证性、透明度,并确保了系统中的公平奖励分配。
分布式边缘计算
所提出的多智能体任务委托框架的关键创新领域之一是引入了“分布式边缘计算”,实现了复杂任务在多个节点(边缘设备)上的并行处理。当一个任务进入系统时,控制平面将其分解为子任务,并将其分发到网络中最合适的智能体。每个智能体独立处理其分配的子任务,利用其在独立计算节点上的专门能力。
这种并行处理显著缩短了整体计算时间,并允许处理资源密集型任务,而这些任务对于单个智能体来说是不切实际的。任务委托合约与消息队列一起采用先进的负载均衡算法,以优化网络中的资源利用率,确保即使在不同工作负载下也能高效完成任务。
组件概览(TLDR;)
在该系统中,一个中央控制平面充当调度器,分析传入任务,确定最佳分解方式,然后将子任务分配给其网络中最合适的专用智能体。这些专用智能体,每个都擅长处理特定类型的任务,并行工作以完成其分配的部分。中央协调器随后聚合结果,确保最终输出的一致性。
消息队列
消息队列充当所有服务和控制平面的中央通信枢纽。它提供
- 向命名队列发布消息的方法
- 将消息委托给适当消费者的功能
消息队列核心操作
Initialize consumers and queues dictionaries
procedure Publish(message):
queues[message.type].append(message)
// Add the message to the appropriate queue
procedure ProcessingLoop():
while running do
for each non-empty queue do
PublishToConsumer(queue.dequeue())
// Dequeue and publish messages to consumers
end for
end while
procedure LaunchServer():
Start ProcessingLoop and run FastAPI server
// Initialize and run the server
控制平面
控制平面充当 Llama-Agents 系统的主要网关。其职责包括
- 跟踪当前任务
- 维护系统内服务的注册表
- 包含调度器模块
控制平面处理循环
procedure ControlPlaneProcessingLoop():
while true do
message ← CheckMessageQueue()
// Check for new messages in the queue
if message is NewTask then
HandleNewTask(message)
else if message is GeneralChat then
HandleGeneralChat(message)
else if message is CompletedTask then
HandleCompletedTask(message)
end if
UpdateSystemState()
Sleep(shortInterval)
// Process different types of messages and update system state
end while
procedure HandleNewTask(task):
RegisterTask(task)
service ← SelectAppropriateService(task)
SendTaskToService(task, service)
// Register and delegate new tasks to appropriate services
procedure HandleGeneralChat(chat):
ProcessChatMessage(chat)
UpdateUserInterface(chat)
// Process general chat messages and update UI
procedure HandleCompletedTask(result):
UpdateTaskState(result)
NotifyTaskCompletion(result)
if FollowUpTaskRequired(result) then
CreateFollowUpTask(result)
end if
// Handle completed tasks and create follow-ups if needed
调度器
该模块管理任务分配和结果处理。它可以实现为
- 利用 LLM 进行决策的智能体系统
- 具有预定义查询管道的显式系统
- 结合两种方法的混合方法
- 根据特定要求定制的自定义实现
基本调度器
class BaseOrchestrator:
abstract procedure get_next_messages(task_def, tools, state):
// Abstract method to determine next messages to process
// Returns: (List of QueueMessages, Updated state)
abstract procedure add_result_to_state(result, state):
// Abstract method to update state with processing results
// Returns: Updated state
智能体调度器
class AgentOrchestrator(BaseOrchestrator):
Initialize with LLM, prompts, and finalize tool
procedure get_next_messages(task_def, tools, state):
chat_history ← state.get(HISTORY_KEY, [])
memory ← ChatMemoryBuffer(chat_history, llm)
if chat_history is empty then
response ← llm.predict_and_call(tools, task_def.input)
else
response ← llm.predict_and_call(tools + finalize_tool, memory.get())
if response has no tool call or calls finalize tool then
Create TaskResult and QueueMessage for human
else
Create QueueMessages for each tool call
Update state with new chat history
return queue_messages, new_state
procedure add_result_to_state(result, state):
Summarize result if necessary
chat_history ← state.get(HISTORY_KEY, [])
Append summary to chat_history
Append followup prompt to chat_history
return updated state
服务
服务是任务处理发生的核心操作单元。每个服务
- 接受带有相关上下文的传入任务
- 根据其专业功能处理任务
- 发布结果输出
基本服务算法
class BaseService:
Initialize with service_name
abstract property service_definition():
// Returns ServiceDefinition
abstract procedure as_consumer(remote: bool):
// Returns BaseMessageQueueConsumer
abstract procedure processing_loop():
// Continuous processing of messages
abstract procedure process_message(message):
// Process a single message
abstract procedure launch_local():
// Launch service in-process
return asyncio.Task
abstract procedure launch_server():
// Launch service as a server
procedure register_to_control_plane(control_plane_url):
service_def = self.service_definition
Send POST request to f"{control_plane_url}/services/register"
with service_def as JSON payload
procedure register_to_message_queue():
return message_queue.register_consumer(self.as_consumer(remote=True))
工具服务
一种专门用于卸载智能体工具计算的服务。它允许智能体配备一个与工具服务接口的元工具,从而提高模块化和效率。
服务即工具算法
class ServiceAsTool:
// ServiceAsTool wraps a service and presents it as a tool for use in LLM systems
Initialize with tool_metadata, message_queue, service_name
// tool_metadata: describes the tool's function and parameters
// message_queue: for async communication with the service
// service_name: unique identifier for the underlying service
procedure FromServiceDefinition(service_definition):
// Factory method to create a ServiceAsTool from a service definition
Create tool_metadata from service_definition
return new ServiceAsTool instance
procedure ProcessMessage(message):
// Handles incoming messages from the message queue
if message is COMPLETED_TOOL_CALL:
Store result in tool_call_results
// Keeps track of completed tool calls for later retrieval
procedure AsyncCall(input):
// Asynchronous method to execute the tool
Register as consumer if not already registered
// Ensures the tool can receive messages from the queue
Create new task from input
Publish NEW_TOOL_CALL message to service
// Sends the task to the underlying service via the message queue
while result not received and not timed out:
Wait for short interval
Check for result in tool_call_results
// Polls for the result, implementing async behavior
if result received:
return result as ToolOutput
else:
return timeout error
// Handles both successful calls and timeouts
procedure Call(input):
// Synchronous wrapper for AsyncCall
return synchronous version of AsyncCall(input)
// Allows the tool to be used in synchronous contexts
多智能体系统的优势:
我们的智能体任务委托方法让用户能够无与伦比地利用休眠计算能力并加速任务!通过将大型任务分解为可管理的小块并动态适应任务需求,系统可以解决具有相互依赖组件的复杂问题,从而使整个系统更加健壮、强大和高效。
结论
智能体任务委托代表了 AI 系统解决复杂问题方式的范式转变。通过分解任务并利用专用智能体,与传统的单一 AI 方法相比,这种方法提供了更高的效率、适应性和可伸缩性。展望未来,我们可以预期这种方法将与先进的 AI 模型更深度地集成,这可能导致在不同领域实现更复杂的人工智能协作。