番外篇:当 Tool 需要跑 30 秒——为 MCP Runtime 拆出异步骨架

前八篇跑通了一个完整的同步 Agent 运行时。但一旦遇上耗时 Tool——爬网页、跑报告、调用慢 API——整个会话就得卡死。这篇讲怎么用 Spring 事件机制,零侵入地把同步循环拆成 Orchestrator + Worker,让 Agent 能在后台干活的同时继续跟用户交互。

Summary: 前八篇跑通了一个完整的同步 Agent 运行时。但一旦遇上耗时 Tool——爬网页、跑报告、调用慢 API——整个会话就得卡死。这篇讲怎么用 Spring 事件机制,零侵入地把同步循环拆成 Orchestrator + Worker,让 Agent 能在后台干活的同时继续跟用户交互。


系列八篇写完了。从架构总览到生产化收尾,你拿到的是一个能跑、能扛、能 trace、可配置的 MCP Agent 运行时。但它有一个内生的局限:同步

用户发一条消息 → LLM 推理 → 可能调 Tool → 等 Tool 回来 → 继续 LLM 推理 → 输出。全程线性的、阻塞的。LLM 推理很快(秒级),但 Tool 调用不一定。如果你挂了一个爬网页的工具,一个页面花 5 秒,爬三个就是 15 秒——用户对着光标干等,不能打断、不能调整、不能中途说"刚才那个网站别看了"。

这不是代码写得不好,这是架构的锅。单线程循环压不住耗时操作。

解决方案不是引入 A2A

说"上多 Agent",第一反应往往是 A2A 协议——每个 Agent 暴露 Agent Card,通过 HTTP 互相发现、委托任务、流式推送。Google 的 A2A 协议设计得很好,但它是为跨组织、异构框架的 Agent 互操作准备的。你的两个 Agent 跑在同一个 JVM 里,同一个 Spring 上下文里,用同一套 Tool Registry 和 State Manager。

搬 A2A 进来等于拿卡车运一瓶水:Agent Card 的 .well-known/agent.json 端点、Task 六状态机(pending/working/input-required/completed/failed/canceled)、流式推送协商——每一个都是解决"我不认识你"的问题,但你俩在一台机器上,天天见面。

更务实的方案是 Spring 的事件机制ApplicationEventPublisher + @Async + @EventListener。三样东西都是 Spring Boot 自带的,零外部依赖。

拆成两个角色

(改造前)
用户 → [Runtime: LLM ↔ Tool(同步)] → 用户
        所有事情都在一个线程里串行

(改造后)
用户 → [Orchestrator: LLM 推理循环]
              │  耗时 Tool 走异步事件
              ↓
           [Worker: 后台线程池执行 Tool]
              │  完成后事件通知
              ↓
           Orchestrator 拿到结果 → 下一轮 LLM 推理

Orchestrator(协调者):负责 LLM 推理循环、用户连接管理。当 LLM 决定调一个 Tool 时,检查它是不是标记为长时间运行。如果是,不走同步执行,而是发一个事件,告诉 Worker"你帮我跑一下",然后立刻告诉用户"任务已提交,等结果"。Orchestrator 在下一轮 LLM 推理前,从注册表里拿走已完成的结果,注入到上下文中。

Worker(执行者):没有跟用户直接交互。它的全部工作就是监听 AsyncTaskEvent,在专有的线程池上执行 Tool,执行完发一个 TaskCompletedEvent 回去。

在哪切一刀

关键问题是:系统怎么知道哪个 Tool 是该走异步的?

答案不是在调用链路中加 if-else,而是让 Tool 自己声明。在 ToolMetadata 里加一个标记:

// 定义 Tool 的时候,告诉 Runtime "我跑得慢"
ToolDefinition scrape = ToolDefinition.builder()
    .name("web_scrape")
    .executor(ctx -> {
        // 耗时 5 秒的操作
        Thread.sleep(5000);
        return "page content";
    })
    .metadata(ToolMetadata.builder()
        .timeout(Duration.ofSeconds(30))
        .longRunning(true)          // ← 这一行决定了走异步路径
        .build())
    .build();

然后在 ToolExecutorRouter 里判断:

public ToolExecutionResult execute(ToolDefinition def, ToolExecutionContext ctx) {
    // 异步路径
    if (def.getMetadata().isLongRunning()) {
        String taskId = UUID.randomUUID().toString().replace("-", "");
        taskRegistry.register(taskId, ctx.getSessionId(), ctx.getTraceId(), def.getName());
        eventPublisher.publishEvent(new AsyncTaskEvent(...));
        // 立刻返回——不阻塞
        return ToolExecutionResult.asyncSubmitted(def.getName(), taskId, ctx);
    }
    // 同步路径(原来的逻辑不变)
    ...
}

这一刀切得干净:对于调用方来说,execute 方法的行为不变——传进去一个 def 和 ctx,返回一个结果。 只是长耗时 Tool 返回的不是执行结果,而是一个占位符 [Task submitted: xxx. The result will be available shortly.]。真正的结果延迟到达,通过事件机制送回。

四个核心组件

整个改动新增了 8 个 Java 文件,没有修改任何已有代码。下面把这四个最关键的讲完。

1. AsyncTaskRegistry — 任务的"等候室"

Worker 在后台跑,Orchestrator 不知道什么时候跑完。需要一个共享的状态表,按 session 分组:

@Component
public class AsyncTaskRegistry {

    private final Map<String, List<TaskEntry>> tasksBySession = new ConcurrentHashMap<>();

    // task 的生命周期
    public void register(String taskId, String sessionId, String traceId, String toolName)
    public void markInProgress(String taskId)
    public void complete(String taskId, ToolExecutionResult result)

    // Orchestrator 在下一轮 LLM 推理前调用,取走已完成的
    // 取走后从表里删除,不会重复消费
    public List<ToolExecutionResult> drainCompleted(String sessionId)

    // 判断 session 是否有后台任务还在跑
    public boolean hasPendingWork(String sessionId)
}

drainCompleted 是设计的关键。它只拿走 COMPLETED 或 FAILED 的任务,PENDING 和 IN_PROGRESS 的留在表里。Orchestrator 每次处理用户消息前调一次,拿到结果就注入 LLM 输入上下文。

2. AsyncTaskEvent + TaskCompletedEvent

两个 Spring 事件类,结构简单到不需要多解释:

// Orchestrator → Worker 的指令
public class AsyncTaskEvent extends ApplicationEvent {
    String taskId, sessionId, traceId;
    ToolDefinition toolDefinition;
    Map<String, Object> args;
}

// Worker → Orchestrator 的回执
public class TaskCompletedEvent extends ApplicationEvent {
    String taskId, sessionId, traceId;
    ToolExecutionResult result;
}

Spring 的 ApplicationEventPublisher 是同步 publish 的——发事件的线程会阻塞直到所有监听器处理完。但 WorkerAgent 的监听器加了 @Async("toolTaskExecutor"),所以实际上发布完 AsyncTaskEvent 就立刻返回了,Worker 在线程池里异步处理。

3. WorkerAgent — 沉默的打工仔

@Component
public class WorkerAgent {

    @EventListener
    @Async("toolTaskExecutor")
    public void handleAsyncTask(AsyncTaskEvent event) {
        // 1. 标记 in-progress
        taskRegistry.markInProgress(event.getTaskId());

        // 2. 构建执行上下文,调现有的 ToolExecutorRouter
        ToolExecutionContext ctx = ToolExecutionContext.builder()
            .sessionId(event.getSessionId())
            .traceId(event.getTraceId())
            .callId(event.getTaskId())
            .args(event.getArgs())
            .build();

        ToolExecutionResult result = executorRouter.execute(def, ctx);

        // 3. 注册完成 + 发事件通知 Orchestrator
        taskRegistry.complete(event.getTaskId(), result);
        eventPublisher.publishEvent(new TaskCompletedEvent(...));
    }
}

注意这里 executorRouter.execute(def, ctx) 调用的是同一个 ToolExecutorRouter——但因为 def 的 longRunning 标记,如果 Worker 也调用了一个 longRunning tool,会递归吗?不会。Worker 接到的 def 已经是在路由层面被判定为 longRunning 的那个 Tool 定义。调用 executorRouter.execute 会再次检查 longRunning 标记——死循环风险。

实际上 Worker 不应该走路由,应该直调 def.getExecutor().execute(ctx) 这是我的代码里需要修的一个点。当前实现中 WorkerAgent 调 executorRouter.execute(def, ctx),而 executorRouter 会再次检查 longRunning 标记。更好的做法是 Worker 直接调 executor,跳过路由层的异步判定。

4. OrchestratorAgent — 结果的中转站

@Component
public class OrchestratorAgent {

    // 监听完成事件——目前只打日志
    @EventListener
    public void onTaskCompleted(TaskCompletedEvent event) {
        log.info("Task {} completed for session {}", event.getTaskId(), event.getSessionId());
    }

    // 给 LLM 循环用的 API
    public List<ToolExecutionResult> drainCompletedResults(String sessionId) {
        return taskRegistry.drainCompleted(sessionId);
    }

    // 生成后台任务摘要,注入到 LLM system prompt
    public String buildBackgroundSummary(String sessionId) {
        // 拼接成类似:
        // [Background Tasks]
        // - Tool 'web_scrape' is in_progress (taskId: abc123)
    }

    public boolean hasBackgroundWork(String sessionId) {
        return taskRegistry.hasPendingWork(sessionId);
    }
}

异步处理的线程池

AsyncConfig 里配了一个专门的线程池:

@Bean("toolTaskExecutor")
public Executor toolTaskExecutor(RuntimeConfig runtimeConfig) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);     // 可配置
    executor.setMaxPoolSize(8);      // 可配置
    executor.setQueueCapacity(50);
    executor.setThreadNamePrefix("tool-worker-");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(30);
    return executor;
}

四个核心线程意味着最多四个 Tool 同时执行。如果你有 10 个耗时 Tool 请求同时进来,6 个会在队列里等着。这比让它们全部并发跑更安全——不会一瞬间打爆下游服务。

用户交互模型怎么变

这是目前实现中还没有完全覆盖的部分。让我画出理想中的交互流程:

→ 用户: 爬这三个网页,帮我写个摘要
  Orchestrator:
    1. LLM 推理 → 决定调 3 次 web_scrape
    2. 检查标记 → 都是 longRunning
    3. 发 3 个 AsyncTaskEvent
    4. 返回用户: "已开始爬取,完成后我会整理摘要"
    (SSE 推送: task-started, taskId=xxx)

→ 用户: 对了重点看第二部分内容
  Orchestrator:
    1. 用户消息进入 LLM 推理
    2. 同时注入后台任务状态:
       "[Background Tasks]
        - Tool 'web_scrape(url=page1)' is in_progress
        - Tool 'web_scrape(url=page2)' is in_progress
        - Tool 'web_scrape(url=page3)' is in_progress"
    3. LLM 理解后回复: "好的,我会重点关注第二部分"

  (Worker 陆续完成 → TaskCompletedEvent)
  Orchestrator drain → 拿到三个结果

→ Orchestrator 自动触发下一轮 LLM 推理:
    注入三个爬取结果 → LLM 生成摘要 → 推送给用户

关键点在于:Orchestrator 的 LLM 循环需要感知后台任务的存在。每次用户发消息时,Orchestrator 不只是把消息丢给 LLM,还要带上当前的 background summary——告诉模型"有几个 Tool 还在跑、分别是什么、进度如何"。这样 LLM 才能做出合理的回应(比如"稍等"或"好的我记下了")。

当前实现中 OrchestratorAgent 提供了 buildBackgroundSummarydrainCompletedResults,但完整的 LLM 循环集成——包括自动触发推理当所有后台任务完成——留给了下一步。这也是一个有意思的工程问题:是轮询?还是事件驱动?我个人倾向事件驱动——当 Worker 发出 TaskCompletedEvent 时,Orchestrator 检查该 session 是否所有任务都完成了,如果是,自动执行下一轮推理。

改了哪些

操作 说明
agent/event/ 新增 2 个文件 AsyncTaskEvent, TaskCompletedEvent
agent/worker/ 新增 2 个文件 AsyncTaskRegistry, WorkerAgent
agent/orchestrator/ 新增 1 个文件 OrchestratorAgent
agent/ 新增 1 个文件 DemoLongRunningTool
core/config/ 新增 1 个文件 AsyncConfig
core/registry/ 改 3 个文件 ToolMetadata + longRunning 标记; ToolExecutorRouter + 异步分支; ToolExecutionResult + asyncSubmitted 工厂

已有代码零删除。所有改动是增量叠加。

什么时候该拆、什么时候不该

写完这篇我最大的体会是:不是所有多 Agent 都是架构炫技,但也不是单 Agent 一慢就该拆。

该拆的特征:
- Tool 执行时间明显超过用户可接受的等待阈值(>3-5 秒)
- 耗时操作期间用户有调整方向的需求
- 耗时操作可以并行(比如爬三个网页,不是串行依赖)

不该拆的特征:
- Tool 本身很快(毫秒级),拆了引入的异步复杂度远超收益
- LLM 推理本身是瓶颈(换模型比换架构有效)
- 你只需要一个"跑完了叫我"的通知,不需要中途交互

对于这个 MCP Runtime 项目,现在你有两条路可以跑了:同步的短 Tool 走老路,异步的长 Tool 走新路。两条路的工具注册、执行、监控机制是同一套——区别只在路由那一个 if 分支。

代码在 GitHub:betterpursue/mcp-agent-runtimeagent/ 包下是这篇新增的全部内容。


(系列目录:一:架构总览 | 二:Tool Registry | 三:Execution Engine | 四:Structured Output 兼容层 | 五:Context Management | 六:State Management | 七:Observability | 八:生产化 | 番外:异步分解)

评论

此博客中的热门博文

我写了半年 prompt,最后发现最好的技巧就三个