从零构建一个 MCP Agent 运行时(三):Tool Execution Engine,LLM 说句话到你拿到结果之间发生了什么

LLM 发起一个 tool call,到结果回到 LLM 手里,中间经过的完整链路。参数校验、隐式类型转换、拦截器编排、超时控制、重试策略、兜底降级——每个环节都可能在线上咬你一口。

Summary: LLM 发起一个 tool call,到结果回到 LLM 手里,中间经过的完整链路。参数校验、隐式类型转换、拦截器编排、超时控制、重试策略、兜底降级——每个环节都可能在线上咬你一口。


Part-02 把 ToolRegistry 和拦截器链搭好了。Registry 里注册了一堆 tool,拦截器挂在上面,看起来一切就绪。但你真的在生产环境跑过 tool call 吗?

LLM 说了一句"调用 get_weather('Beijing')",到最终结果返回给 LLM——这条链路上任何一个环节出错,LLM 就会给出一个莫名其妙的下文。参数类型不匹配,LLM 不会告诉你"参数类型错了",它会自己编一个结果。超时了,模型会换一种方式重新调用,然后你又收到了同样的请求。重试逻辑写错了,同一个幂等操作被执行了三次,用户在 UI 上看到了三条重复的 booking confirmation。

这一篇就把这条链路拆干净。从 MCP 的 JSON-RPC 请求进入 Runtime 开始,到执行结果返回给 LLM 结束,中间每一个环节——参数解析、类型校验、拦截器链、超时控制、重试策略、兜底降级——全部展开。

你能看到的完整链路

从调用方视角看,执行一个 tool 的流程可以抽象成六个步骤:

CallToolRequest ──→ ArgParser ──→ ToolResolver ──→ ArgValidator
                                                 │
                                                 ↓
                   CallToolResult ←── ResultFormatter ←── ExecutorChain
                                              ↑
                                        TimeoutWatcher
                                        RetryHandler
                                        FallbackResolver

每个步骤都可以单独替换实现。这是后面一切可观测性、热更新、A/B 测试的基础。

先从最外层的入口说起。

从 MCP 请求到执行上下文

Part-02 的 McpToolBridge 已经处理了 callHandler 的回调。但那段代码我写得很糙——直接把 request.arguments() 塞进 ToolExecutionContext 就完事了。生产环境里不能这么做。

MCP 协议层返回的 arguments 是一个 Map<String, Object>,但 Object 的实际类型是 JSON 反序列化后的产物——数字是 IntegerDouble,嵌套对象是 Map<String, Object>,数组是 List<Object>。没有任何类型保证。

所以 ArgParser 的第一件事不是解析参数,而是校验参数的完整性

@Component
public class ArgParser {

    public ParsedArgs parse(Map<String, Object> rawArgs, JsonSchema schema) {
        Map<String, Object> parsed = new LinkedHashMap<>();
        List<ValidationError> errors = new ArrayList<>();

        // 1. 检查必需参数
        for (String required : schema.getRequired()) {
            if (rawArgs == null || !rawArgs.containsKey(required)) {
                errors.add(new ValidationError(required, "missing",
                    "Required parameter '" + required + "' is missing"));
                continue;
            }
            Object value = rawArgs.get(required);
            if (value == null || (value instanceof String && ((String) value).isBlank())) {
                errors.add(new ValidationError(required, "null_or_empty",
                    "Required parameter '" + required + "' is null or empty"));
            }
        }

        if (!errors.isEmpty()) {
            throw new ArgumentValidationException(errors);
        }

        // 2. 按 schema 定义的类型逐个转换
        for (Map.Entry<String, SchemaProperty> entry : schema.getProperties().entrySet()) {
            String name = entry.getKey();
            SchemaProperty prop = entry.getValue();
            Object rawValue = rawArgs != null ? rawArgs.get(name) : null;

            if (rawValue == null) {
                // 有默认值就用默认值
                if (prop.getDefaultValue() != null) {
                    parsed.put(name, prop.getDefaultValue());
                }
                continue;
            }

            try {
                Object coerced = coerce(rawValue, prop.getType());
                parsed.put(name, coerced);
            } catch (TypeCoercionException e) {
                errors.add(new ValidationError(name, "type_mismatch", e.getMessage()));
            }
        }

        if (!errors.isEmpty()) {
            throw new ArgumentValidationException(errors);
        }

        return new ParsedArgs(parsed);
    }

    @SuppressWarnings("unchecked")
    private Object coerce(Object value, String targetType) {
        if (value == null) return null;

        return switch (targetType) {
            case "string" -> {
                if (value instanceof String s) yield s;
                if (value instanceof Number n) yield n.toString();
                if (value instanceof Boolean b) yield b.toString();
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to string");
            }
            case "integer" -> {
                if (value instanceof Integer i) yield i;
                if (value instanceof Long l) {
                    if (l > Integer.MAX_VALUE || l < Integer.MIN_VALUE) {
                        throw new TypeCoercionException("Long value " + l + " out of int range");
                    }
                    yield l.intValue();
                }
                if (value instanceof Double d) {
                    int i = d.intValue();
                    if (d != i) {
                        throw new TypeCoercionException("Cannot coerce float " + d + " to integer");
                    }
                    yield i;
                }
                if (value instanceof String s) {
                    try {
                        yield Integer.parseInt(s.trim());
                    } catch (NumberFormatException e) {
                        throw new TypeCoercionException("Cannot coerce string '" + s + "' to integer");
                    }
                }
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to integer");
            }
            case "number" -> {
                if (value instanceof Number n) yield n;
                if (value instanceof String s) {
                    try {
                        yield Double.parseDouble(s.trim());
                    } catch (NumberFormatException e) {
                        throw new TypeCoercionException("Cannot coerce string '" + s + "' to number");
                    }
                }
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to number");
            }
            case "boolean" -> {
                if (value instanceof Boolean b) yield b;
                if (value instanceof String s) {
                    if ("true".equalsIgnoreCase(s)) yield true;
                    if ("false".equalsIgnoreCase(s)) yield false;
                    throw new TypeCoercionException("Cannot coerce string '" + s + "' to boolean");
                }
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to boolean");
            }
            case "array" -> {
                if (value instanceof List<?>) yield value;
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to array");
            }
            case "object" -> {
                if (value instanceof Map<?, ?>) yield value;
                throw new TypeCoercionException("Cannot coerce " +
                    value.getClass().getSimpleName() + " to object");
            }
            default ->
                throw new TypeCoercionException("Unknown target type: " + targetType);
        };
    }
}

这段代码看起来啰嗦。每个类型转换都要显式处理,JSON 反序列化出来什么类型、目标类型是什么——都必须逐个映射。我用 switch 表达式处理了六种基础类型和两种复合类型。string → integer 这种跨类型的转换在后面接推理模型时特别常见——Claude 喜欢传数字不加引号,GPT 习惯传字符串,混合用的时候你这里不做兜底,模型就炸了。

ParsedArgs 是一个简单的包装类,提供类型安全的 getter:

public class ParsedArgs {

    private final Map<String, Object> values;

    ParsedArgs(Map<String, Object> values) {
        this.values = values;
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String name) {
        return (T) values.get(name);
    }

    public <T> T getOrDefault(String name, T defaultValue) {
        return (T) values.getOrDefault(name, defaultValue);
    }

    public String getString(String name) {
        Object v = values.get(name);
        return v != null ? v.toString() : null;
    }

    public Integer getInt(String name) {
        Object v = values.get(name);
        if (v instanceof Integer i) return i;
        if (v instanceof Long l) return l.intValue();
        if (v instanceof Number n) return n.intValue();
        return null;
    }

    public boolean has(String name) {
        return values.containsKey(name);
    }

    public Map<String, Object> toMap() {
        return values;
    }
}

ExecutionChain:拦截器编排的完整形态

Part-02 的 InterceptorChain 做得太简单了——before → execute → after 三步走完。但生产级的执行链不止三个钩子。

实际中你需要的是一个责任链模式,每个拦截器都能决定"放行"还是"短路"

public class ExecutionChain {

    private final List<ToolInterceptor> interceptors;
    private final ToolExecutor executor;
    private int currentIndex = 0;

    public ExecutionChain(List<ToolInterceptor> interceptors, ToolExecutor executor) {
        this.interceptors = interceptors;
        this.executor = executor;
    }

    /**
     * 执行拦截器链,最终到达实际的 executor。
     * 每个拦截器可以:
     * - 什么都不做,直接 chain.proceed() 放行
     * - 修改 ctx 后再 proceed()
     * - 不调用 proceed(),直接返回结果(短路)
     * - 抛出异常中断链路
     */
    public ToolExecutionResult proceed(ToolExecutionContext ctx) throws Exception {
        if (currentIndex < interceptors.size()) {
            ToolInterceptor interceptor = interceptors.get(currentIndex);
            currentIndex++;
            return interceptor.intercept(ctx, this);
        }
        // 所有拦截器放行,执行实际的 tool
        Object result = executor.execute(ctx);
        return ToolExecutionResult.success(
            ctx.getToolName(), result, ctx);
    }
}

拦截器接口也要变。之前的 beforeExecute/afterExecute/onError 三个方法虽然直观,但不够灵活——拦截器不能中途短路,不能绕过后置逻辑。改成单一 intercept 方法,用 chain.proceed() 控制流程:

@FunctionalInterface
public interface ToolInterceptor {
    ToolExecutionResult intercept(ToolExecutionContext ctx, ExecutionChain chain)
        throws Exception;
}

这个改动看似简单,但差别很大。以前拦截器只能"在 executor 执行前/后插一脚",现在拦截器可以完全接管执行流程。举个例子:缓存拦截器

@Component
public class CacheInterceptor implements ToolInterceptor {

    private final Cache<String, ToolExecutionResult> cache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();

    @Override
    public ToolExecutionResult intercept(ToolExecutionContext ctx, ExecutionChain chain)
            throws Exception {
        // 只缓存幂等的 GET 类 tool
        if (!isIdempotent(ctx.getToolName())) {
            return chain.proceed(ctx);
        }

        String cacheKey = buildCacheKey(ctx);
        ToolExecutionResult cached = cache.getIfPresent(cacheKey);
        if (cached != null) {
            return cached.withMetadata("from_cache", "true");
        }

        ToolExecutionResult result = chain.proceed(ctx);
        if (result.isSuccess()) {
            cache.put(cacheKey, result);
        }
        return result;
    }

    private String buildCacheKey(ToolExecutionContext ctx) {
        // toolName + 参数排序后作为 key
        return ctx.getToolName() + ":" + new TreeMap<>(ctx.getArgs());
    }

    private boolean isIdempotent(String toolName) {
        return !toolName.startsWith("create_")
            && !toolName.startsWith("delete_")
            && !toolName.startsWith("update_");
    }
}

CacheInterceptor 没有调用 chain.proceed() 就返回了结果——这就是短路。拦截器链后面的所有逻辑(rate limit、logging、metrics)都不执行了。如果这是一次幂等查询,这个行为是对的。但如果这个 tool 其实是"查完缓存就够了的查询",被跳过的 rate limit 反而没问题——缓存命中本来就不应该算一次配额。

但这里有一个陷阱:拦截器顺序是确定的,写在代码中的。 如果你注册的顺序是 [rateLimit, cache, logging],cache 在前面短路了,rateLimit 根本没机会执行。你需要想清楚哪些拦截器应该无条件执行,哪些可以被跳过。

我的做法是把拦截器分成两个阶段:

public class ExecutionChain {

    private final List<ToolInterceptor> mandatoryInterceptors;  // 无条件执行
    private final List<ToolInterceptor> optionalInterceptors;    // 可被短路
    private final ToolExecutor executor;
    private int mandatoryIndex = 0;
    private int optionalIndex = 0;
    private boolean executorReached = false;

    public ToolExecutionResult proceed(ToolExecutionContext ctx) throws Exception {
        // 第一阶段:强制拦截器(审计、tracing)
        if (mandatoryIndex < mandatoryInterceptors.size()) {
            ToolInterceptor interceptor = mandatoryInterceptors.get(mandatoryIndex);
            mandatoryIndex++;
            return interceptor.intercept(ctx, this);
        }
        // 第二阶段:可选拦截器(缓存、rate limit)
        if (optionalIndex < optionalInterceptors.size()) {
            ToolInterceptor interceptor = optionalInterceptors.get(optionalIndex);
            optionalIndex++;
            return interceptor.intercept(ctx, this);
        }
        // 执行
        executorReached = true;
        Object result = executor.execute(ctx);
        return ToolExecutionResult.success(ctx.getToolName(), result, ctx);
    }

    public boolean isExecutorReached() {
        return executorReached;
    }
}

这样审计和 tracing 插桩永远不会被短路。缓存命中了,metrics 照样录入了。rate limit 超了,日志里照常能看到调用的 trace。

超时控制:CompletableFuture 是唯一正确的路

Java 里做超时看起来选择很多,但多数是假的。

Future.get(timeout, TimeUnit) 可以设超时,但如果 worker 线程还在后台跑,超时后的线程并不会被中断——它会在你不知情的情况下继续消耗线程池资源。Thread.interrupt() 不保证停下 IO 操作。在 tool 调用这种场景下,executor 可能是 HTTP 请求、数据库查询、甚至是另一个 MCP Server 的调用,超时后不真正停止执行,线程池很快就满了。

正确的做法是 CompletableFuture.orTimeout()——Java 9 引入的 API,配合独立的线程池隔离:

@Component
public class TimeoutExecutor {

    private final ThreadPoolExecutor toolThreadPool;
    private final ThreadPoolExecutor timeoutWatcherPool;

    public TimeoutExecutor(
            @Value("${tool.executor.core-pool:16}") int corePool,
            @Value("${tool.executor.max-pool:64}") int maxPool,
            @Value("${tool.executor.queue-capacity:256}") int queueCapacity) {

        this.toolThreadPool = new ThreadPoolExecutor(
            corePool, maxPool,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueCapacity),
            new ThreadFactoryBuilder().setNameFormat("tool-worker-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 独立的小线程池用于超时调度,不跟业务线程抢资源
        this.timeoutWatcherPool = new ThreadPoolExecutor(
            2, 4,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("tool-timeout-%d").build()
        );
    }

    public ToolExecutionResult executeWithTimeout(
            ToolDefinition def,
            ToolExecutionContext ctx,
            Duration timeout) {

        CompletableFuture<ToolExecutionResult> future = CompletableFuture
            .supplyAsync(() -> {
                try {
                    ExecutionChain chain = buildChain(def);
                    return chain.proceed(ctx);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, toolThreadPool);

        try {
            return future
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .get();
        } catch (TimeoutException e) {
            // 关键:超时后主动取消,中断 worker 线程
            future.cancel(true);
            return ToolExecutionResult.failure(
                def.getName(),
                new ToolTimeoutException("Tool '" + def.getName()
                    + "' timed out after " + timeout.toMillis() + "ms"),
                ctx);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return ToolExecutionResult.failure(
                def.getName(),
                cause instanceof Exception ex ? ex : new RuntimeException(cause),
                ctx);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ToolExecutionResult.failure(
                def.getName(), e, ctx);
        }
    }

    private ExecutionChain buildChain(ToolDefinition def) {
        return new ExecutionChain(
            def.getMandatoryInterceptors(),
            def.getOptionalInterceptors(),
            def.getExecutor());
    }
}

future.cancel(true) 会中断执行线程。注意这里有个前提——你的 executor 实现必须对中断有响应。如果 executor 里是一个 Thread.sleep() 或者 InputStream.read(),中断信号是有效的。但如果 executor 里是一个不会检查中断标志位的纯 CPU 计算,cancel(true) 杀不死它。这种情况下你需要业务层面的超时感知——在 executor 里主动检查中断状态。

超时值怎么取?不要全局写死一个值。

ToolMetadata 里已经预留了 timeout 字段。我的做法是三层兜底:

Duration resolveTimeout(ToolDefinition def, ToolExecutionContext ctx) {
    // 1. tool 级别的超时(精度最高)
    if (def.getMetadata().getTimeout() != null) {
        return def.getMetadata().getTimeout();
    }
    // 2. 根据 tool 名称模式推断
    if (def.getName().startsWith("search_")) {
        return Duration.ofSeconds(10);
    }
    if (def.getName().startsWith("create_")) {
        return Duration.ofSeconds(30);
    }
    // 3. 全局默认
    return Duration.ofSeconds(15);
}

重试策略:什么该重试,什么不该

tool 调用失败后要不要重试,这个决策比你想的复杂。很多人直接用 Spring Retry 的 @Retryable 一贴了事,结果该重试的没重试、不该重试的反复执行。

我的重试策略基于错误类型分类

public class RetryPolicy {

    private final int maxAttempts;
    private final Duration baseDelay;
    private final double backoffMultiplier;
    private final Predicate<Throwable> retryablePredicate;

    private RetryPolicy(Builder builder) {
        this.maxAttempts = builder.maxAttempts;
        this.baseDelay = builder.baseDelay;
        this.backoffMultiplier = builder.backoffMultiplier;
        this.retryablePredicate = builder.retryablePredicate;
    }

    public boolean shouldRetry(int attempt, Throwable error) {
        if (attempt >= maxAttempts) return false;
        return retryablePredicate.test(error);
    }

    public Duration nextDelay(int attempt) {
        long delay = (long) (baseDelay.toMillis() * Math.pow(backoffMultiplier, attempt));
        return Duration.ofMillis(Math.min(delay, 30_000)); // 上限 30s
    }

    // Builder 略

    // 预置策略
    public static RetryPolicy networkRetry() {
        return RetryPolicy.builder()
            .maxAttempts(3)
            .baseDelay(Duration.ofMillis(500))
            .backoffMultiplier(2.0)
            .retryablePredicate(error ->
                error instanceof SocketTimeoutException
                    || error instanceof ConnectException
                    || error instanceof IOException)
            .build();
    }

    public static RetryPolicy idempotentSafeRetry() {
        return RetryPolicy.builder()
            .maxAttempts(2)
            .baseDelay(Duration.ofMillis(200))
            .backoffMultiplier(1.5)
            .retryablePredicate(error -> true) // 幂等操作可以无脑重试
            .build();
    }

    public static RetryPolicy noRetry() {
        return RetryPolicy.builder()
            .maxAttempts(1)
            .baseDelay(Duration.ZERO)
            .backoffMultiplier(1.0)
            .retryablePredicate(error -> false)
            .build();
    }
}

真正重要的是那个 retryablePredicate。我的分类原则只有一条:

把 tool 失败分为可重试和不可重试两类,分类标准看 tool 的行为,而不是看异常类型。

  • get_weather 超时 → 可重试(幂等查询)
  • create_order 超时 → 不可自动重试(可能已经创建成功,没等到返回)
  • search_hotels 返回 500 → 可重试(服务端错误)
  • send_email 返回 500 → 不可自动重试(邮件可能已经被发送了,但响应丢了)

所以 retry policy 应该由 tool 自己声明,而不是全局统一:

ToolDefinition.builder()
    .name("send_email")
    .executor(ctx -> emailService.send(ctx.getArg("to"), ctx.getArg("body")))
    .retryPolicy(RetryPolicy.noRetry())
    .build();

ToolDefinition.builder()
    .name("search_hotels")
    .executor(ctx -> hotelService.search(ctx.getArg("city"), ...))
    .retryPolicy(RetryPolicy.networkRetry())
    .build();

ToolDefinition.Builder 加上 retryPolicy 字段,ToolMetadata 里也可以有一个兜底的默认值。

有了分类和策略,重试的执行器长这样:

@Component
public class RetryExecutor {

    private static final Logger log = LoggerFactory.getLogger(RetryExecutor.class);

    public ToolExecutionResult executeWithRetry(
            ToolDefinition def,
            ToolExecutionContext ctx,
            TimeoutExecutor timeoutExecutor) {

        RetryPolicy policy = def.getRetryPolicy();
        Duration timeout = resolveTimeout(def, ctx);
        Throwable lastError = null;

        for (int attempt = 1; attempt <= policy.getMaxAttempts(); attempt++) {
            try {
                if (attempt > 1) {
                    Duration delay = policy.nextDelay(attempt - 1);
                    log.warn("Retrying tool {} (attempt {}/{}, delay={})",
                        def.getName(), attempt, policy.getMaxAttempts(), delay);
                    Thread.sleep(delay.toMillis());
                }
                return timeoutExecutor.executeWithTimeout(def, ctx, timeout);

            } catch (ToolTimeoutException e) {
                // 超时:如果是幂等操作,可以重试
                if (policy.shouldRetry(attempt, e)) {
                    lastError = e;
                    continue;
                }
                return ToolExecutionResult.failure(def.getName(), e, ctx);

            } catch (Exception e) {
                if (policy.shouldRetry(attempt, e)) {
                    lastError = e;
                    continue;
                }
                return ToolExecutionResult.failure(def.getName(), e, ctx);
            }
        }

        return ToolExecutionResult.failure(def.getName(),
            new ToolExecutionException("All " + policy.getMaxAttempts()
                + " attempts failed", lastError), ctx);
    }
}

兜底降级:失败也要有尊严

所有重试都失败了怎么办?直接抛异常是最简单的做法,但不一定是最好的。

LLM 拿到一个 error result 后,行为不可控——有的模型会重新尝试,有的会跟用户道歉,有的会自己编一个结果。更好的做法是给 LLM 一个"优雅降级后的数据",让它至少能继续推理。

兜底策略有三个层次:

@FunctionalInterface
public interface FallbackStrategy {
    Optional<ToolExecutionResult> fallback(ToolDefinition def,
                                           ToolExecutionContext ctx,
                                           Throwable error);
}

@Component
public class FallbackManager {

    private final List<FallbackStrategy> strategies = List.of(
        new CacheFallback(),
        new StubFallback(),
        new GracefulErrorFallback()
    );

    public ToolExecutionResult resolve(ToolDefinition def,
                                       ToolExecutionContext ctx,
                                       Throwable error) {
        for (FallbackStrategy strategy : strategies) {
            Optional<ToolExecutionResult> result = strategy.fallback(def, ctx, error);
            if (result.isPresent()) {
                return result.get();
            }
        }
        // 最终保底:返回格式化的错误消息
        return GracefulErrorFallback.INSTANCE.fallback(def, ctx, error)
            .orElseThrow(() -> new IllegalStateException("No fallback available"));
    }
}

第一层:缓存降级。 如果之前成功执行过同参数的调用,返回上次的结果。加一个 stale 标记。

public class CacheFallback implements FallbackStrategy {

    private final Cache<String, ToolExecutionResult> resultCache;

    public CacheFallback() {
        this.resultCache = Caffeine.newBuilder()
            .maximumSize(5000)
            .expireAfterWrite(30, TimeUnit.MINUTES)
            .build();
    }

    @Override
    public Optional<ToolExecutionResult> fallback(
            ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
        String cacheKey = buildKey(def, ctx);
        ToolExecutionResult cached = resultCache.getIfPresent(cacheKey);
        if (cached != null) {
            return Optional.of(cached.withMetadata("fallback", "stale_cache"));
        }
        return Optional.empty();
    }

    // 这个方法给 CacheInterceptor 调用,记录正常执行的结果
    public void recordSuccess(ToolDefinition def, ToolExecutionContext ctx,
                              ToolExecutionResult result) {
        if (result.isSuccess()) {
            resultCache.put(buildKey(def, ctx), result);
        }
    }

    private String buildKey(ToolDefinition def, ToolExecutionContext ctx) {
        return def.getName() + ":" + new TreeMap<>(ctx.getArgs());
    }
}

第二层:桩数据降级。 有些 tool 在失败时可以返回"我知道这可能是错的,但至少不是空"的数据。

比如 search_hotels 接口挂了,可以返回一些热门酒店的桩数据,加上一个明确的 disclaimer。LLM 拿到这些数据后会怎么做?取决于模型。但至少比直接抛一个 HTTP 500 好。

public class StubFallback implements FallbackStrategy {

    @Override
    public Optional<ToolExecutionResult> fallback(
            ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
        // 只有显式标注了 stub 的 tool 才启用
        String stubData = def.getMetadata().getStubData();
        if (stubData == null) {
            return Optional.empty();
        }
        ToolExecutionResult result = ToolExecutionResult.success(
            def.getName(), stubData, ctx);
        return Optional.of(result.withMetadata("fallback", "stub_data"));
    }
}

tool 声明时带上桩数据:

ToolDefinition.builder()
    .name("search_hotels")
    .metadata(ToolMetadata.builder()
        .stubData("{\"hotels\":[{\"name\":\"示例酒店A\",\"price\":599}," +
                  "{\"name\":\"示例酒店B\",\"price\":799}]," +
                  "\"note\":\"此为降级数据,非实时查询结果\"}")
        .build())
    .build();

第三层:优雅错误消息。 以上两层都没命中时,构造一个对 LLM 友好的错误消息。不是简单的 "Error: timeout",而是告诉 LLM 发生了什么、可以做什么补救:

public class GracefulErrorFallback implements FallbackStrategy {

    public static final GracefulErrorFallback INSTANCE = new GracefulErrorFallback();

    @Override
    public Optional<ToolExecutionResult> fallback(
            ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
        String message = String.format(
            "{\"error\":\"%s\",\"tool\":\"%s\",\"message\":\"%s\",\"suggestion\":\"%s\"}",
            classifyError(error),
            def.getName(),
            escapeJson(error.getMessage()),
            suggestAction(error)
        );
        ToolExecutionResult result = ToolExecutionResult.success(
            def.getName(), message, ctx);
        result.addMetadata("fallback", "graceful_error");
        return Optional.of(result);
    }

    private String classifyError(Throwable error) {
        if (error instanceof ToolTimeoutException) return "timeout";
        if (error instanceof ArgumentValidationException) return "invalid_arguments";
        if (error instanceof ToolNotFoundException) return "tool_not_found";
        if (error instanceof SocketTimeoutException || error instanceof ConnectException) {
            return "network_error";
        }
        return "internal_error";
    }

    private String suggestAction(Throwable error) {
        if (error instanceof ToolTimeoutException) {
            return "请稍后重试,或简化查询条件";
        }
        if (error instanceof ArgumentValidationException) {
            return "请检查参数格式和类型后重试";
        }
        return "系统暂时不可用,请稍后再试";
    }

    private String escapeJson(String s) {
        if (s == null) return "";
        return s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
    }
}

这样拿到的结果结构是:

{
  "error": "timeout",
  "tool": "get_weather",
  "message": "Tool 'get_weather' timed out after 10000ms",
  "suggestion": "请稍后重试,或简化查询条件"
}

LLM 读到这个 JSON,至少知道是超时而不是"查不到数据"。大多数模型会根据 suggestion 字段决定下一步——延迟重试、询问用户、或者跳过这个 tool 继续推理。

组装:Engine 完整形态

把上面所有部件拼起来,就是 Execution Engine 的入口类:

@Component
public class ToolExecutionEngine {

    private final ToolRegistry registry;
    private final ArgParser argParser;
    private final RetryExecutor retryExecutor;
    private final TimeoutExecutor timeoutExecutor;
    private final FallbackManager fallbackManager;

    public ToolExecutionEngine(ToolRegistry registry,
                               ArgParser argParser,
                               RetryExecutor retryExecutor,
                               TimeoutExecutor timeoutExecutor,
                               FallbackManager fallbackManager) {
        this.registry = registry;
        this.argParser = argParser;
        this.retryExecutor = retryExecutor;
        this.timeoutExecutor = timeoutExecutor;
        this.fallbackManager = fallbackManager;
    }

    public ToolExecutionResult execute(String toolName,
                                       Map<String, Object> rawArgs,
                                       String sessionId,
                                       String traceId) {

        // 1. 查找 tool
        ToolDefinition def = registry.getRequired(toolName);

        // 2. 构建上下文
        ToolExecutionContext ctx = ToolExecutionContext.builder()
            .toolName(def.getName())
            .sessionId(sessionId)
            .traceId(traceId)
            .callId(UUID.randomUUID().toString())
            .args(rawArgs)
            .build();

        // 3. 参数解析与校验
        try {
            ParsedArgs parsed = argParser.parse(rawArgs, def.getInputSchema());
            ctx.setParsedArgs(parsed);
        } catch (ArgumentValidationException e) {
            // 参数校验失败不走重试,直接 fallback
            return fallbackManager.resolve(def, ctx, e);
        }

        // 4. 执行(含重试)
        try {
            return retryExecutor.executeWithRetry(def, ctx, timeoutExecutor);
        } catch (Exception e) {
            // 5. 所有重试失败,走兜底降级
            return fallbackManager.resolve(def, ctx, e);
        }
    }
}

然后在 McpToolBridge 里替换原来简单的执行逻辑:

private McpServerFeatures.SyncToolSpecification buildSpecification(ToolDefinition def) {
    McpSchema.Tool mcpTool = McpSchema.Tool.builder()
        .name(def.getName())
        .description(def.getDescription())
        .inputSchema(def.getInputSchema().toMap())
        .build();

    return McpServerFeatures.SyncToolSpecification.builder()
        .tool(mcpTool)
        .callHandler((exchange, request) -> {
            ToolExecutionResult result = executionEngine.execute(
                def.getName(),
                request.arguments() != null ? request.arguments() : Map.of(),
                exchange.sessionId(),
                UUID.randomUUID().toString()
            );

            String content = result.getResult() != null
                ? result.getResult().toString()
                : "";

            return McpSchema.CallToolResult.builder()
                .content(List.of(new McpSchema.TextContent(content)))
                .isError(result.isError())
                .build();
        })
        .build();
}

一条链路走完,算一笔账

从 LLM 的 JSON-RPC 请求到结果返回给 LLM,不要觉得"不就是调个方法吗"。把这一层做厚,收益在后面积累——Part-07 的可观测性插桩会直接挂在这条链路上,Part-06 的状态管理也会复用这里的分发逻辑。

这条链路上几个容易被低估的细节:

参数校验不做,模型会教你做人。 LLM 输出 JSON 的格式因模型而异——Grok 喜欢不带引号的数字,Claude 会在 string 字段里传空字符串,Qwen 有时塞进去一个多余的嵌套。ArgParser 的隐式类型转换是我踩坑踩出来的妥协。

超时别用 Future.get(timeout)。 不做 cancel 的超时是假的。超时后线程还在后台跑,请求量一上来线程池就满了,然后连锁反应——心跳超时、TCP 连接池枯竭、整个 Runtime 无响应。

重试谁执行谁决策。 全局统一的重试策略是错的。幂等查询可以自动重试,非幂等的写操作重试等于帮用户下单三次。让 tool 自己决定是否可重试,engine 只做执行者。

降级比失败好,但降级要有标记。 缓存降级返回的数据打上 fallback: stale_cache 标记,LLM 可以决定是否信任。没有标记的自动降级等于在模型面前说谎。

接下来

这一篇讲了 Execution Engine 的核心链路——参数解析、类型校验与转换、拦截器链的两种阶段划分、超时控制的正确姿势、基于 tool 语义的重试策略、以及三层兜底降级。

下一篇讲 Structured Output 兼容层。推理模型在工具调用上的特殊性会让你重新审视这里的每一个设计——尤其是当模型输出的参数格式跟你预期的完全不一样的时候。

好,先写到这。

评论

此博客中的热门博文

我写了半年 prompt,最后发现最好的技巧就三个