目录

  • 什么是 MCP?
  • 架构设计
  • 核心实现
    • 1. McpClient:连接与通信
    • 2. McpRegistry:服务器管理
    • 3. OAuth 认证
    • 4. 健康坚控
  • 工具动态注册
  • 错误处理与重连
  • 实战案例
  • 总结

什么是 MCP?

MCP(Model Context Protocol)是 Anthropic 推出的开放协议,用于 AI 应用与外部工具/数据源的标准化通信。它解决了以下问题:

  • 工具碎片化:每个 AI 应用都要重新实现工具集成
  • 协议不统一:没有标准的工具调用格式
  • 扩展性差:添加新工具需要修改核心代码

MCP 提供了:

  • 标准传输层:stdio、SSE、HTTP
  • 工具发现机制:动态获取可用工具列表
  • 认证支持:OAuth 2.0 集成
  • 资源管理:统一的资源读取接口

架构设计

blade-code 的 MCP 集成采用三层架构:

graph TB
    A[Agent Runtime 调用层] --> B[McpRegistry 管理层]
    B --> C[McpClient 通信层]
    
    A1[工具调用<br/>参数验证] -.-> A
    B1[服务器注册/注销<br/>工具冲突处理<br/>状态坚控] -.-> B
    C1[连接管理<br/>协议通信<br/>错误重试<br/>OAuth 认证] -.-> C
    
    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#ffe1f5

设计原则:

  1. 单一职责:每层只负责自己的核心功能
  2. 事件驱动:通过 EventEmitter 解耦组件
  3. 容错优先:网络错误自动重试,永久错误快速失败
  4. 可观测性:完整的状态机和事件日志

核心实现

1. McpClient:连接与通信

McpClient 是 MCP 集成的核心,负责与单个 MCP 服务器的通信。

1.1 传输层抽象

MCP 支持三种传输方式,blade-code 通过工厂模式统一创建:

private async createTransport(): Promise<Transport> {
  const { type, command, args, env, url, headers } = this.config;

  if (type === 'stdio') {
    // 子进程通信(本地工具)
    return new StdioClientTransport({
      command,
      args: args || [],
      env: { ...process.env, ...env },
      stderr: 'ignore', // 忽略子进程的 stderr 输出
    });
  } else if (type === 'sse') {
    // Server-Sent Events(远程服务)
    return new SSEClientTransport(new URL(url), {
      requestInit: { headers },
    });
  } else if (type === 'http') {
    // HTTP 长轮询
    const { StreamableHTTPClientTransport } = await import(
      '@modelcontextprotocol/sdk/client/streamableHttp.js'
    );
    return new StreamableHTTPClientTransport(new URL(url), {
      requestInit: { headers },
    });
  }

  throw new Error(`不支持的传输类型: ${type}`);
}

关键点:

  • stdio 适合本地工具(如文件系统、数据库)
  • sse 适合远程服务(实时推送)
  • http 适合 RESTful API

1.2 连接管理与重试

生产环境中,网络不稳定是常态。blade-code 实现了智能重试机制:

async connectWithRetry(maxRetries = 3, initialDelay = 1000): Promise<void> {
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await this.doConnect();
      this.reconnectAttempts = 0; // 重置重连计数
      return; // 成功连接
    } catch (error) {
      lastError = error as Error;
      const classified = classifyError(error);

      // 如果是永久性错误,不重试
      if (!classified.isRetryable) {
        console.error('[McpClient] 检测到永久性错误,放弃重试:', classified.type);
        throw error;
      }

      // 指数退避
      if (attempt < maxRetries) {
        const delay = initialDelay * Math.pow(2, attempt - 1);
        console.warn(`[McpClient] 连接失败(${attempt}/${maxRetries}),${delay}ms 后重试...`);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }

  throw lastError || new Error('连接失败');
}

错误分类:

enum ErrorType {
  NETWORK_TEMPORARY = 'network_temporary', // 临时网络错误(可重试)
  NETWORK_PERMANENT = 'network_permanent', // 永久网络错误
  CONFIG_ERROR = 'config_error',           // 配置错误
  AUTH_ERROR = 'auth_error',               // 认证错误
  PROTOCOL_ERROR = 'protocol_error',       // 协议错误
  UNKNOWN = 'unknown',                     // 未知错误
}

function classifyError(error: unknown): ClassifiedError {
  const msg = error.message.toLowerCase();

  // 永久性配置错误(不应重试)
  const permanentErrors = [
    'command not found',
    'no such file',
    'permission denied',
    'invalid configuration',
  ];

  if (permanentErrors.some((permanent) => msg.includes(permanent))) {
    return { type: ErrorType.CONFIG_ERROR, isRetryable: false, originalError: error };
  }

  // 临时网络错误(可重试)
  const temporaryErrors = [
    'timeout',
    'connection refused',
    'econnreset',
    'etimedout',
    '503',
    '429',
  ];

  if (temporaryErrors.some((temporary) => msg.includes(temporary))) {
    return { type: ErrorType.NETWORK_TEMPORARY, isRetryable: true, originalError: error };
  }

  // 默认视为临时错误(保守策略:允许重试)
  return { type: ErrorType.UNKNOWN, isRetryable: true, originalError: error };
}

为什么这样设计?

  • 快速失败:配置错误立即抛出,避免无意义的重试
  • 指数退避:避免雪崩效应,给服务器恢复时间
  • 保守策略:未知错误默认可重试,提高容错性

1.3 意外断连处理

MCP 服务器可能随时断开(进程崩溃、网络中断),blade-code 通过 onclose 事件自动重连:

this.sdkClient.onclose = () => {
  this.handleUnexpectedClose();
};

private handleUnexpectedClose(): void {
  if (this.isManualDisconnect) {
    return; // 手动断开,不重连
  }

  if (this.status === McpConnectionStatus.CONNECTED) {
    console.warn('[McpClient] 检测到意外断连,准备重连...');
    this.setStatus(McpConnectionStatus.ERROR);
    this.emit('error', new Error('MCP服务器连接意外关闭'));
    this.scheduleReconnect();
  }
}

private scheduleReconnect(): void {
  if (this.reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) {
    console.error('[McpClient] 达到最大重连次数,放弃重连');
    this.emit('reconnectFailed');
    return;
  }

  // 指数退避:1s, 2s, 4s, 8s, 16s(最大30s)
  const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
  this.reconnectAttempts++;

  this.reconnectTimer = setTimeout(async () => {
    try {
      await this.doConnect();
      console.log('[McpClient] 重连成功');
      this.reconnectAttempts = 0;
      this.emit('reconnected');
    } catch (error) {
      const classified = classifyError(error);
      if (classified.isRetryable) {
        this.scheduleReconnect(); // 继续重连
      } else {
        this.emit('reconnectFailed'); // 永久失败
      }
    }
  }, delay);
}

关键点:

  • 区分手动断开和意外断连
  • 最多重连 5 次,避免无限循环
  • 重连成功后重置计数器

2. McpRegistry:服务器管理

McpRegistry 是单例模式的注册中心,管理多个 MCP 服务器。

2.1 服务器注册

async registerServer(name: string, config: McpServerConfig): Promise<void> {
  if (this.servers.has(name)) {
    throw new Error(`MCP服务器 "${name}" 已经注册`);
  }

  const client = new McpClient(config, name, config.healthCheck);
  const serverInfo: McpServerInfo = {
    config,
    client,
    status: McpConnectionStatus.DISCONNECTED,
    tools: [],
  };

  // 设置客户端事件处理器
  this.setupClientEventHandlers(client, serverInfo, name);

  this.servers.set(name, serverInfo);
  this.emit('serverRegistered', name, serverInfo);

  try {
    await this.connectServer(name);
  } catch (error) {
    console.warn(`MCP服务器 "${name}" 连接失败:`, error);
  }
}

2.2 工具冲突处理

多个 MCP 服务器可能提供同名工具,blade-code 通过前缀解决冲突:

async getAvailableTools(): Promise<Tool[]> {
  const tools: Tool[] = [];
  const nameConflicts = new Map<string, number>();

  // 第一遍:检测冲突
  for (const [_serverName, serverInfo] of this.servers) {
    if (serverInfo.status === McpConnectionStatus.CONNECTED) {
      for (const mcpTool of serverInfo.tools) {
        const count = nameConflicts.get(mcpTool.name) || 0;
        nameConflicts.set(mcpTool.name, count + 1);
      }
    }
  }

  // 第二遍:创建工具(冲突时添加前缀)
  for (const [serverName, serverInfo] of this.servers) {
    if (serverInfo.status === McpConnectionStatus.CONNECTED) {
      for (const mcpTool of serverInfo.tools) {
        const hasConflict = (nameConflicts.get(mcpTool.name) || 0) > 1;
        const toolName = hasConflict
          ? `${serverName}__${mcpTool.name}`
          : mcpTool.name;

        const tool = createMcpTool(serverInfo.client, serverName, mcpTool, toolName);
        tools.push(tool);
      }
    }
  }

  return tools;
}

命名策略:

  • 无冲突:toolName
  • 有冲突:serverName__toolName

示例:

服务器 A: read_file
服务器 B: read_file
 最终工具: A__read_file, B__read_file

3. OAuth 认证

MCP 支持 OAuth 2.0 认证,blade-code 实现了完整的 OAuth 流程。

3.1 令牌存储

export class OAuthTokenStorage {
  private readonly tokenFilePath: string;

  constructor() {
    const homeDir = os.homedir();
    const configDir = path.join(homeDir, '.blade');
    this.tokenFilePath = path.join(configDir, 'mcp-oauth-tokens.json');
  }

  async saveToken(
    serverName: string,
    token: OAuthToken,
    clientId?: string,
    tokenUrl?: string
  ): Promise<void> {
    const credentials = await this.loadAllCredentials();

    const credential: OAuthCredentials = {
      serverName,
      token,
      clientId,
      tokenUrl,
      updatedAt: Date.now(),
    };

    credentials.set(serverName, credential);
    await this.saveAllCredentials(credentials);
  }

  isTokenExpired(token: OAuthToken): boolean {
    if (!token.expiresAt) {
      return false; // 没有过期时间,认为不过期
    }

    // 提前 5 分钟视为过期,留出刷新时间
    const buffer = 5 * 60 * 1000;
    return Date.now() >= token.expiresAt - buffer;
  }
}

安全措施:

  • 令牌文件权限设置为 0o600(仅所有者可读写)
  • 提前 5 分钟刷新令牌,避免过期
  • 支持 refresh_token 自动续期

3.2 OAuth 流程

export class OAuthProvider {
  private tokenStorage = new OAuthTokenStorage();

  async getValidToken(
    serverName: string,
    oauthConfig: OAuthConfig
  ): Promise<string | null> {
    const credentials = await this.tokenStorage.getCredentials(serverName);

    if (!credentials) {
      return null; // 没有令牌,需要认证
    }

    // 检查是否过期
    if (this.tokenStorage.isTokenExpired(credentials.token)) {
      // 尝试刷新
      if (credentials.token.refreshToken) {
        try {
          const newToken = await this.refreshToken(credentials, oauthConfig);
          await this.tokenStorage.saveToken(
            serverName,
            newToken,
            credentials.clientId,
            credentials.tokenUrl
          );
          return newToken.accessToken;
        } catch (error) {
          console.error('[OAuthProvider] 刷新令牌失败:', error);
          return null; // 刷新失败,需要重新认证
        }
      }
      return null; // 没有 refresh_token,需要重新认证
    }

    return credentials.token.accessToken;
  }

  async authenticate(
    serverName: string,
    oauthConfig: OAuthConfig
  ): Promise<OAuthToken> {
    // 1. 生成授权 URL
    const authUrl = this.buildAuthUrl(oauthConfig);
    console.log(`请访问以下 URL 进行授权:n${authUrl}`);

    // 2. 启动本地回调服务器
    const code = await this.startCallbackServer(oauthConfig.redirectUri);

    // 3. 用授权码换取令牌
    const token = await this.exchangeCodeForToken(code, oauthConfig);

    // 4. 保存令牌
    await this.tokenStorage.saveToken(
      serverName,
      token,
      oauthConfig.clientId,
      oauthConfig.tokenUrl
    );

    return token;
  }
}

流程图:

graph TD
    A[用户请求] --> B{检查令牌}
    B -->|有效| C[返回令牌]
    B -->|无效/过期| D{有 refresh_token?}
    D -->|是| E[刷新令牌]
    E --> F[返回新令牌]
    D -->|否| G[启动 OAuth 流程]
    G --> H[返回新令牌]
    
    style C fill:#90EE90
    style F fill:#90EE90
    style H fill:#90EE90

4. 健康坚控

生产环境中,MCP 服务器可能"僵死"(连接正常但不响应)。blade-code 实现了主动健康检查:

export class HealthMonitor extends EventEmitter {
  private intervalTimer: NodeJS.Timeout | null = null;
  private consecutiveFailures = 0;

  constructor(
    private client: McpClient,
    private config: HealthCheckConfig
  ) {
    super();
  }

  start(): void {
    if (this.intervalTimer) {
      return; // 已经启动
    }

    this.intervalTimer = setInterval(async () => {
      try {
        await this.performHealthCheck();
        this.consecutiveFailures = 0; // 重置失败计数
      } catch (error) {
        this.consecutiveFailures++;
        console.warn(
          `[HealthMonitor] 健康检查失败 (${this.consecutiveFailures}/${this.config.maxFailures}):`,
          error
        );

        if (this.consecutiveFailures >= this.config.maxFailures) {
          this.emit('unhealthy', this.consecutiveFailures, error);
          await this.attemptReconnect();
        }
      }
    }, this.config.intervalMs);
  }

  private async performHealthCheck(): Promise<void> {
    const timeout = this.config.timeoutMs || 5000;

    await Promise.race([
      this.client.listTools(), // 调用一个轻量级方法
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('健康检查超时')), timeout)
      ),
    ]);
  }

  private async attemptReconnect(): Promise<void> {
    console.log('[HealthMonitor] 尝试重连...');
    try {
      await this.client.disconnect();
      await this.client.connect();
      this.consecutiveFailures = 0;
      this.emit('reconnected');
    } catch (error) {
      console.error('[HealthMonitor] 重连失败:', error);
    }
  }
}

配置示例:

{
  enabled: true,
  intervalMs: 30000,    // 每 30 秒检查一次
  timeoutMs: 5000,      // 超时时间 5 秒
  maxFailures: 3        // 连续失败 3 次触发重连
}

工具动态注册

MCP 工具需要转换为 blade-code 的 Tool 接口:

export function createMcpTool(
  client: McpClient,
  serverName: string,
  mcpTool: McpToolDefinition,
  toolName?: string
): Tool {
  return {
    name: toolName || mcpTool.name,
    description: mcpTool.description || `MCP工具: ${mcpTool.name}`,
    parameters: mcpTool.inputSchema || { type: 'object', properties: {} },
    metadata: {
      source: 'mcp',
      serverName,
      originalName: mcpTool.name,
    },

    async execute(args: Record<string, unknown>): Promise<ToolResult> {
      try {
        const response = await client.callTool(mcpTool.name, args);

        return {
          success: true,
          output: formatMcpResponse(response),
          metadata: {
            serverName,
            toolName: mcpTool.name,
          },
        };
      } catch (error) {
        return {
          success: false,
          error: error instanceof Error ? error.message : String(error),
          metadata: {
            serverName,
            toolName: mcpTool.name,
          },
        };
      }
    },
  };
}

关键点:

  • 保留原始工具名(originalName)用于调试
  • 统一错误处理格式
  • 支持元数据传递

错误处理与重连

blade-code 的错误处理遵循以下原则:

1. 错误分类

临时错误(可重试):
- 网络超时
- 连接被拒绝
- 速率限制(429)
- 服务不可用(503)

永久错误(不重试):
- 配置错误(命令不存在)
- 认证失败(401)
- 权限不足(403)
- 协议错误(格式错误)

2. 重试策略

指数退避:
- 第 1 次:1 秒后重试
- 第 2 次:2 秒后重试
- 第 3 次:4 秒后重试
- 第 4 次:8 秒后重试
- 第 5 次:16 秒后重试
- 最大延迟:30

3. 状态机

stateDiagram-v2
    [*] --> DISCONNECTED
    DISCONNECTED --> CONNECTING: connect()
    CONNECTING --> CONNECTED: 成功
    CONNECTING --> ERROR: 失败
    ERROR --> CONNECTING: 重试
    CONNECTED --> ERROR: 意外断连
    CONNECTED --> DISCONNECTED: disconnect()

实战案例

案例 1:集成文件系统 MCP 服务器

// 配置文件
{
  "mcpServers": {
    "filesystem": {
      "type": "stdio",
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/workspace"],
      "env": {
        "NODE_ENV": "production"
      }
    }
  }
}

// 使用
const registry = McpRegistry.getInstance();
await registry.registerServer('filesystem', config.mcpServers.filesystem);

const tools = await registry.getAvailableTools();
// 输出: [{ name: 'read_file', ... }, { name: 'write_file', ... }]

案例 2:集成远程 API(带 OAuth)

{
  "mcpServers": {
    "github": {
      "type": "sse",
      "url": "https://api.github.com/mcp",
      "oauth": {
        "enabled": true,
        "authUrl": "https://github.com/login/oauth/authorize",
        "tokenUrl": "https://github.com/login/oauth/access_token",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret",
        "scopes": ["repo", "user"],
        "redirectUri": "http://localhost:3000/callback"
      }
    }
  }
}

// 首次使用会自动触发 OAuth 流程
await registry.registerServer('github', config.mcpServers.github);
// 输出: 请访问以下 URL 进行授权: ?...

案例 3:健康坚控与自动恢复

{
  "mcpServers": {
    "database": {
      "type": "stdio",
      "command": "mcp-database-server",
      "healthCheck": {
        "enabled": true,
        "intervalMs": 30000,
        "timeoutMs": 5000,
        "maxFailures": 3
      }
    }
  }
}

// 健康事件
registry.on('serverError', (name, error) => {
  console.error(`服务器 ${name} 出错:`, error);
});

registry.on('healthMonitorReconnected', () => {
  console.log('健康坚控触发重连成功');
});

总结

blade-code 的 MCP 集成实现了以下核心功能:

已实现

  1. 多传输层支持:stdio、SSE、HTTP
  2. 智能重试:错误分类 + 指数退避
  3. 自动重连:意外断连自动恢复
  4. OAuth 认证:完整的 OAuth 2.0 流程
  5. 健康坚控:主动检测僵死连接
  6. 工具冲突处理:自动添加前缀
  7. 事件驱动:完整的状态机和事件系统

设计亮点

  • 容错优先:网络错误不会导致整个系统崩溃
  • 可观测性:丰富的日志和事件,便于调试
  • 扩展性:新增 MCP 服务器只需修改配置文件
  • 安全性:令牌加密存储,权限最小化

相关资源:

  • blade-code GitHub
  • MCP 官方文档
  • 作者博客

讨论:欢迎在 GitHub Issues 或我的博客评论区交流!

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com