从零构建一个 MCP Agent 运行时(三):Tool Execution Engine,LLM 说句话到你拿到结果之间发生了什么
LLM 发起一个 tool call,到结果回到 LLM 手里,中间经过的完整链路。参数校验、隐式类型转换、拦截器编排、超时控制、重试策略、兜底降级——每个环节都可能在线上咬你一口。
Summary: LLM 发起一个 tool call,到结果回到 LLM 手里,中间经过的完整链路。参数校验、隐式类型转换、拦截器编排、超时控制、重试策略、兜底降级——每个环节都可能在线上咬你一口。
Part-02 把 ToolRegistry 和拦截器链搭好了。Registry 里注册了一堆 tool,拦截器挂在上面,看起来一切就绪。但你真的在生产环境跑过 tool call 吗?
LLM 说了一句"调用 get_weather('Beijing')",到最终结果返回给 LLM——这条链路上任何一个环节出错,LLM 就会给出一个莫名其妙的下文。参数类型不匹配,LLM 不会告诉你"参数类型错了",它会自己编一个结果。超时了,模型会换一种方式重新调用,然后你又收到了同样的请求。重试逻辑写错了,同一个幂等操作被执行了三次,用户在 UI 上看到了三条重复的 booking confirmation。
这一篇就把这条链路拆干净。从 MCP 的 JSON-RPC 请求进入 Runtime 开始,到执行结果返回给 LLM 结束,中间每一个环节——参数解析、类型校验、拦截器链、超时控制、重试策略、兜底降级——全部展开。
你能看到的完整链路
从调用方视角看,执行一个 tool 的流程可以抽象成六个步骤:
CallToolRequest ──→ ArgParser ──→ ToolResolver ──→ ArgValidator
│
↓
CallToolResult ←── ResultFormatter ←── ExecutorChain
↑
TimeoutWatcher
RetryHandler
FallbackResolver
每个步骤都可以单独替换实现。这是后面一切可观测性、热更新、A/B 测试的基础。
先从最外层的入口说起。
从 MCP 请求到执行上下文
Part-02 的 McpToolBridge 已经处理了 callHandler 的回调。但那段代码我写得很糙——直接把 request.arguments() 塞进 ToolExecutionContext 就完事了。生产环境里不能这么做。
MCP 协议层返回的 arguments 是一个 Map<String, Object>,但 Object 的实际类型是 JSON 反序列化后的产物——数字是 Integer 或 Double,嵌套对象是 Map<String, Object>,数组是 List<Object>。没有任何类型保证。
所以 ArgParser 的第一件事不是解析参数,而是校验参数的完整性:
@Component
public class ArgParser {
public ParsedArgs parse(Map<String, Object> rawArgs, JsonSchema schema) {
Map<String, Object> parsed = new LinkedHashMap<>();
List<ValidationError> errors = new ArrayList<>();
// 1. 检查必需参数
for (String required : schema.getRequired()) {
if (rawArgs == null || !rawArgs.containsKey(required)) {
errors.add(new ValidationError(required, "missing",
"Required parameter '" + required + "' is missing"));
continue;
}
Object value = rawArgs.get(required);
if (value == null || (value instanceof String && ((String) value).isBlank())) {
errors.add(new ValidationError(required, "null_or_empty",
"Required parameter '" + required + "' is null or empty"));
}
}
if (!errors.isEmpty()) {
throw new ArgumentValidationException(errors);
}
// 2. 按 schema 定义的类型逐个转换
for (Map.Entry<String, SchemaProperty> entry : schema.getProperties().entrySet()) {
String name = entry.getKey();
SchemaProperty prop = entry.getValue();
Object rawValue = rawArgs != null ? rawArgs.get(name) : null;
if (rawValue == null) {
// 有默认值就用默认值
if (prop.getDefaultValue() != null) {
parsed.put(name, prop.getDefaultValue());
}
continue;
}
try {
Object coerced = coerce(rawValue, prop.getType());
parsed.put(name, coerced);
} catch (TypeCoercionException e) {
errors.add(new ValidationError(name, "type_mismatch", e.getMessage()));
}
}
if (!errors.isEmpty()) {
throw new ArgumentValidationException(errors);
}
return new ParsedArgs(parsed);
}
@SuppressWarnings("unchecked")
private Object coerce(Object value, String targetType) {
if (value == null) return null;
return switch (targetType) {
case "string" -> {
if (value instanceof String s) yield s;
if (value instanceof Number n) yield n.toString();
if (value instanceof Boolean b) yield b.toString();
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to string");
}
case "integer" -> {
if (value instanceof Integer i) yield i;
if (value instanceof Long l) {
if (l > Integer.MAX_VALUE || l < Integer.MIN_VALUE) {
throw new TypeCoercionException("Long value " + l + " out of int range");
}
yield l.intValue();
}
if (value instanceof Double d) {
int i = d.intValue();
if (d != i) {
throw new TypeCoercionException("Cannot coerce float " + d + " to integer");
}
yield i;
}
if (value instanceof String s) {
try {
yield Integer.parseInt(s.trim());
} catch (NumberFormatException e) {
throw new TypeCoercionException("Cannot coerce string '" + s + "' to integer");
}
}
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to integer");
}
case "number" -> {
if (value instanceof Number n) yield n;
if (value instanceof String s) {
try {
yield Double.parseDouble(s.trim());
} catch (NumberFormatException e) {
throw new TypeCoercionException("Cannot coerce string '" + s + "' to number");
}
}
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to number");
}
case "boolean" -> {
if (value instanceof Boolean b) yield b;
if (value instanceof String s) {
if ("true".equalsIgnoreCase(s)) yield true;
if ("false".equalsIgnoreCase(s)) yield false;
throw new TypeCoercionException("Cannot coerce string '" + s + "' to boolean");
}
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to boolean");
}
case "array" -> {
if (value instanceof List<?>) yield value;
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to array");
}
case "object" -> {
if (value instanceof Map<?, ?>) yield value;
throw new TypeCoercionException("Cannot coerce " +
value.getClass().getSimpleName() + " to object");
}
default ->
throw new TypeCoercionException("Unknown target type: " + targetType);
};
}
}
这段代码看起来啰嗦。每个类型转换都要显式处理,JSON 反序列化出来什么类型、目标类型是什么——都必须逐个映射。我用 switch 表达式处理了六种基础类型和两种复合类型。string → integer 这种跨类型的转换在后面接推理模型时特别常见——Claude 喜欢传数字不加引号,GPT 习惯传字符串,混合用的时候你这里不做兜底,模型就炸了。
ParsedArgs 是一个简单的包装类,提供类型安全的 getter:
public class ParsedArgs {
private final Map<String, Object> values;
ParsedArgs(Map<String, Object> values) {
this.values = values;
}
@SuppressWarnings("unchecked")
public <T> T get(String name) {
return (T) values.get(name);
}
public <T> T getOrDefault(String name, T defaultValue) {
return (T) values.getOrDefault(name, defaultValue);
}
public String getString(String name) {
Object v = values.get(name);
return v != null ? v.toString() : null;
}
public Integer getInt(String name) {
Object v = values.get(name);
if (v instanceof Integer i) return i;
if (v instanceof Long l) return l.intValue();
if (v instanceof Number n) return n.intValue();
return null;
}
public boolean has(String name) {
return values.containsKey(name);
}
public Map<String, Object> toMap() {
return values;
}
}
ExecutionChain:拦截器编排的完整形态
Part-02 的 InterceptorChain 做得太简单了——before → execute → after 三步走完。但生产级的执行链不止三个钩子。
实际中你需要的是一个责任链模式,每个拦截器都能决定"放行"还是"短路":
public class ExecutionChain {
private final List<ToolInterceptor> interceptors;
private final ToolExecutor executor;
private int currentIndex = 0;
public ExecutionChain(List<ToolInterceptor> interceptors, ToolExecutor executor) {
this.interceptors = interceptors;
this.executor = executor;
}
/**
* 执行拦截器链,最终到达实际的 executor。
* 每个拦截器可以:
* - 什么都不做,直接 chain.proceed() 放行
* - 修改 ctx 后再 proceed()
* - 不调用 proceed(),直接返回结果(短路)
* - 抛出异常中断链路
*/
public ToolExecutionResult proceed(ToolExecutionContext ctx) throws Exception {
if (currentIndex < interceptors.size()) {
ToolInterceptor interceptor = interceptors.get(currentIndex);
currentIndex++;
return interceptor.intercept(ctx, this);
}
// 所有拦截器放行,执行实际的 tool
Object result = executor.execute(ctx);
return ToolExecutionResult.success(
ctx.getToolName(), result, ctx);
}
}
拦截器接口也要变。之前的 beforeExecute/afterExecute/onError 三个方法虽然直观,但不够灵活——拦截器不能中途短路,不能绕过后置逻辑。改成单一 intercept 方法,用 chain.proceed() 控制流程:
@FunctionalInterface
public interface ToolInterceptor {
ToolExecutionResult intercept(ToolExecutionContext ctx, ExecutionChain chain)
throws Exception;
}
这个改动看似简单,但差别很大。以前拦截器只能"在 executor 执行前/后插一脚",现在拦截器可以完全接管执行流程。举个例子:缓存拦截器。
@Component
public class CacheInterceptor implements ToolInterceptor {
private final Cache<String, ToolExecutionResult> cache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
@Override
public ToolExecutionResult intercept(ToolExecutionContext ctx, ExecutionChain chain)
throws Exception {
// 只缓存幂等的 GET 类 tool
if (!isIdempotent(ctx.getToolName())) {
return chain.proceed(ctx);
}
String cacheKey = buildCacheKey(ctx);
ToolExecutionResult cached = cache.getIfPresent(cacheKey);
if (cached != null) {
return cached.withMetadata("from_cache", "true");
}
ToolExecutionResult result = chain.proceed(ctx);
if (result.isSuccess()) {
cache.put(cacheKey, result);
}
return result;
}
private String buildCacheKey(ToolExecutionContext ctx) {
// toolName + 参数排序后作为 key
return ctx.getToolName() + ":" + new TreeMap<>(ctx.getArgs());
}
private boolean isIdempotent(String toolName) {
return !toolName.startsWith("create_")
&& !toolName.startsWith("delete_")
&& !toolName.startsWith("update_");
}
}
CacheInterceptor 没有调用 chain.proceed() 就返回了结果——这就是短路。拦截器链后面的所有逻辑(rate limit、logging、metrics)都不执行了。如果这是一次幂等查询,这个行为是对的。但如果这个 tool 其实是"查完缓存就够了的查询",被跳过的 rate limit 反而没问题——缓存命中本来就不应该算一次配额。
但这里有一个陷阱:拦截器顺序是确定的,写在代码中的。 如果你注册的顺序是 [rateLimit, cache, logging],cache 在前面短路了,rateLimit 根本没机会执行。你需要想清楚哪些拦截器应该无条件执行,哪些可以被跳过。
我的做法是把拦截器分成两个阶段:
public class ExecutionChain {
private final List<ToolInterceptor> mandatoryInterceptors; // 无条件执行
private final List<ToolInterceptor> optionalInterceptors; // 可被短路
private final ToolExecutor executor;
private int mandatoryIndex = 0;
private int optionalIndex = 0;
private boolean executorReached = false;
public ToolExecutionResult proceed(ToolExecutionContext ctx) throws Exception {
// 第一阶段:强制拦截器(审计、tracing)
if (mandatoryIndex < mandatoryInterceptors.size()) {
ToolInterceptor interceptor = mandatoryInterceptors.get(mandatoryIndex);
mandatoryIndex++;
return interceptor.intercept(ctx, this);
}
// 第二阶段:可选拦截器(缓存、rate limit)
if (optionalIndex < optionalInterceptors.size()) {
ToolInterceptor interceptor = optionalInterceptors.get(optionalIndex);
optionalIndex++;
return interceptor.intercept(ctx, this);
}
// 执行
executorReached = true;
Object result = executor.execute(ctx);
return ToolExecutionResult.success(ctx.getToolName(), result, ctx);
}
public boolean isExecutorReached() {
return executorReached;
}
}
这样审计和 tracing 插桩永远不会被短路。缓存命中了,metrics 照样录入了。rate limit 超了,日志里照常能看到调用的 trace。
超时控制:CompletableFuture 是唯一正确的路
Java 里做超时看起来选择很多,但多数是假的。
Future.get(timeout, TimeUnit) 可以设超时,但如果 worker 线程还在后台跑,超时后的线程并不会被中断——它会在你不知情的情况下继续消耗线程池资源。Thread.interrupt() 不保证停下 IO 操作。在 tool 调用这种场景下,executor 可能是 HTTP 请求、数据库查询、甚至是另一个 MCP Server 的调用,超时后不真正停止执行,线程池很快就满了。
正确的做法是 CompletableFuture.orTimeout()——Java 9 引入的 API,配合独立的线程池隔离:
@Component
public class TimeoutExecutor {
private final ThreadPoolExecutor toolThreadPool;
private final ThreadPoolExecutor timeoutWatcherPool;
public TimeoutExecutor(
@Value("${tool.executor.core-pool:16}") int corePool,
@Value("${tool.executor.max-pool:64}") int maxPool,
@Value("${tool.executor.queue-capacity:256}") int queueCapacity) {
this.toolThreadPool = new ThreadPoolExecutor(
corePool, maxPool,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadFactoryBuilder().setNameFormat("tool-worker-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 独立的小线程池用于超时调度,不跟业务线程抢资源
this.timeoutWatcherPool = new ThreadPoolExecutor(
2, 4,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("tool-timeout-%d").build()
);
}
public ToolExecutionResult executeWithTimeout(
ToolDefinition def,
ToolExecutionContext ctx,
Duration timeout) {
CompletableFuture<ToolExecutionResult> future = CompletableFuture
.supplyAsync(() -> {
try {
ExecutionChain chain = buildChain(def);
return chain.proceed(ctx);
} catch (Exception e) {
throw new CompletionException(e);
}
}, toolThreadPool);
try {
return future
.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.get();
} catch (TimeoutException e) {
// 关键:超时后主动取消,中断 worker 线程
future.cancel(true);
return ToolExecutionResult.failure(
def.getName(),
new ToolTimeoutException("Tool '" + def.getName()
+ "' timed out after " + timeout.toMillis() + "ms"),
ctx);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
return ToolExecutionResult.failure(
def.getName(),
cause instanceof Exception ex ? ex : new RuntimeException(cause),
ctx);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ToolExecutionResult.failure(
def.getName(), e, ctx);
}
}
private ExecutionChain buildChain(ToolDefinition def) {
return new ExecutionChain(
def.getMandatoryInterceptors(),
def.getOptionalInterceptors(),
def.getExecutor());
}
}
future.cancel(true) 会中断执行线程。注意这里有个前提——你的 executor 实现必须对中断有响应。如果 executor 里是一个 Thread.sleep() 或者 InputStream.read(),中断信号是有效的。但如果 executor 里是一个不会检查中断标志位的纯 CPU 计算,cancel(true) 杀不死它。这种情况下你需要业务层面的超时感知——在 executor 里主动检查中断状态。
超时值怎么取?不要全局写死一个值。
ToolMetadata 里已经预留了 timeout 字段。我的做法是三层兜底:
Duration resolveTimeout(ToolDefinition def, ToolExecutionContext ctx) {
// 1. tool 级别的超时(精度最高)
if (def.getMetadata().getTimeout() != null) {
return def.getMetadata().getTimeout();
}
// 2. 根据 tool 名称模式推断
if (def.getName().startsWith("search_")) {
return Duration.ofSeconds(10);
}
if (def.getName().startsWith("create_")) {
return Duration.ofSeconds(30);
}
// 3. 全局默认
return Duration.ofSeconds(15);
}
重试策略:什么该重试,什么不该
tool 调用失败后要不要重试,这个决策比你想的复杂。很多人直接用 Spring Retry 的 @Retryable 一贴了事,结果该重试的没重试、不该重试的反复执行。
我的重试策略基于错误类型分类:
public class RetryPolicy {
private final int maxAttempts;
private final Duration baseDelay;
private final double backoffMultiplier;
private final Predicate<Throwable> retryablePredicate;
private RetryPolicy(Builder builder) {
this.maxAttempts = builder.maxAttempts;
this.baseDelay = builder.baseDelay;
this.backoffMultiplier = builder.backoffMultiplier;
this.retryablePredicate = builder.retryablePredicate;
}
public boolean shouldRetry(int attempt, Throwable error) {
if (attempt >= maxAttempts) return false;
return retryablePredicate.test(error);
}
public Duration nextDelay(int attempt) {
long delay = (long) (baseDelay.toMillis() * Math.pow(backoffMultiplier, attempt));
return Duration.ofMillis(Math.min(delay, 30_000)); // 上限 30s
}
// Builder 略
// 预置策略
public static RetryPolicy networkRetry() {
return RetryPolicy.builder()
.maxAttempts(3)
.baseDelay(Duration.ofMillis(500))
.backoffMultiplier(2.0)
.retryablePredicate(error ->
error instanceof SocketTimeoutException
|| error instanceof ConnectException
|| error instanceof IOException)
.build();
}
public static RetryPolicy idempotentSafeRetry() {
return RetryPolicy.builder()
.maxAttempts(2)
.baseDelay(Duration.ofMillis(200))
.backoffMultiplier(1.5)
.retryablePredicate(error -> true) // 幂等操作可以无脑重试
.build();
}
public static RetryPolicy noRetry() {
return RetryPolicy.builder()
.maxAttempts(1)
.baseDelay(Duration.ZERO)
.backoffMultiplier(1.0)
.retryablePredicate(error -> false)
.build();
}
}
真正重要的是那个 retryablePredicate。我的分类原则只有一条:
把 tool 失败分为可重试和不可重试两类,分类标准看 tool 的行为,而不是看异常类型。
get_weather超时 → 可重试(幂等查询)create_order超时 → 不可自动重试(可能已经创建成功,没等到返回)search_hotels返回 500 → 可重试(服务端错误)send_email返回 500 → 不可自动重试(邮件可能已经被发送了,但响应丢了)
所以 retry policy 应该由 tool 自己声明,而不是全局统一:
ToolDefinition.builder()
.name("send_email")
.executor(ctx -> emailService.send(ctx.getArg("to"), ctx.getArg("body")))
.retryPolicy(RetryPolicy.noRetry())
.build();
ToolDefinition.builder()
.name("search_hotels")
.executor(ctx -> hotelService.search(ctx.getArg("city"), ...))
.retryPolicy(RetryPolicy.networkRetry())
.build();
ToolDefinition.Builder 加上 retryPolicy 字段,ToolMetadata 里也可以有一个兜底的默认值。
有了分类和策略,重试的执行器长这样:
@Component
public class RetryExecutor {
private static final Logger log = LoggerFactory.getLogger(RetryExecutor.class);
public ToolExecutionResult executeWithRetry(
ToolDefinition def,
ToolExecutionContext ctx,
TimeoutExecutor timeoutExecutor) {
RetryPolicy policy = def.getRetryPolicy();
Duration timeout = resolveTimeout(def, ctx);
Throwable lastError = null;
for (int attempt = 1; attempt <= policy.getMaxAttempts(); attempt++) {
try {
if (attempt > 1) {
Duration delay = policy.nextDelay(attempt - 1);
log.warn("Retrying tool {} (attempt {}/{}, delay={})",
def.getName(), attempt, policy.getMaxAttempts(), delay);
Thread.sleep(delay.toMillis());
}
return timeoutExecutor.executeWithTimeout(def, ctx, timeout);
} catch (ToolTimeoutException e) {
// 超时:如果是幂等操作,可以重试
if (policy.shouldRetry(attempt, e)) {
lastError = e;
continue;
}
return ToolExecutionResult.failure(def.getName(), e, ctx);
} catch (Exception e) {
if (policy.shouldRetry(attempt, e)) {
lastError = e;
continue;
}
return ToolExecutionResult.failure(def.getName(), e, ctx);
}
}
return ToolExecutionResult.failure(def.getName(),
new ToolExecutionException("All " + policy.getMaxAttempts()
+ " attempts failed", lastError), ctx);
}
}
兜底降级:失败也要有尊严
所有重试都失败了怎么办?直接抛异常是最简单的做法,但不一定是最好的。
LLM 拿到一个 error result 后,行为不可控——有的模型会重新尝试,有的会跟用户道歉,有的会自己编一个结果。更好的做法是给 LLM 一个"优雅降级后的数据",让它至少能继续推理。
兜底策略有三个层次:
@FunctionalInterface
public interface FallbackStrategy {
Optional<ToolExecutionResult> fallback(ToolDefinition def,
ToolExecutionContext ctx,
Throwable error);
}
@Component
public class FallbackManager {
private final List<FallbackStrategy> strategies = List.of(
new CacheFallback(),
new StubFallback(),
new GracefulErrorFallback()
);
public ToolExecutionResult resolve(ToolDefinition def,
ToolExecutionContext ctx,
Throwable error) {
for (FallbackStrategy strategy : strategies) {
Optional<ToolExecutionResult> result = strategy.fallback(def, ctx, error);
if (result.isPresent()) {
return result.get();
}
}
// 最终保底:返回格式化的错误消息
return GracefulErrorFallback.INSTANCE.fallback(def, ctx, error)
.orElseThrow(() -> new IllegalStateException("No fallback available"));
}
}
第一层:缓存降级。 如果之前成功执行过同参数的调用,返回上次的结果。加一个 stale 标记。
public class CacheFallback implements FallbackStrategy {
private final Cache<String, ToolExecutionResult> resultCache;
public CacheFallback() {
this.resultCache = Caffeine.newBuilder()
.maximumSize(5000)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
}
@Override
public Optional<ToolExecutionResult> fallback(
ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
String cacheKey = buildKey(def, ctx);
ToolExecutionResult cached = resultCache.getIfPresent(cacheKey);
if (cached != null) {
return Optional.of(cached.withMetadata("fallback", "stale_cache"));
}
return Optional.empty();
}
// 这个方法给 CacheInterceptor 调用,记录正常执行的结果
public void recordSuccess(ToolDefinition def, ToolExecutionContext ctx,
ToolExecutionResult result) {
if (result.isSuccess()) {
resultCache.put(buildKey(def, ctx), result);
}
}
private String buildKey(ToolDefinition def, ToolExecutionContext ctx) {
return def.getName() + ":" + new TreeMap<>(ctx.getArgs());
}
}
第二层:桩数据降级。 有些 tool 在失败时可以返回"我知道这可能是错的,但至少不是空"的数据。
比如 search_hotels 接口挂了,可以返回一些热门酒店的桩数据,加上一个明确的 disclaimer。LLM 拿到这些数据后会怎么做?取决于模型。但至少比直接抛一个 HTTP 500 好。
public class StubFallback implements FallbackStrategy {
@Override
public Optional<ToolExecutionResult> fallback(
ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
// 只有显式标注了 stub 的 tool 才启用
String stubData = def.getMetadata().getStubData();
if (stubData == null) {
return Optional.empty();
}
ToolExecutionResult result = ToolExecutionResult.success(
def.getName(), stubData, ctx);
return Optional.of(result.withMetadata("fallback", "stub_data"));
}
}
tool 声明时带上桩数据:
ToolDefinition.builder()
.name("search_hotels")
.metadata(ToolMetadata.builder()
.stubData("{\"hotels\":[{\"name\":\"示例酒店A\",\"price\":599}," +
"{\"name\":\"示例酒店B\",\"price\":799}]," +
"\"note\":\"此为降级数据,非实时查询结果\"}")
.build())
.build();
第三层:优雅错误消息。 以上两层都没命中时,构造一个对 LLM 友好的错误消息。不是简单的 "Error: timeout",而是告诉 LLM 发生了什么、可以做什么补救:
public class GracefulErrorFallback implements FallbackStrategy {
public static final GracefulErrorFallback INSTANCE = new GracefulErrorFallback();
@Override
public Optional<ToolExecutionResult> fallback(
ToolDefinition def, ToolExecutionContext ctx, Throwable error) {
String message = String.format(
"{\"error\":\"%s\",\"tool\":\"%s\",\"message\":\"%s\",\"suggestion\":\"%s\"}",
classifyError(error),
def.getName(),
escapeJson(error.getMessage()),
suggestAction(error)
);
ToolExecutionResult result = ToolExecutionResult.success(
def.getName(), message, ctx);
result.addMetadata("fallback", "graceful_error");
return Optional.of(result);
}
private String classifyError(Throwable error) {
if (error instanceof ToolTimeoutException) return "timeout";
if (error instanceof ArgumentValidationException) return "invalid_arguments";
if (error instanceof ToolNotFoundException) return "tool_not_found";
if (error instanceof SocketTimeoutException || error instanceof ConnectException) {
return "network_error";
}
return "internal_error";
}
private String suggestAction(Throwable error) {
if (error instanceof ToolTimeoutException) {
return "请稍后重试,或简化查询条件";
}
if (error instanceof ArgumentValidationException) {
return "请检查参数格式和类型后重试";
}
return "系统暂时不可用,请稍后再试";
}
private String escapeJson(String s) {
if (s == null) return "";
return s.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
}
这样拿到的结果结构是:
{
"error": "timeout",
"tool": "get_weather",
"message": "Tool 'get_weather' timed out after 10000ms",
"suggestion": "请稍后重试,或简化查询条件"
}
LLM 读到这个 JSON,至少知道是超时而不是"查不到数据"。大多数模型会根据 suggestion 字段决定下一步——延迟重试、询问用户、或者跳过这个 tool 继续推理。
组装:Engine 完整形态
把上面所有部件拼起来,就是 Execution Engine 的入口类:
@Component
public class ToolExecutionEngine {
private final ToolRegistry registry;
private final ArgParser argParser;
private final RetryExecutor retryExecutor;
private final TimeoutExecutor timeoutExecutor;
private final FallbackManager fallbackManager;
public ToolExecutionEngine(ToolRegistry registry,
ArgParser argParser,
RetryExecutor retryExecutor,
TimeoutExecutor timeoutExecutor,
FallbackManager fallbackManager) {
this.registry = registry;
this.argParser = argParser;
this.retryExecutor = retryExecutor;
this.timeoutExecutor = timeoutExecutor;
this.fallbackManager = fallbackManager;
}
public ToolExecutionResult execute(String toolName,
Map<String, Object> rawArgs,
String sessionId,
String traceId) {
// 1. 查找 tool
ToolDefinition def = registry.getRequired(toolName);
// 2. 构建上下文
ToolExecutionContext ctx = ToolExecutionContext.builder()
.toolName(def.getName())
.sessionId(sessionId)
.traceId(traceId)
.callId(UUID.randomUUID().toString())
.args(rawArgs)
.build();
// 3. 参数解析与校验
try {
ParsedArgs parsed = argParser.parse(rawArgs, def.getInputSchema());
ctx.setParsedArgs(parsed);
} catch (ArgumentValidationException e) {
// 参数校验失败不走重试,直接 fallback
return fallbackManager.resolve(def, ctx, e);
}
// 4. 执行(含重试)
try {
return retryExecutor.executeWithRetry(def, ctx, timeoutExecutor);
} catch (Exception e) {
// 5. 所有重试失败,走兜底降级
return fallbackManager.resolve(def, ctx, e);
}
}
}
然后在 McpToolBridge 里替换原来简单的执行逻辑:
private McpServerFeatures.SyncToolSpecification buildSpecification(ToolDefinition def) {
McpSchema.Tool mcpTool = McpSchema.Tool.builder()
.name(def.getName())
.description(def.getDescription())
.inputSchema(def.getInputSchema().toMap())
.build();
return McpServerFeatures.SyncToolSpecification.builder()
.tool(mcpTool)
.callHandler((exchange, request) -> {
ToolExecutionResult result = executionEngine.execute(
def.getName(),
request.arguments() != null ? request.arguments() : Map.of(),
exchange.sessionId(),
UUID.randomUUID().toString()
);
String content = result.getResult() != null
? result.getResult().toString()
: "";
return McpSchema.CallToolResult.builder()
.content(List.of(new McpSchema.TextContent(content)))
.isError(result.isError())
.build();
})
.build();
}
一条链路走完,算一笔账
从 LLM 的 JSON-RPC 请求到结果返回给 LLM,不要觉得"不就是调个方法吗"。把这一层做厚,收益在后面积累——Part-07 的可观测性插桩会直接挂在这条链路上,Part-06 的状态管理也会复用这里的分发逻辑。
这条链路上几个容易被低估的细节:
参数校验不做,模型会教你做人。 LLM 输出 JSON 的格式因模型而异——Grok 喜欢不带引号的数字,Claude 会在 string 字段里传空字符串,Qwen 有时塞进去一个多余的嵌套。ArgParser 的隐式类型转换是我踩坑踩出来的妥协。
超时别用 Future.get(timeout)。 不做 cancel 的超时是假的。超时后线程还在后台跑,请求量一上来线程池就满了,然后连锁反应——心跳超时、TCP 连接池枯竭、整个 Runtime 无响应。
重试谁执行谁决策。 全局统一的重试策略是错的。幂等查询可以自动重试,非幂等的写操作重试等于帮用户下单三次。让 tool 自己决定是否可重试,engine 只做执行者。
降级比失败好,但降级要有标记。 缓存降级返回的数据打上 fallback: stale_cache 标记,LLM 可以决定是否信任。没有标记的自动降级等于在模型面前说谎。
接下来
这一篇讲了 Execution Engine 的核心链路——参数解析、类型校验与转换、拦截器链的两种阶段划分、超时控制的正确姿势、基于 tool 语义的重试策略、以及三层兜底降级。
下一篇讲 Structured Output 兼容层。推理模型在工具调用上的特殊性会让你重新审视这里的每一个设计——尤其是当模型输出的参数格式跟你预期的完全不一样的时候。
好,先写到这。
评论
发表评论