logo蛋烘糕.

不写博客的工程师不是好的产品经理

AI 技术演进与核心算法实战 | 第二十四篇:通信与共识机制:Agent 间的消息协议、共享黑板(Blackboard)与冲突仲裁算法

Cover Image for AI 技术演进与核心算法实战 | 第二十四篇:通信与共识机制:Agent 间的消息协议、共享黑板(Blackboard)与冲突仲裁算法
蛋烘糕
蛋烘糕

独木难成林,单弦不成音。当多个智能体开始对话,真正的群体智慧才刚刚萌芽。

在上一篇中,我们探讨了三种主流的多 Agent 协作架构:Hierarchical(层级式)、Sequential(流水线式)和 Joint(联合式)。但无论采用哪种架构,一个核心问题始终存在:Agent 之间如何高效、可靠地通信?当意见不一致时,如何达成共识?

这就像人类社会中的团队协作:即使明确了分工,如果沟通渠道不畅、信息传递失真,或者成员间产生分歧无法调和,整个团队依然会陷入混乱。

本篇是 《AI 技术演进与核心算法实战》第五模块的第二篇,我们将深入多 Agent 系统的"神经系统"——通信与共识机制。从底层的消息协议设计,到经典的共享黑板(Blackboard)架构,再到冲突仲裁算法,带你全面掌握构建可扩展群体智能系统的核心技术。


1. 多 Agent 通信的挑战:为什么不能简单"聊天"?

1.1 从人类协作到机器协作

想象一下,如果你要组织一个五人团队完成一份复杂的商业计划书,你会面临哪些沟通挑战?

  1. 信息过载:每个人都在群里发消息,重要信息被淹没在闲聊中。
  2. 语义歧义:"尽快完成"对 A 来说是今天下班前,对 B 来说可能是本周内。
  3. 状态不同步:A 已经修改了文档第三部分,但 B 还在基于旧版本工作。
  4. 责任推诿:任务出问题时,每个人都声称"我以为别人会做"。

这些问题在多 Agent 系统中同样存在,甚至更加严重,因为:

  • LLM 的输出具有不确定性:同样的输入可能产生不同的输出。
  • 上下文窗口限制:Agent 无法记住所有历史消息。
  • 执行延迟差异:某些 Agent 需要调用外部工具(如数据库查询),响应时间可能长达数秒。

1.2 多 Agent 通信的四大核心需求

基于上述挑战,一个健壮的多 Agent 通信系统必须满足以下需求:

需求维度 说明 典型问题
结构化 消息必须有明确的格式和字段 自由文本难以解析,容易丢失关键信息
可追溯 每条消息应有唯一 ID 和时间戳 无法定位问题源头,调试困难
异步性 支持非阻塞的消息传递 同步等待会导致系统吞吐量骤降
容错性 处理消息丢失、重复、乱序 网络波动或 Agent 崩溃时的数据一致性
💡 真实案例:某电商公司的多 Agent 客服系统故障

2024 年,某头部电商平台部署了一套由 8 个 Agent 组成的智能客服系统:

  • 意图识别 Agent:判断用户问题类型
  • 订单查询 Agent:检索订单状态
  • 物流跟踪 Agent:获取物流信息
  • 退款处理 Agent:处理退货申请

故障现象:用户反馈"明明说好了退款,第二天却收到发货通知"。

根因分析

  1. 退款处理 Agent 生成了退款确认消息,但由于网络抖动,消息队列出现了重复投递。
  2. 订单查询 Agent 收到了两条相同的退款消息,但没有去重机制,执行了两次退款操作。
  3. 物流跟踪 Agent 在退款完成后仍监听到"订单有效"的缓存状态,触发了发货流程。

教训:缺乏消息幂等性设计全局状态同步机制,导致业务逻辑混乱。


2. 消息协议设计:Agent 间的"通用语言"

2.1 从自然语言到结构化消息

早期的多 Agent 系统尝试让 Agent 直接用自然语言对话(就像两个人在微信聊天)。但这种做法很快暴露出致命缺陷:

# ❌ 错误的做法:直接传递自然语言
message = "我觉得这个方案有问题,你觉得呢?"
# Agent B 收到后需要理解:
# - "这个方案"指代什么?
# - "有问题"具体是什么问题?
# - 期望的回复格式是什么?

现代多 Agent 框架(如 LangGraph、AutoGen、CrewAI)都采用了结构化的消息协议,将对话内容封装为包含元数据的标准对象。

2.2 通用消息协议的核心字段

一个完善的 Agent 消息协议通常包含以下字段:

from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid

class AgentMessage:
    """
    标准化的 Agent 消息协议
    
    设计原则:
    1. 自包含(Self-contained):消息携带足够的上下文,无需依赖外部状态
    2. 可扩展(Extensible):通过 metadata 字段支持自定义属性
    3. 可序列化(Serializable):支持 JSON 序列化,便于网络传输和持久化
    """
    
    def __init__(
        self,
        sender: str,              # 发送者 ID
        receiver: str,            # 接收者 ID("*" 表示广播)
        message_type: str,        # 消息类型:REQUEST, RESPONSE, EVENT, ERROR
        content: str,             # 消息主体内容
        message_id: str = None,   # 唯一消息 ID
        timestamp: datetime = None,
        correlation_id: str = None,  # 关联 ID,用于追踪请求-响应链
        metadata: Dict[str, Any] = None,  # 扩展元数据
        attachments: List[Dict] = None    # 附件(如文件路径、图片 URL)
    ):
        self.message_id = message_id or str(uuid.uuid4())
        self.sender = sender
        self.receiver = receiver
        self.message_type = message_type
        self.content = content
        self.timestamp = timestamp or datetime.now()
        self.correlation_id = correlation_id or self.message_id
        self.metadata = metadata or {}
        self.attachments = attachments or []
    
    def to_dict(self) -> Dict[str, Any]:
        """序列化为字典(可用于 JSON 编码)"""
        return {
            "message_id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "message_type": self.message_type,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "correlation_id": self.correlation_id,
            "metadata": self.metadata,
            "attachments": self.attachments
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentMessage':
        """从字典反序列化"""
        return cls(
            sender=data["sender"],
            receiver=data["receiver"],
            message_type=data["message_type"],
            content=data["content"],
            message_id=data.get("message_id"),
            timestamp=datetime.fromisoformat(data["timestamp"]),
            correlation_id=data.get("correlation_id"),
            metadata=data.get("metadata", {}),
            attachments=data.get("attachments", [])
        )

2.3 消息类型的最佳实践

在实际工程中,我们通常定义以下几种消息类型:

Agent 消息类型流转图
<!-- REQUEST -->
<rect x="50" y="60" width="140" height="80" class="box-request" />
<text x="120" y="85" class="text" font-weight="bold">REQUEST</text>
<text x="120" y="105" class="text" font-size="12">请求消息</text>
<text x="120" y="125" class="text" font-size="11" fill="#64748b">携带任务参数</text>

<!-- Arrow to PROCESSING -->
<path d="M 190 100 L 270 100" class="arrow" />
<text x="230" y="90" class="text" font-size="11" fill="#64748b">接收处理</text>

<!-- PROCESSING (Internal) -->
<ellipse cx="330" cy="100" rx="60" ry="30" fill="#e2e8f0" stroke="#64748b" stroke-width="2" />
<text x="330" y="100" class="text">Processing</text>

<!-- Arrow to RESPONSE -->
<path d="M 390 100 L 470 100" class="arrow" />
<text x="430" y="90" class="text" font-size="11" fill="#64748b">返回结果</text>

<!-- RESPONSE -->
<rect x="470" y="60" width="140" height="80" class="box-response" />
<text x="540" y="85" class="text" font-weight="bold">RESPONSE</text>
<text x="540" y="105" class="text" font-size="12">响应消息</text>
<text x="540" y="125" class="text" font-size="11" fill="#64748b">携带执行结果</text>

<!-- EVENT (Broadcast) -->
<rect x="50" y="200" width="140" height="80" class="box-event" />
<text x="120" y="225" class="text" font-weight="bold">EVENT</text>
<text x="120" y="245" class="text" font-size="12">事件消息</text>
<text x="120" y="265" class="text" font-size="11" fill="#64748b">广播通知</text>

<!-- Arrow to multiple receivers -->
<path d="M 190 220 L 270 180" class="arrow" />
<path d="M 190 240 L 270 240" class="arrow" />
<path d="M 190 260 L 270 300" class="arrow" />

<!-- Multiple Receivers -->
<rect x="270" y="160" width="100" height="40" class="box" />
<text x="320" y="180" class="text" font-size="12">Agent A</text>

<rect x="270" y="220" width="100" height="40" class="box" />
<text x="320" y="240" class="text" font-size="12">Agent B</text>

<rect x="270" y="280" width="100" height="40" class="box" />
<text x="320" y="300" class="text" font-size="12">Agent C</text>

<!-- ERROR -->
<rect x="470" y="200" width="140" height="80" class="box-error" />
<text x="540" y="225" class="text" font-weight="bold">ERROR</text>
<text x="540" y="245" class="text" font-size="12">错误消息</text>
<text x="540" y="265" class="text" font-size="11" fill="#64748b">携带异常信息</text>

<!-- Error handling loop -->
<path d="M 540 280 Q 540 340 400 340 Q 260 340 260 280" class="arrow" stroke-dasharray="6,4" />
<text x="400" y="355" class="text" font-size="11" fill="#64748b">触发重试/回滚</text>

(1)REQUEST(请求消息)

用于向其他 Agent 发起任务请求,必须包含:

  • 任务描述:清晰的任务目标
  • 输入参数:结构化数据(如 JSON)
  • 期望输出格式:明确返回值结构
  • 超时时间:避免无限等待
# 示例:数据分析 Agent 向数据库 Agent 发起查询请求
request_msg = AgentMessage(
    sender="data_analyst_agent",
    receiver="database_agent",
    message_type="REQUEST",
    content="查询过去 30 天的销售数据",
    metadata={
        "task_id": "task_12345",
        "query_params": {
            "start_date": "2026-03-06",
            "end_date": "2026-04-05",
            "metrics": ["revenue", "order_count"]
        },
        "timeout_seconds": 30,
        "priority": "high"
    }
)

(2)RESPONSE(响应消息)

用于返回任务执行结果,必须包含:

  • 执行状态:SUCCESS / PARTIAL_SUCCESS / FAILED
  • 结果数据:结构化输出
  • 错误信息(如果失败):错误码和详细描述
# 示例:数据库 Agent 返回查询结果
response_msg = AgentMessage(
    sender="database_agent",
    receiver="data_analyst_agent",
    message_type="RESPONSE",
    content="查询成功,返回 1523 条记录",
    correlation_id="task_12345",  # 关联原始请求
    metadata={
        "status": "SUCCESS",
        "result": {
            "total_revenue": 1250000.50,
            "order_count": 1523,
            "avg_order_value": 820.75
        },
        "execution_time_ms": 2340
    }
)

(3)EVENT(事件消息)

用于广播系统状态变化,接收者可选择性订阅:

# 示例:订单状态变更事件
event_msg = AgentMessage(
    sender="order_agent",
    receiver="*",  # 广播给所有订阅者
    message_type="EVENT",
    content="订单 #ORD-2026-0405-001 已发货",
    metadata={
        "event_type": "ORDER_SHIPPED",
        "order_id": "ORD-2026-0405-001",
        "tracking_number": "SF1234567890",
        "timestamp": "2026-04-05T14:30:00Z"
    }
)

(4)ERROR(错误消息)

用于报告异常情况,触发重试或降级策略:

# 示例:API 调用失败
error_msg = AgentMessage(
    sender="payment_agent",
    receiver="orchestrator_agent",
    message_type="ERROR",
    content="支付网关超时",
    correlation_id="payment_req_789",
    metadata={
        "error_code": "GATEWAY_TIMEOUT",
        "error_details": "第三方支付 API 在 30s 内未响应",
        "retry_count": 2,
        "max_retries": 3,
        "suggested_action": "retry_with_backoff"
    }
)

2.4 消息路由策略

有了标准化的消息协议,接下来需要解决如何将消息准确送达目标 Agent。常见的路由策略包括:

路由策略 适用场景 优点 缺点
点对点(Point-to-Point) 明确知道接收者 简单高效,低延迟 耦合度高,需维护 Agent 注册表
发布-订阅(Pub/Sub) 一对多广播 解耦,易于扩展 可能导致消息风暴
基于内容的路由(Content-Based) 动态选择接收者 灵活,智能路由 实现复杂,性能开销大
负载均衡路由 多个相同角色的 Agent 自动分配负载 需维护会话粘性

在现代多 Agent 框架中,通常采用混合路由策略:对于任务分配使用点对点路由,对于状态通知使用 Pub/Sub,对于异常处理使用基于内容的路由。


3. 共享黑板(Blackboard)架构:去中心化的协作模式

3.1 什么是 Blackboard 架构?

共享黑板(Blackboard)架构是一种经典的多 Agent 协作模式,灵感来源于人类团队协作时在白板上共享信息的场景。

在这个架构中:

  • 黑板(Blackboard):一个全局共享的数据空间,存储任务的中间状态、知识和结果。
  • 知识源(Knowledge Sources, KS):多个专业化的 Agent,它们监控黑板上的变化,并在适当时机贡献自己的专业知识。
  • 控制组件(Controller):协调 Agent 的执行顺序,解决冲突(可选,有些实现是完全去中心化的)。
共享黑板(Blackboard)架构
<!-- Central Blackboard -->
<rect x="200" y="60" width="300" height="180" class="blackboard" />
<text x="350" y="90" class="text" font-weight="bold" font-size="18">共享黑板</text>
<text x="350" y="115" class="text" font-size="13">• 问题分解状态</text>
<text x="350" y="140" class="text" font-size="13">• 中间结果</text>
<text x="350" y="165" class="text" font-size="13">• 最终解决方案</text>
<text x="350" y="190" class="text" font-size="13">• 约束条件</text>

<!-- Agent 1 - Top Left -->
<rect x="50" y="280" width="120" height="60" class="agent-box" />
<text x="110" y="300" class="text" font-weight="bold">规划 Agent</text>
<text x="110" y="320" class="text" font-size="11">分解任务</text>
<path d="M 130 280 L 220 230" class="arrow" />
<text x="160" y="260" class="text" font-size="10" fill="#64748b">写入计划</text>

<!-- Agent 2 - Top Right -->
<rect x="250" y="280" width="120" height="60" class="agent-box" />
<text x="310" y="300" class="text" font-weight="bold">执行 Agent</text>
<text x="310" y="320" class="text" font-size="11">执行子任务</text>
<path d="M 310 280 L 310 240" class="arrow" />
<text x="325" y="260" class="text" font-size="10" fill="#64748b">更新进度</text>

<!-- Agent 3 - Bottom Left -->
<rect x="450" y="280" width="120" height="60" class="agent-box" />
<text x="510" y="300" class="text" font-weight="bold">验证 Agent</text>
<text x="510" y="320" class="text" font-size="11">检查结果</text>
<path d="M 490 280 L 460 230" class="arrow" />
<text x="470" y="260" class="text" font-size="10" fill="#64748b">标记错误</text>

<!-- Agent 4 - Bottom Right -->
<rect x="530" y="370" width="120" height="60" class="agent-box" />
<text x="590" y="390" class="text" font-weight="bold">整合 Agent</text>
<text x="590" y="410" class="text" font-size="11">汇总结果</text>
<path d="M 570 370 L 480 230" class="arrow" />
<text x="530" y="310" class="text" font-size="10" fill="#64748b">读取全部</text>

<!-- Bidirectional arrows showing monitoring -->
<path d="M 170 310 Q 190 340 210 310" class="dashed-arrow" />
<path d="M 370 310 Q 390 340 410 310" class="dashed-arrow" />
<text x="290" y="350" class="text" font-size="11" fill="#64748b">持续监控黑板变化</text>

3.2 Blackboard 架构的工作流程

以一个智能旅行规划系统为例,展示 Blackboard 架构的实际运作:

场景:用户要求"规划一个为期 5 天的东京旅行,预算 2 万元,喜欢美食和文化体验"。

阶段 1:问题初始化

# 主控 Agent 将用户需求写入黑板
blackboard = {
    "task_id": "travel_plan_001",
    "status": "INITIALIZED",
    "user_requirements": {
        "destination": "Tokyo",
        "duration_days": 5,
        "budget_cny": 20000,
        "preferences": ["food", "culture"]
    },
    "subtasks": [],  # 待分解
    "partial_results": {},  # 各 Agent 的中间结果
    "final_plan": None
}

阶段 2:任务分解(规划 Agent 介入)

规划 Agent 监控到黑板上有新的 INITIALIZED 任务,立即激活:

# 规划 Agent 读取需求,分解任务
def planning_agent(blackboard):
    requirements = blackboard["user_requirements"]
    
    # 基于规则或 LLM 推理,分解为子任务
    subtasks = [
        {"id": "flight_booking", "description": "查询往返机票", "priority": 1},
        {"id": "hotel_search", "description": "搜索住宿", "priority": 1},
        {"id": "attraction_planning", "description": "规划景点行程", "priority": 2},
        {"id": "restaurant_recommendation", "description": "推荐餐厅", "priority": 2},
        {"id": "budget_allocation", "description": "预算分配", "priority": 3}
    ]
    
    # 更新黑板
    blackboard["subtasks"] = subtasks
    blackboard["status"] = "PLANNED"
    return blackboard

阶段 3:并行执行(多个 Agent 同时工作)

一旦黑板状态变为 PLANNED,各个专业 Agent 被触发:

# 机票 Agent
def flight_agent(blackboard):
    if blackboard["status"] != "PLANNED":
        return  # 等待规划完成
    
    # 调用航班 API
    flights = search_flights(
        origin="Beijing",
        destination="Tokyo",
        dates=get_travel_dates(blackboard)
    )
    
    # 将结果写入黑板
    blackboard["partial_results"]["flight_booking"] = {
        "best_option": flights[0],
        "price_cny": flights[0]["price"],
        "alternatives": flights[1:3]
    }
    check_task_completion(blackboard)

# 酒店 Agent
def hotel_agent(blackboard):
    if blackboard["status"] != "PLANNED":
        return
    
    hotels = search_hotels(
        location="Tokyo",
        check_in=get_check_in_date(blackboard),
        nights=5,
        max_price_per_night=800  # 根据预算推算
    )
    
    blackboard["partial_results"]["hotel_search"] = {
        "recommended": hotels[0],
        "options": hotels[:5]
    }
    check_task_completion(blackboard)

# 景点规划 Agent
def attraction_agent(blackboard):
    if blackboard["status"] != "PLANNED":
        return
    
    # 基于用户偏好(文化体验)推荐景点
    attractions = recommend_attractions(
        city="Tokyo",
        categories=["temple", "museum", "historical_site"],
        duration_days=5
    )
    
    blackboard["partial_results"]["attraction_planning"] = {
        "daily_itinerary": attractions
    }
    check_task_completion(blackboard)

阶段 4:结果整合(整合 Agent 最后收尾)

当所有子任务完成后,整合 Agent 被触发:

def integration_agent(blackboard):
    # 检查是否所有子任务都已完成
    completed_tasks = set(blackboard["partial_results"].keys())
    required_tasks = {task["id"] for task in blackboard["subtasks"]}
    
    if completed_tasks != required_tasks:
        return  # 等待其他 Agent 完成
    
    # 整合所有结果,生成最终方案
    final_plan = {
        "overview": {
            "destination": blackboard["user_requirements"]["destination"],
            "duration": f"{blackboard['user_requirements']['duration_days']} days",
            "total_budget_cny": calculate_total_budget(blackboard)
        },
        "flights": blackboard["partial_results"]["flight_booking"],
        "accommodation": blackboard["partial_results"]["hotel_search"],
        "itinerary": blackboard["partial_results"]["attraction_planning"]["daily_itinerary"],
        "restaurants": blackboard["partial_results"]["restaurant_recommendation"]
    }
    
    blackboard["final_plan"] = final_plan
    blackboard["status"] = "COMPLETED"

3.3 Blackboard 架构的优势与挑战

优势

  1. 高度解耦:Agent 之间不需要直接通信,只需关注黑板状态变化。新增 Agent 无需修改现有代码。
  2. 灵活性:Agent 可以动态加入或退出,系统仍能正常运行。
  3. 可追溯性:黑板记录了完整的决策过程,便于调试和优化。
  4. 容错性:单个 Agent 失败不会导致整个系统崩溃,其他 Agent 可以继续工作。

挑战

  1. 竞争条件(Race Condition):多个 Agent 同时修改黑板同一字段时,可能导致数据不一致。
  2. 性能瓶颈:所有 Agent 都依赖同一个黑板,高并发时可能成为瓶颈。
  3. 无效轮询:Agent 需要不断检查黑板状态,浪费计算资源。
  4. 复杂性管理:随着 Agent 数量增加,黑板数据结构会变得极其复杂。

3.4 工程优化策略

针对上述挑战,实际工程中常采用以下优化手段:

import threading
from typing import Callable, List

class OptimizedBlackboard:
    """
    优化的黑板实现,解决竞争条件和无效轮询问题
    """
    
    def __init__(self):
        self.data = {}
        self.lock = threading.RLock()  # 可重入锁,防止死锁
        self.observers: dict[str, List[Callable]] = {}  # 观察者模式
        self.version = 0  # 版本号,用于乐观锁
    
    def update(self, key: str, value: any, expected_version: int = None):
        """
        线程安全的更新操作,支持乐观锁
        
        Args:
            key: 数据键
            value: 新值
            expected_version: 期望的版本号(None 表示不检查)
        
        Raises:
            ConflictError: 版本号不匹配,说明有其他 Agent 修改了数据
        """
        with self.lock:
            if expected_version is not None and self.version != expected_version:
                raise ConflictError(f"Version conflict: expected {expected_version}, got {self.version}")
            
            old_value = self.data.get(key)
            self.data[key] = value
            self.version += 1
            
            # 通知观察者(异步触发相关 Agent)
            if key in self.observers:
                for callback in self.observers[key]:
                    callback(key, old_value, value)
    
    def subscribe(self, key: str, callback: Callable):
        """订阅某个键的变化"""
        if key not in self.observers:
            self.observers[key] = []
        self.observers[key].append(callback)
    
    def get(self, key: str, default=None):
        """线程安全的读取操作"""
        with self.lock:
            return self.data.get(key, default)
    
    def get_version(self) -> int:
        """获取当前版本号"""
        with self.lock:
            return self.version


class ConflictError(Exception):
    """版本冲突异常"""
    pass


# 使用示例
blackboard = OptimizedBlackboard()

# Agent 订阅感兴趣的状态变化
blackboard.subscribe("status", lambda key, old, new: print(f"Status changed: {old} -> {new}"))

# 安全更新
try:
    current_version = blackboard.get_version()
    blackboard.update("status", "PLANNED", expected_version=current_version)
except ConflictError:
    # 重试逻辑
    print("Conflict detected, retrying...")

通过引入观察者模式乐观锁,我们解决了无效轮询和竞争条件问题。Agent 不再需要主动查询黑板,而是在关心的数据变化时被自动通知。


4. 冲突仲裁算法:当 Agent 意见不合时

4.1 冲突的来源

在多 Agent 系统中,冲突是不可避免的。常见的冲突类型包括:

冲突类型 示例 根本原因
资源竞争 两个 Agent 同时修改同一份文档 共享状态未加锁
目标冲突 Agent A 认为应该优先降低成本,Agent B 认为应该优先提升质量 优化目标不一致
信息矛盾 Agent A 查询到的航班价格是 5000 元,Agent B 查询到的是 5500 元 数据源不同或时效性差异
执行顺序争议 Agent A 认为应该先订酒店,Agent B 认为应该先订机票 依赖关系判断不同

4.2 经典冲突仲裁算法

(1)投票机制(Voting)

最简单的仲裁方式:少数服从多数

def voting_arbitration(agent_opinions: list[dict]) -> dict:
    """
    基于投票的冲突仲裁
    
    Args:
        agent_opinions: 每个 Agent 的意见列表
            [{"agent_id": "A", "opinion": "option_1", "confidence": 0.8}, ...]
    
    Returns:
        获胜的选项
    """
    from collections import Counter
    
    # 提取所有意见
    opinions = [item["opinion"] for item in agent_opinions]
    
    # 统计票数
    vote_counts = Counter(opinions)
    
    # 获取最高票数的选项
    winner = vote_counts.most_common(1)[0][0]
    
    return {
        "decision": winner,
        "vote_distribution": dict(vote_counts),
        "consensus_level": vote_counts[winner] / len(opinions)  # 共识度
    }


# 示例:三个 Agent 对旅行预算分配的意见
opinions = [
    {"agent_id": "budget_agent", "opinion": "accommodation_40%", "confidence": 0.7},
    {"agent_id": "experience_agent", "opinion": "activities_40%", "confidence": 0.9},
    {"agent_id": "comfort_agent", "opinion": "accommodation_40%", "confidence": 0.6}
]

result = voting_arbitration(opinions)
print(result)
# 输出: {'decision': 'accommodation_40%', 'vote_distribution': {'accommodation_40%': 2, 'activities_40%': 1}, 'consensus_level': 0.67}

局限性:简单投票忽略了 Agent 的专业性和置信度。一个领域专家的 0.9 置信度应该比普通 Agent 的 0.6 更有权重。

(2)加权投票(Weighted Voting)

改进版投票:根据 Agent 的专业度和历史准确率分配权重

def weighted_voting_arbitration(agent_opinions: list[dict], agent_weights: dict[str, float]) -> dict:
    """
    加权投票仲裁
    
    Args:
        agent_opinions: Agent 意见列表
        agent_weights: 每个 Agent 的权重(基于专业性、历史表现等)
    
    Returns:
        加权后的决策结果
    """
    from collections import defaultdict
    
    # 计算每个选项的加权得分
    weighted_scores = defaultdict(float)
    total_weight = 0
    
    for item in agent_opinions:
        agent_id = item["agent_id"]
        opinion = item["opinion"]
        confidence = item.get("confidence", 0.5)
        weight = agent_weights.get(agent_id, 1.0)
        
        # 加权得分 = 权重 × 置信度
        weighted_scores[opinion] += weight * confidence
        total_weight += weight
    
    # 归一化得分
    normalized_scores = {
        option: score / total_weight 
        for option, score in weighted_scores.items()
    }
    
    # 选择得分最高的选项
    winner = max(normalized_scores, key=normalized_scores.get)
    
    return {
        "decision": winner,
        "weighted_scores": normalized_scores,
        "confidence_gap": max(normalized_scores.values()) - sorted(normalized_scores.values())[-2] if len(normalized_scores) > 1 else 1.0
    }


# 示例:不同 Agent 的权重不同
agent_weights = {
    "budget_agent": 1.5,      # 财务专家,权重高
    "experience_agent": 1.2,   # 体验专家
    "comfort_agent": 1.0       # 普通顾问
}

result = weighted_voting_arbitration(opinions, agent_weights)
print(result)
# 输出: {'decision': 'accommodation_40%', 'weighted_scores': {...}, 'confidence_gap': 0.25}

(3)辩论式仲裁(Debate-based Arbitration)

受人类法庭辩论启发,让持不同意见的 Agent 进行多轮辩论,最终由裁判 Agent 做出裁决。

辩论式仲裁流程
<!-- Round 1 -->
<text x="350" y="60" class="round-label">第一轮:陈述观点</text>

<!-- Agent A -->
<rect x="50" y="80" width="120" height="50" class="agent-a" />
<text x="110" y="100" class="text" font-weight="bold">Agent A</text>
<text x="110" y="118" class="text" font-size="11">观点 + 论据</text>

<!-- Agent B -->
<rect x="530" y="80" width="120" height="50" class="agent-b" />
<text x="590" y="100" class="text" font-weight="bold">Agent B</text>
<text x="590" y="118" class="text" font-size="11">观点 + 论据</text>

<!-- Arrows to Judge -->
<path d="M 170 105 L 300 130" class="arrow" />
<path d="M 530 105 L 400 130" class="arrow" />

<!-- Judge -->
<rect x="280" y="130" width="140" height="50" class="judge" />
<text x="350" y="150" class="text" font-weight="bold">裁判 Agent</text>
<text x="350" y="168" class="text" font-size="11">评估论证质量</text>

<!-- Round 2 -->
<text x="350" y="210" class="round-label">第二轮:反驳与质询</text>

<!-- Agent A rebuttal -->
<rect x="50" y="230" width="120" height="50" class="agent-a" />
<text x="110" y="250" class="text" font-weight="bold">Agent A</text>
<text x="110" y="268" class="text" font-size="11">反驳对方</text>

<!-- Agent B rebuttal -->
<rect x="530" y="230" width="120" height="50" class="agent-b" />
<text x="590" y="250" class="text" font-weight="bold">Agent B</text>
<text x="590" y="268" class="text" font-size="11">反驳对方</text>

<!-- Cross-examination arrows -->
<path d="M 170 255 Q 350 220 530 255" class="arrow" stroke-dasharray="6,4" />
<path d="M 530 255 Q 350 220 170 255" class="arrow" stroke-dasharray="6,4" />

<!-- Arrows to Judge -->
<path d="M 170 255 L 300 280" class="arrow" />
<path d="M 530 255 L 400 280" class="arrow" />

<!-- Judge final decision -->
<rect x="280" y="280" width="140" height="50" class="judge" />
<text x="350" y="300" class="text" font-weight="bold">裁判 Agent</text>
<text x="350" y="318" class="text" font-size="11">综合裁决</text>

<!-- Final output -->
<path d="M 350 330 L 350 360" class="arrow" />
<rect x="280" y="360" width="140" height="35" fill="#dcfce7" stroke="#16a34a" stroke-width="2" rx="8" />
<text x="350" y="378" class="text" font-weight="bold">最终决策</text>
async def debate_arbitration(
    topic: str,
    agent_a: Agent,
    agent_b: Agent,
    judge_agent: Agent,
    max_rounds: int = 3
) -> dict:
    """
    辩论式仲裁算法
    
    Args:
        topic: 争议话题
        agent_a: 持观点 A 的 Agent
        agent_b: 持观点 B 的 Agent
        judge_agent: 裁判 Agent
        max_rounds: 最大辩论轮数
    
    Returns:
        最终裁决结果
    """
    debate_history = []
    
    # 第一轮:初始陈述
    argument_a = await agent_a.generate_argument(topic, stance="pro")
    argument_b = await agent_b.generate_argument(topic, stance="con")
    
    debate_history.append({
        "round": 1,
        "agent_a_argument": argument_a,
        "agent_b_argument": argument_b
    })
    
    # 后续轮次:反驳与质询
    for round_num in range(2, max_rounds + 1):
        # Agent A 反驳 Agent B
        rebuttal_a = await agent_a.rebut(
            opponent_argument=argument_b,
            previous_debate=debate_history
        )
        
        # Agent B 反驳 Agent A
        rebuttal_b = await agent_b.rebut(
            opponent_argument=argument_a,
            previous_debate=debate_history
        )
        
        debate_history.append({
            "round": round_num,
            "agent_a_rebuttal": rebuttal_a,
            "agent_b_rebuttal": rebuttal_b
        })
        
        # 裁判评估是否可以做出裁决
        evaluation = await judge_agent.evaluate_debate(debate_history)
        
        if evaluation["can_decide"]:
            break
        
        # 更新论点,进入下一轮
        argument_a = rebuttal_a
        argument_b = rebuttal_b
    
    # 最终裁决
    final_decision = await judge_agent.make_final_verdict(debate_history)
    
    return {
        "topic": topic,
        "debate_rounds": len(debate_history),
        "history": debate_history,
        "final_decision": final_decision,
        "reasoning": final_decision.get("reasoning", "")
    }


# 使用示例:两个 Agent 对旅行预算分配的辩论
result = await debate_arbitration(
    topic="旅行预算应该优先分配给住宿还是活动体验?",
    agent_a=budget_focused_agent,
    agent_b=experience_focused_agent,
    judge_agent=neutral_judge_agent,
    max_rounds=3
)

print(f"裁决结果: {result['final_decision']['choice']}")
print(f"裁决理由: {result['reasoning']}")

辩论式仲裁的优势

  • 深度推理:通过多轮辩论,挖掘问题的深层逻辑。
  • 透明性:完整的辩论历史可作为审计日志。
  • 鲁棒性:即使单个 Agent 出错,裁判仍能通过对比发现矛盾。

劣势

  • 计算成本高:需要多次 LLM 调用。
  • 延迟大:不适合实时性要求高的场景。

4.3 基于共识阈值的混合仲裁策略

在实际工程中,我们通常采用分层仲裁策略

def hybrid_arbitration(
    agent_opinions: list[dict],
    consensus_threshold: float = 0.8,
    agent_weights: dict = None
) -> dict:
    """
    混合仲裁策略:
    1. 首先尝试简单投票,如果共识度超过阈值,直接返回
    2. 否则使用加权投票
    3. 如果加权投票的置信度差距太小(< 0.1),启动辩论仲裁
    
    Args:
        agent_opinions: Agent 意见列表
        consensus_threshold: 共识阈值(默认 0.8)
        agent_weights: Agent 权重字典
    
    Returns:
        仲裁结果
    """
    # 第一步:简单投票
    simple_result = voting_arbitration(agent_opinions)
    
    if simple_result["consensus_level"] >= consensus_threshold:
        return {
            "method": "simple_voting",
            "decision": simple_result["decision"],
            "confidence": simple_result["consensus_level"]
        }
    
    # 第二步:加权投票
    if agent_weights:
        weighted_result = weighted_voting_arbitration(agent_opinions, agent_weights)
        
        # 如果置信度差距足够大,直接采纳
        if weighted_result["confidence_gap"] > 0.1:
            return {
                "method": "weighted_voting",
                "decision": weighted_result["decision"],
                "confidence": weighted_result["weighted_scores"][weighted_result["decision"]]
            }
    
    # 第三步:启动辩论仲裁(耗时操作)
    print("Low confidence, initiating debate arbitration...")
    debate_result = initiate_debate_arbitration(agent_opinions)
    
    return {
        "method": "debate_arbitration",
        "decision": debate_result["final_decision"]["choice"],
        "confidence": debate_result["final_decision"].get("confidence", 0.5),
        "debate_rounds": debate_result["debate_rounds"]
    }

这种混合策略在保证决策质量的同时,尽可能减少了计算开销:大多数情况下通过快速投票解决,只有真正困难的冲突才启动耗时的辩论机制


5. 实战案例:构建一个基于 Blackboard 的智能研究助手

5.1 系统架构设计

我们将构建一个多 Agent 研究助手,能够自动完成"给定研究主题 → 收集资料 → 撰写报告"的全流程。

Agent 角色定义

  • Coordinator Agent:协调整体流程,维护黑板状态
  • Research Agent:搜索学术论文和新闻
  • Analysis Agent:分析文献,提取关键观点
  • Writing Agent:撰写研究报告
  • Review Agent:审查报告质量,提出修改建议

5.2 核心代码实现

import asyncio
from enum import Enum
from typing import Optional
from datetime import datetime

class ResearchStatus(Enum):
    INITIALIZED = "initialized"
    RESEARCHING = "researching"
    ANALYZING = "analyzing"
    WRITING = "writing"
    REVIEWING = "reviewing"
    COMPLETED = "completed"
    FAILED = "failed"


class ResearchBlackboard:
    """研究任务的共享黑板"""
    
    def __init__(self, topic: str):
        self.topic = topic
        self.status = ResearchStatus.INITIALIZED
        self.sources = []  # 收集的文献来源
        self.key_findings = []  # 关键发现
        self.draft_report = None  # 报告草稿
        self.final_report = None  # 最终报告
        self.reviews = []  # 审查意见
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        self.lock = asyncio.Lock()
        self.observers = {}
    
    async def update_status(self, new_status: ResearchStatus):
        """线程安全的状态更新"""
        async with self.lock:
            old_status = self.status
            self.status = new_status
            self.updated_at = datetime.now()
            
            # 通知观察者
            if "status" in self.observers:
                for callback in self.observers["status"]:
                    await callback(old_status, new_status)
    
    def subscribe(self, field: str, callback):
        """订阅字段变化"""
        if field not in self.observers:
            self.observers[field] = []
        self.observers[field].append(callback)


class ResearchAgent:
    """研究 Agent 基类"""
    
    def __init__(self, name: str, blackboard: ResearchBlackboard):
        self.name = name
        self.blackboard = blackboard
    
    async def execute(self):
        """执行 Agent 任务(子类实现)"""
        raise NotImplementedError


class CoordinatorAgent(ResearchAgent):
    """协调器 Agent"""
    
    async def execute(self):
        print(f"[{self.name}] 启动研究任务: {self.blackboard.topic}")
        
        # 订阅状态变化
        self.blackboard.subscribe("status", self.on_status_change)
        
        # 启动其他 Agent
        research_agent = ResearchCollectorAgent("Research Collector", self.blackboard)
        analysis_agent = AnalysisAgent("Literature Analyst", self.blackboard)
        writing_agent = WritingAgent("Report Writer", self.blackboard)
        review_agent = ReviewAgent("Quality Reviewer", self.blackboard)
        
        # 按顺序执行(实际中可以并行)
        await research_agent.execute()
        await analysis_agent.execute()
        await writing_agent.execute()
        await review_agent.execute()
        
        print(f"[{self.name}] 研究任务完成!")
    
    async def on_status_change(self, old_status, new_status):
        print(f"[{self.name}] 状态变更: {old_status.value}{new_status.value}")


class ResearchCollectorAgent(ResearchAgent):
    """文献收集 Agent"""
    
    async def execute(self):
        await self.blackboard.update_status(ResearchStatus.RESEARCHING)
        
        print(f"[{self.name}] 正在搜索文献...")
        
        # 模拟文献搜索(实际中调用学术 API)
        await asyncio.sleep(2)  # 模拟 API 调用延迟
        
        # 将结果写入黑板
        async with self.blackboard.lock:
            self.blackboard.sources = [
                {
                    "title": "Attention Is All You Need",
                    "authors": ["Vaswani et al."],
                    "year": 2017,
                    "relevance_score": 0.95
                },
                {
                    "title": "BERT: Pre-training of Deep Bidirectional Transformers",
                    "authors": ["Devlin et al."],
                    "year": 2019,
                    "relevance_score": 0.88
                }
            ]
        
        print(f"[{self.name}] 找到 {len(self.blackboard.sources)} 篇相关文献")


class AnalysisAgent(ResearchAgent):
    """文献分析 Agent"""
    
    async def execute(self):
        await self.blackboard.update_status(ResearchStatus.ANALYZING)
        
        print(f"[{self.name}] 正在分析文献...")
        
        # 等待文献收集完成
        while not self.blackboard.sources:
            await asyncio.sleep(0.5)
        
        await asyncio.sleep(3)  # 模拟分析过程
        
        # 提取关键发现
        async with self.blackboard.lock:
            self.blackboard.key_findings = [
                "Transformer 架构通过自注意力机制实现了并行化处理",
                "预训练-微调范式显著提升了 NLP 任务性能",
                "大规模语料库是模型涌现能力的关键因素"
            ]
        
        print(f"[{self.name}] 提取了 {len(self.blackboard.key_findings)} 个关键发现")


class WritingAgent(ResearchAgent):
    """报告撰写 Agent"""
    
    async def execute(self):
        await self.blackboard.update_status(ResearchStatus.WRITING)
        
        print(f"[{self.name}] 正在撰写报告...")
        
        # 等待分析完成
        while not self.blackboard.key_findings:
            await asyncio.sleep(0.5)
        
        await asyncio.sleep(4)  # 模拟写作过程
        
        # 生成报告草稿
        report = f"""
# 研究报告:{self.blackboard.topic}

## 摘要
本报告基于对最新文献的综合分析,探讨了{self.blackboard.topic}的核心进展。

## 关键发现
"""
        for i, finding in enumerate(self.blackboard.key_findings, 1):
            report += f"\n{i}. {finding}"
        
        report += "\n\n## 参考文献\n"
        for source in self.blackboard.sources:
            report += f"- {source['authors'][0]} ({source['year']}). {source['title']}\n"
        
        async with self.blackboard.lock:
            self.blackboard.draft_report = report
        
        print(f"[{self.name}] 报告草稿完成({len(report)} 字符)")


class ReviewAgent(ResearchAgent):
    """质量审查 Agent"""
    
    async def execute(self):
        await self.blackboard.update_status(ResearchStatus.REVIEWING)
        
        print(f"[{self.name}] 正在审查报告...")
        
        # 等待草稿完成
        while not self.blackboard.draft_report:
            await asyncio.sleep(0.5)
        
        await asyncio.sleep(2)  # 模拟审查过程
        
        # 模拟审查意见
        review_opinion = {
            "reviewer": self.name,
            "score": 8.5,
            "comments": [
                "报告结构清晰,逻辑连贯",
                "建议增加实际应用案例",
                "参考文献格式需要统一"
            ],
            "approved": True
        }
        
        async with self.blackboard.lock:
            self.blackboard.reviews.append(review_opinion)
            self.blackboard.final_report = self.blackboard.draft_report  # 简化:直接使用草稿
            await self.blackboard.update_status(ResearchStatus.COMPLETED)
        
        print(f"[{self.name}] 审查完成,评分: {review_opinion['score']}/10")


# 主函数
async def main():
    # 创建黑板
    blackboard = ResearchBlackboard(topic="Transformer 架构在 NLP 中的应用")
    
    # 创建协调器并执行
    coordinator = CoordinatorAgent("Coordinator", blackboard)
    await coordinator.execute()
    
    # 输出最终结果
    print("\n" + "="*60)
    print("最终研究报告:")
    print("="*60)
    print(blackboard.final_report)


# 运行
if __name__ == "__main__":
    asyncio.run(main())

5.3 运行结果

[Coordinator] 启动研究任务: Transformer 架构在 NLP 中的应用
[Coordinator] 状态变更: initialized → researching
[Research Collector] 正在搜索文献...
[Research Collector] 找到 2 篇相关文献
[Coordinator] 状态变更: researching → analyzing
[Literature Analyst] 正在分析文献...
[Literature Analyst] 提取了 3 个关键发现
[Coordinator] 状态变更: analyzing → writing
[Report Writer] 正在撰写报告...
[Report Writer] 报告草稿完成(523 字符)
[Coordinator] 状态变更: writing → reviewing
[Quality Reviewer] 正在审查报告...
[Quality Reviewer] 审查完成,评分: 8.5/10
[Coordinator] 状态变更: reviewing → completed
[Coordinator] 研究任务完成!

============================================================
最终研究报告:
============================================================

# 研究报告:Transformer 架构在 NLP 中的应用

## 摘要
本报告基于对最新文献的综合分析,探讨了Transformer 架构在 NLP 中的应用的核心进展。

## 关键发现
1. Transformer 架构通过自注意力机制实现了并行化处理
2. 预训练-微调范式显著提升了 NLP 任务性能
3. 大规模语料库是模型涌现能力的关键因素

## 参考文献
- Vaswani et al. (2017). Attention Is All You Need
- Devlin et al. (2019). BERT: Pre-training of Deep Bidirectional Transformers

通过这个案例,你可以看到:

  1. Blackboard 作为中央枢纽,所有 Agent 通过读写黑板进行间接协作。
  2. 状态驱动的执行流程,每个 Agent 只在特定状态下激活。
  3. 松耦合设计,新增 Agent(如翻译 Agent)只需订阅相应状态,无需修改现有代码。

结语

通信是多 Agent 系统的血脉,共识是其灵魂。没有高效的通信,群体只是个体的集合;没有可靠的共识,协作只会沦为混乱的争吵。

在本篇中,我们从底层到上层系统性地拆解了多 Agent 通信与共识的核心技术:

  1. 消息协议设计:通过结构化的消息格式(REQUEST/RESPONSE/EVENT/ERROR),解决了自然语言通信的歧义性和不可靠性。
  2. 共享黑板架构:通过去中心化的数据共享机制,实现了 Agent 间的松耦合协作,并通过观察者模式和乐观锁优化了性能和一致性。
  3. 冲突仲裁算法:从简单投票到加权投票,再到辩论式仲裁,提供了分层的决策机制,平衡了效率与准确性。

这些技术不仅是多 Agent 系统的基石,也是分布式系统、微服务架构等领域通用的设计模式。理解了这些原理,你将能够设计出更具扩展性、更健壮的智能系统。

下一篇,我们将探索多 Agent 系统中的角色塑造与博弈机制,探讨如何通过 Prompt 工程赋予 Agent 差异化的人格,以及多 Agent 辩论如何激发超越个体智慧的群体智能。敬请期待《角色模拟与博弈:通过 Prompt 塑造差异化人格与多 Agent 辩论激发智慧》。


📚 参考文献与延伸阅读

  1. Multiagent Systems: Algorithmic, Game-Theoretic, and Logical Foundations (Yoav Shoham & Kevin Leyton-Brown, 2008) - 多 Agent 系统的经典教材,全面介绍了通信协议、协商机制和博弈论基础。
  2. The Blackboard Model of Problem Solving and the Evolution of Blackboard Architectures (H. Penny Nii, 1986) - Blackboard 架构的奠基论文,详细阐述了该模式在专家系统中的应用。
  3. LangGraph Documentation (LangChain AI) - 现代化的多 Agent 编排框架,提供了基于状态机的协作机制实现。
  4. AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation (Wu et al., 2023) - Microsoft 提出的多 Agent 对话框架,展示了结构化消息协议的实际应用。
  5. Consensus in Multi-Agent Systems: A Survey (Olfati-Saber et al., 2007) - 分布式共识算法的综述,涵盖了投票、平均一致性等经典方法。

下一篇预告: 角色模拟与博弈:通过 Prompt 塑造差异化人格与多 Agent 辩论激发智慧

博客日历
2026年04月
SuMoTuWeThFrSa
29
30
31
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
01
02
03
04
05
06
07
08
09
更多
--
--
--
--