在现代 AI 架构中,Model Context Protocol (MCP) 逐渐成为连接模型与外部上下文的标准。然而,MCP 经常使用 SSE (Server-Sent Events) 进行长连接通信,这给传统的抓包分析带来了挑战:请求与响应在时间线上高度碎片化,且 SSE 的流式特性导致数据包难以实时聚合。

本文将详细介绍如何使用 Rust 开发一款高性能的 MCP 流量分析工具,实现 Request 与 Response 的逻辑关联以及流式数据的深度解析。


1. 核心挑战与设计思路

在处理 MCP PCAP 流量时,我们面临三个核心问题:

  1. **TCP 粘包与碎包**:一个 HTTP 头部可能跨越多个 TCP 包,或者多个 HTTP 响应粘在一个包里。
  2. **SSE 聚合**:SSE 响应没有明确的 `Content-Length`(或者是持续的长连接),需要逻辑识别事件边界。
  3. **双向流关联**:必须将客户端发出的 `POST` 请求与其后来自服务器的异步 `SSE` 事件在逻辑上绑定。

为此,我们引入了 “对等 FlowKey”“显式状态机” 的设计。


2. 核心架构设计

2.1 双向流匹配:FlowKey

为了让 A 到 B 的请求和 B 到 A 的响应能找到同一个“账本”,我们定义了一个无视方向的 `FlowKey`:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

impl FlowKey {
    fn new(src: IpAddr, src_port: u16, dst: IpAddr, dst_port: u16) -> Self {
        // 始终让地址/端口较小的一方排在前面,确保双向流量 Key 一致
        let (a, pa, b, pb) = if (src, src_port) < (dst, dst_port) {
            (src, src_port, dst, dst_port)
        } else {
            (dst, dst_port, src, src_port)
        };
        FlowKey { addr_a: a, port_a: pa, addr_b: b, port_b: pb }
    }
}

2.2 严谨的状态机隔离

程序通过 `TransactionState` 追踪每个 HTTP 事务的生命周期,确保数据不会“张冠李戴”。

1
2
3
4
5
enum TransactionState {
    RequestBody,    // 正在收集请求体(如 POST  JSON
    ResponseHeader, // 正在等待响应头
    ResponseBody,   // 正在收集响应体或 SSE 事件
}

3. 代码实现详解

3.1 状态转移逻辑

这是程序最核心的部分。在 `process_stream` 函数中,我们使用一个 `loop` 配合 `match` 来消费缓冲区数据:

  1. **解析请求头**:使用 `httparse` 库。如果解析成功,提取 `Content-Length` 并转入 `RequestBody` 状态。
  2. **消费 Body**:
1
2
3
4
let remaining = tx.expected_req_len - tx.req_body.len();
let take = std::cmp::min(remaining, stream.data.len());
tx.req_body.extend_from_slice(&stream.data[..take]);
consumed = take;

这种**精确计数消费**的方式保证了即使 Body 后面紧跟响应包,也不会误读数据。

3.2 SSE 的特殊处理

当检测到 `Content-Type: text/event-stream` 时,程序进入流式模式:

1
2
3
4
5
6
7
8
9
if tx.is_sse {
    let body_part = String::from_utf8_lossy(&stream.data).to_string();
    for event in body_part.split("\n\n") {
        if !event.trim().is_empty() {
            tx.res_body_events.push(event.trim().to_string());
        }
    }
    consumed = stream.data.len();
}

SSE 消息以 `\n\n` 分隔。我们将每个事件存入 `res_body_events` 列表,直到下一个请求到来时统一 Flush 输出。

3.3 美化输出格式化

为了提高可读性,程序对 JSON 进行了自动识别与缩进:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
fn pretty_print_json(raw: &str, indent: &str) {
    // 自动剥离 SSE 的 "data: " 前缀
    let clean = raw.strip_prefix("data: ").unwrap_or(raw).trim();
    if let Ok(json) = serde_json::from_str::<serde_json::Value>(clean) {
        if let Ok(pretty) = serde_json::to_string_pretty(&json) {
            for line in pretty.lines() { println!("{}{}", indent, line); }
            return;
        }
    }
    println!("{}{}", indent, raw);
}

4. 运行与场景示例

场景:分析 MCP 会话

当你运行程序并输入一个 PCAP 文件时:

  1. 程序首先捕获到 `GET /sse`,记录响应头。
  2. 随后捕获到多个 TCP 包,里面装载着 `: ping`。
  3. 接着客户端发送 `POST /messages`。
  4. 程序会将 `/messages` 的请求内容与其结果即时打印。

输出预览:

1
2
3
4
5
6
7
8
9
 REQUEST: POST /messages
  Host: api.mcp.com
  [Request Body]
    { "method": "list_tools", "params": {} }

 RESPONSE: 200 OK
  Content-Type: application/json
  [Response Body]
    { "tools": [...] }

5. 跨平台编译

为了方便在不同环境中部署,工具支持交叉编译。推荐使用 Docker 容器方案:

1
2
# 编译为 Linux x86_64
cross build --target x86_64-unknown-linux-gnu --release

源代码

github


结语

通过 Rust 的所有权模型和强类型枚举,我们构建了一个既能保证内存安全,又能精确处理字节级流转的分析工具。这对于排查 MCP 接口的认证问题、数据一致性问题具有极大的辅助价值。


希望这篇博文能帮助你理解该工具的设计精髓!如果你有关于 `pcap` 异步处理或更深层的 `HTTP/2` 解析需求,欢迎交流。