在现代 AI 架构中,Model Context Protocol (MCP) 逐渐成为连接模型与外部上下文的标准。然而,MCP 经常使用 SSE (Server-Sent Events) 进行长连接通信,这给传统的抓包分析带来了挑战:请求与响应在时间线上高度碎片化,且 SSE 的流式特性导致数据包难以实时聚合。
本文将详细介绍如何使用 Rust 开发一款高性能的 MCP 流量分析工具,实现 Request 与 Response 的逻辑关联以及流式数据的深度解析。
1. 核心挑战与设计思路
在处理 MCP PCAP 流量时,我们面临三个核心问题:
- **TCP 粘包与碎包**:一个 HTTP 头部可能跨越多个 TCP 包,或者多个 HTTP 响应粘在一个包里。
- **SSE 聚合**:SSE 响应没有明确的 `Content-Length`(或者是持续的长连接),需要逻辑识别事件边界。
- **双向流关联**:必须将客户端发出的 `POST` 请求与其后来自服务器的异步 `SSE` 事件在逻辑上绑定。
为此,我们引入了 “对等 FlowKey” 和 “显式状态机” 的设计。
2. 核心架构设计
2.1 双向流匹配:FlowKey
为了让 A 到 B 的请求和 B 到 A 的响应能找到同一个“账本”,我们定义了一个无视方向的 `FlowKey`:
1
2
3
4
5
6
7
8
9
10
11
12
|
impl FlowKey {
fn new(src: IpAddr, src_port: u16, dst: IpAddr, dst_port: u16) -> Self {
// 始终让地址/端口较小的一方排在前面,确保双向流量 Key 一致
let (a, pa, b, pb) = if (src, src_port) < (dst, dst_port) {
(src, src_port, dst, dst_port)
} else {
(dst, dst_port, src, src_port)
};
FlowKey { addr_a: a, port_a: pa, addr_b: b, port_b: pb }
}
}
|
2.2 严谨的状态机隔离
程序通过 `TransactionState` 追踪每个 HTTP 事务的生命周期,确保数据不会“张冠李戴”。
1
2
3
4
5
| enum TransactionState {
RequestBody, // 正在收集请求体(如 POST 的 JSON)
ResponseHeader, // 正在等待响应头
ResponseBody, // 正在收集响应体或 SSE 事件
}
|
3. 代码实现详解
3.1 状态转移逻辑
这是程序最核心的部分。在 `process_stream` 函数中,我们使用一个 `loop` 配合 `match` 来消费缓冲区数据:
- **解析请求头**:使用 `httparse` 库。如果解析成功,提取 `Content-Length` 并转入 `RequestBody` 状态。
- **消费 Body**:
1
2
3
4
| let remaining = tx.expected_req_len - tx.req_body.len();
let take = std::cmp::min(remaining, stream.data.len());
tx.req_body.extend_from_slice(&stream.data[..take]);
consumed = take;
|
这种**精确计数消费**的方式保证了即使 Body 后面紧跟响应包,也不会误读数据。
3.2 SSE 的特殊处理
当检测到 `Content-Type: text/event-stream` 时,程序进入流式模式:
1
2
3
4
5
6
7
8
9
| if tx.is_sse {
let body_part = String::from_utf8_lossy(&stream.data).to_string();
for event in body_part.split("\n\n") {
if !event.trim().is_empty() {
tx.res_body_events.push(event.trim().to_string());
}
}
consumed = stream.data.len();
}
|
SSE 消息以 `\n\n` 分隔。我们将每个事件存入 `res_body_events` 列表,直到下一个请求到来时统一 Flush 输出。
3.3 美化输出格式化
为了提高可读性,程序对 JSON 进行了自动识别与缩进:
1
2
3
4
5
6
7
8
9
10
11
| fn pretty_print_json(raw: &str, indent: &str) {
// 自动剥离 SSE 的 "data: " 前缀
let clean = raw.strip_prefix("data: ").unwrap_or(raw).trim();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(clean) {
if let Ok(pretty) = serde_json::to_string_pretty(&json) {
for line in pretty.lines() { println!("{}{}", indent, line); }
return;
}
}
println!("{}{}", indent, raw);
}
|
4. 运行与场景示例
场景:分析 MCP 会话
当你运行程序并输入一个 PCAP 文件时:
- 程序首先捕获到 `GET /sse`,记录响应头。
- 随后捕获到多个 TCP 包,里面装载着 `: ping`。
- 接着客户端发送 `POST /messages`。
- 程序会将 `/messages` 的请求内容与其结果即时打印。
输出预览:
1
2
3
4
5
6
7
8
9
| ▶ REQUEST: POST /messages
Host: api.mcp.com
[Request Body]
{ "method": "list_tools", "params": {} }
◀ RESPONSE: 200 OK
Content-Type: application/json
[Response Body]
{ "tools": [...] }
|
5. 跨平台编译
为了方便在不同环境中部署,工具支持交叉编译。推荐使用 Docker 容器方案:
1
2
| # 编译为 Linux x86_64
cross build --target x86_64-unknown-linux-gnu --release
|
源代码
github
结语
通过 Rust 的所有权模型和强类型枚举,我们构建了一个既能保证内存安全,又能精确处理字节级流转的分析工具。这对于排查 MCP 接口的认证问题、数据一致性问题具有极大的辅助价值。
希望这篇博文能帮助你理解该工具的设计精髓!如果你有关于 `pcap` 异步处理或更深层的 `HTTP/2` 解析需求,欢迎交流。