基于 Rust + Axum + Tokio 实现的多 Agent 通信网关,作为 Orchestrator(高阶编排器)与 Remote Bot(Worker)之间的中间人桥接层。
生产级多 Agent 通信基础设施,开箱即用
完整实现 A2A JSON-RPC 2.0 协议,兼容标准 Agent-to-Agent 通信规范,支持 tasks/send、tasks/cancel 等标准方法。
Orchestrator 通过 Server-Sent Events 实时接收任务进度流,无需轮询,支持 queued / progress / completed / failed 事件。
Remote Bot 主动出站连接 Gateway,无需公网地址,穿透内网防火墙,全双工双向消息通道。
自动维护 Agent Card,动态聚合所有在线 Remote Bot 的技能,支持 /.well-known/agent.json 标准发现端点。
基于技能 ID + 最低负载的路由算法,自动选择最优 Remote Bot,支持 max_concurrency 并发限制。
Gateway 每 10 秒下发 x-agent/ping,Remote Bot 必须回复 pong。30 秒无响应自动摘除,60 秒任务超时自动失败。
不同租户的 Agent 和任务完全隔离,JWT 中携带 namespace 字段,只能访问同 namespace 下的 Remote Bot。
Remote Bot 接入时验证客户端证书,防止未授权 Agent 接入,支持自签 CA 证书链。
Orchestrator 接入时验证 Bearer Token,支持租户身份识别,JWT Payload 携带 sub / namespace / exp 字段。
三层架构:Orchestrator ↔ Gateway ↔ Remote Bot
高阶编排器
HTTP / SSE
:8443
中间人桥接层
注册表 + 路由
任务管理
Worker 节点
主动出站连接
:8080
Orchestrator (高阶编排器)
│
│ HTTP POST /tasks ← 创建任务
│ GET /tasks/:id/stream ← SSE 订阅进度
│ GET /.well-known/agent.json ← 能力发现
▼
┌─────────────────────────────────────────────┐
│ Agent Gateway :8443 │
│ │
│ Registry (DashMap) + TaskStore (DashMap) │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │ AgentHandle │ │ TaskEntry │ │
│ │ - id │ │ - TaskRecord │ │
│ │ - card │ │ - event_tx │ │
│ │ - tx (mpsc) │ │ - broadcast │ │
│ │ - load │ └──────────────────┘ │
│ └──────────────┘ │
│ background_monitor (10s ticker) │
└─────────────────────────────────────────────┘
│
│ WebSocket ws://:8080/agent/register
│ A2A JSON-RPC 2.0 (双向)
▼
Remote Bot (Nanobot Worker)
│ x-agent/register → 注册 Agent Card
│ tasks/status → 上报进度/结果
│ x-agent/pong → 心跳响应
A2A JSON-RPC 2.0 双向通信协议,也可通过 GET /.well-known/skill.md 在线获取
# 普通模式 ws://127.0.0.1:8080/agent/register # mTLS 模式(需携带客户端证书) wss://127.0.0.1:8080/agent/register
| 方法 | 说明 | 必须 |
|---|---|---|
x-agent/register | 注册 Agent Card,连接后第一条消息 | ✅ |
tasks/status | 上报任务进度(running / completed / failed) | ✅ |
x-agent/pong / response | 心跳响应,收到 ping 后必须回复 | ✅ |
{
"jsonrpc": "2.0",
"method": "x-agent/register",
"params": {
"card": {
"name": "nanobot-text-001",
"version": "1.0.0",
"skills": [
{ "id": "summarize", "name": "Text Summarization" },
{ "id": "translate", "name": "Translation" }
],
"max_concurrency": 3
},
"namespace": "production"
}
}
{
"jsonrpc": "2.0",
"method": "tasks/status",
"params": {
"id": "t-a1b2c3d4-...",
"status": "completed",
"artifact": { "text": "摘要结果..." }
}
}
| 方法 | 说明 |
|---|---|
tasks/send | 分派任务,携带 skill + instruction + constraints |
tasks/cancel | 取消任务 |
x-agent/ping | 心跳探活,每 10 秒一次,id 固定为 "ping" |
{
"jsonrpc": "2.0",
"method": "tasks/send",
"params": {
"id": "t-a1b2c3d4-...",
"message": {
"role": "user",
"parts": [{ "type": "text", "text": "请摘要:..." }]
},
"metadata": { "skill": "summarize", "constraints": {} }
},
"id": "t-a1b2c3d4-..."
}
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... // JWT Payload { "sub": "tenant-001", "namespace": "production", "exp": 1700000000, "iss": "gateway-auth" }
curl -X POST http://127.0.0.1:8443/tasks \ -H "Content-Type: application/json" \ -d '{"skill":"summarize","instruction":"请摘要:AI正在改变世界..."}' # 响应 { "task_id": "t-xxxx", "status": "queued", "assigned_agent": "nanobot-text-001" }
curl -N http://127.0.0.1:8443/tasks/t-xxxx/stream # 输出 event: queued data: {"event":"queued","task_id":"t-xxxx","agent_id":"nanobot-text-001"} event: progress data: {"event":"progress","task_id":"t-xxxx","pct":50,"message":"processing..."} event: completed data: {"event":"completed","task_id":"t-xxxx","artifact":{"text":"摘要结果..."}}
{
"jsonrpc": "2.0",
"method": "tasks/send",
"params": { "skill": "translate", "instruction": "Translate: 今天天气真好" },
"id": "req-001"
}
| SSE 事件 | 触发时机 | data 字段 |
|---|---|---|
queued | 任务已入队,等待 Agent 处理 | task_id, agent_id |
running | Agent 开始处理 | task_id |
progress | 进度更新(0-100) | task_id, pct, message |
completed | 任务完成,SSE 流关闭 | task_id, artifact |
failed | 任务失败,SSE 流关闭 | task_id, error |
canceled | 任务被取消 | task_id |
两个监听端口,分别服务 Orchestrator 和 Remote Bot
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /.well-known/agent.json |
Agent Card 能力发现,动态聚合所有在线 Remote Bot 的 skills |
| GET | /.well-known/skill.md |
协议文档(Markdown 格式) |
| POST | /tasks |
创建任务,返回 task_id + assigned_agent |
| GET | /tasks/:task_id |
查询任务状态和结果 |
| GET | /tasks/:task_id/stream |
SSE 订阅任务进度流(text/event-stream) |
| POST | /jsonrpc |
JSON-RPC 2.0 统一入口(tasks/send, tasks/get, agents/list) |
| GET | /agents |
列出所有在线 Remote Bot 及其负载 |
| GET | /health |
健康检查 |
| 方法 | 路径 | 说明 |
|---|---|---|
| WS | /agent/register |
WebSocket 升级,Remote Bot 注册并建立长连接 |
| GET | /health |
健康检查 |
5 分钟跑通完整的 Orchestrator → Gateway → Remote Bot 链路
默认端口:8443(Orchestrator 接入)+ 8080(Remote Bot 注册)
cargo build --release cargo run --release
内置 mock_nanobot 示例,支持 summarize 和 translate 技能,模拟进度上报
cargo run --example mock_nanobot -- \ ws://127.0.0.1:8080/agent/register nanobot-text-001
curl -s -X POST http://127.0.0.1:8443/tasks \ -H "Content-Type: application/json" \ -d '{"skill":"summarize","instruction":"请摘要:AI正在改变世界"}' | jq . # 输出 { "task_id": "t-xxxx", "status": "queued", "assigned_agent": "nanobot-text-001" }
使用上一步返回的 task_id,-N 禁用输出缓冲
curl -N http://127.0.0.1:8443/tasks/t-xxxx/stream
event: queued
data: {"event":"queued","agent_id":"nanobot-text-001"}
event: progress
data: {"event":"progress","pct":30,"message":"processing..."}
event: completed
data: {"event":"completed","artifact":{"text":"[mock summary]..."}}
curl http://127.0.0.1:8443/agents | jq . curl http://127.0.0.1:8443/.well-known/agent.json | jq .skills
清晰的分层设计,每个模块职责单一
入口,启动两个 server + 后台监控 + mTLS 服务
配置加载(含 mTLS / OAuth2 配置),支持环境变量和 config.toml
认证模块:OAuth2 中间件 + mTLS 配置构建
Agent 注册表 + Agent Card + Namespace 隔离,DashMap 线程安全
任务存储 + 事件广播(broadcast channel),SSE 推送基础
AppState 共享状态,Arc 包装 Registry + TaskStore
路由策略(LeastLoad / Affinity / RoundRobin),预留扩展
错误类型 + IntoResponse,统一错误处理
Remote Bot WebSocket handler,支持 namespace 注册
Orchestrator HTTP/SSE/JSON-RPC handler,支持 JWT 鉴权
health、agent_card、skill_md 公共端点
Mock Remote Bot,用于本地测试,模拟进度上报和心跳响应