AI Technology Evolution and Core Algorithms in Practice | Part 22: Fault Tolerance for Multi-Step Tasks: Automatic Retries, Rollback Mechanisms, and Exception-Handling Strategy Design


In distributed systems, failure is not the exception; it is the norm. True intelligence shows not only in grace when things succeed, but in resilience when they fail.
In the previous articles we built AI Agents that can plan autonomously (Plan-and-Solve), reflect on their own output (Reflexion), and orchestrate complex workflows (DAGs). But the moment we deploy these Agents to a real production environment, we hit a harsh reality: everything can fail.
- The LLM API may time out or return a 503 error
- External tools (databases, APIs) may be temporarily unavailable
- Network jitter may drop requests
- The model may hallucinate invalid tool-call arguments
- A long-running task may be interrupted midway
If our Agent simply crashes in any of these situations, it will never become a reliable production system. Fault tolerance is the dividing line between a "toy demo" and an "industrial-grade system".
This article concludes Module 4: Action of the series AI Technology Evolution and Core Algorithms in Practice. We will dig into fault-tolerant architecture design for multi-step tasks, covering:
- Smart retry strategies: exponential backoff, jitter, and circuit breakers
- State management and rollback: checkpointing, snapshot recovery, and compensating transactions
- Degradation and fallback strategies: graceful degradation, backup-model switching, and human-in-the-loop
- A complete fault-tolerance engine: a hand-written Agent execution framework with industrial-grade reliability
1. Why Is Fault Tolerance So Hard for Multi-Step Tasks?
In a traditional monolithic application, error handling is fairly simple: catch the exception → log it → return an error code. In an AI Agent's multi-step execution flow, things get far more complicated.
1.1 What Makes Multi-Step Tasks Special
Consider a typical complex Agent task:

```
# User request: "Analyze the company's Q3 earnings report, generate charts,
# write an analysis report, and send it to the CEO"
Execution steps:
Step 1: Extract Q3 financial data from the database (SQL query)
Step 2: Clean the data and detect outliers (Python code execution)
Step 3: Ask the LLM to analyze data trends (LLM API call)
Step 4: Generate charts (Matplotlib/chart generation)
Step 5: Write the analysis report (LLM API call)
Step 6: Email the report to the CEO (Email API)
```

Potential failure points:
| Step | Possible failure | Failure type | Blast radius |
|---|---|---|---|
| Step 1 | Database connection timeout, SQL syntax error | Transient / permanent | Blocks all subsequent steps |
| Step 2 | Code sandbox out of resources, missing dependencies | Transient / configuration error | Step 2 must be re-run |
| Step 3 | LLM API rate limiting, malformed response | Transient / model error | May need a retry or a model switch |
| Step 4 | Out of memory, chart-library version conflict | Environment error | Needs an isolated execution environment |
| Step 5 | Long-text generation timeout | Transient | Can generate in segments or degrade to a summary |
| Step 6 | Mail server unreachable, invalid recipient address | Transient / permanent | Needs to notify an administrator |
1.2 Limitations of Traditional Error Handling
Problem 1: A plain try-except cannot fix state inconsistency

```python
# ❌ Wrong approach: no state management
try:
    data = fetch_financial_data()
    analysis = llm_analyze(data)
    report = generate_report(analysis)
    send_email(report)
except Exception as e:
    log_error(e)
    return "Task failed"  # the user has no idea which step failed, and nothing can be resumed
```

If generate_report() fails, we have already:
- ✅ Extracted the data (database resources spent)
- ✅ Completed the analysis (LLM tokens spent)
- ❌ Failed to generate the report
A naive retry at this point re-runs every step, wasting resources and inflating cost.
Problem 2: No distinction between transient and permanent errors

```python
# ❌ Blind retries: useless against permanent errors, and a waste of time
for i in range(3):
    try:
        result = call_llm_api(prompt)
        break
    except Exception:
        continue  # a permanent error like "invalid parameter format" will never succeed, no matter how many retries
```

Problem 3: No graceful degradation
When the main path fails, does the system crash outright, or can it still deliver a partially usable result? For example:
- The LLM analysis fails → can we return the raw data and let the user judge for themselves?
- Email delivery fails → can we save the report locally and ask the user to send it manually?
1.3 Core Principles of Fault-Tolerant Design
Given these challenges, we need a systematic fault-tolerant architecture built on the following principles:
- Fail fast: detect and reject obviously invalid input as early as possible, before resources are wasted
- Smart retry: retry only transient errors, with a sensible backoff policy
- State persistence: save execution progress regularly, so tasks can resume from where they stopped
- Graceful degradation: offer a fallback path when the main flow fails
- Observability: record the full execution trace for debugging and tuning
In the rest of this article we implement each of these strategies.
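Of these principles, fail-fast is the one the later sections do not implement directly, so here is a minimal sketch. The schema format, field names, and the `validate_tool_call` helper are all illustrative assumptions, not part of any real framework: the point is simply to reject an obviously malformed tool call before spending tokens or API calls on it.

```python
# Minimal fail-fast validation sketch for LLM tool-call arguments.
# The schema shape and tool names below are hypothetical.
def validate_tool_call(tool_name: str, arguments: dict, schema: dict) -> list:
    """Return a list of validation errors; an empty list means the call is valid."""
    spec = schema.get(tool_name)
    if spec is None:
        return [f"Unknown tool: {tool_name}"]
    errors = []
    for field, field_type in spec.get("required", {}).items():
        if field not in arguments:
            errors.append(f"Missing required field: {field}")
        elif not isinstance(arguments[field], field_type):
            errors.append(f"Field '{field}' should be {field_type.__name__}")
    return errors

# Hypothetical schema for an email-sending tool
SCHEMA = {"send_email": {"required": {"to": str, "subject": str, "body": str}}}

# The model hallucinated a call with two fields missing: reject it immediately,
# before the expensive execution pipeline even starts
errors = validate_tool_call("send_email", {"to": "ceo@example.com"}, SCHEMA)
print(errors)
```

In a real system this check would run between the LLM's tool-call output and the tool executor, and a failed validation would be fed back to the model as a correction prompt rather than retried blindly.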
2. Smart Retry Strategies: From Brute-Force Retries to Exponential Backoff
Retrying is the most direct fault-tolerance tool, but how you retry matters more than whether you retry.
2.1 Identifying Retryable Errors
First, we need an error taxonomy that tells us which errors are worth retrying:
```python
from enum import Enum

class ErrorType(Enum):
    TRANSIENT = "transient"    # transient: network jitter, service overload
    PERMANENT = "permanent"    # permanent: bad parameters, missing permissions
    RATE_LIMIT = "rate_limit"  # rate limiting: wait, then retry
    TIMEOUT = "timeout"        # timeout: retry, possibly with a longer deadline

def classify_error(exception: Exception) -> ErrorType:
    """Decide whether an exception is retryable.

    Keyword matching on the message is a simplification; production code
    should prefer inspecting exception types and HTTP status codes directly.
    """
    error_msg = str(exception).lower()
    # Transient errors (retryable)
    if any(keyword in error_msg for keyword in [
        "timeout", "connection reset", "service unavailable",
        "503", "502", "network error"
    ]):
        return ErrorType.TRANSIENT
    # Rate-limit errors (retryable, but need special handling)
    if "rate limit" in error_msg or "429" in error_msg:
        return ErrorType.RATE_LIMIT
    # Timeout errors (retryable)
    if "timed out" in error_msg:
        return ErrorType.TIMEOUT
    # Permanent errors (do not retry)
    if any(keyword in error_msg for keyword in [
        "invalid parameter", "authentication failed", "404",
        "permission denied", "syntax error"
    ]):
        return ErrorType.PERMANENT
    # Default to transient (the conservative choice)
    return ErrorType.TRANSIENT
```
2.2 Exponential Backoff
Brute-force retrying at a fixed interval causes "retry storms" that pile more pressure on an already struggling service. The industry-standard approach is exponential backoff plus random jitter.
Core formula:

```
wait_time = min(base_delay * (2 ^ attempt_number) + random_jitter, max_delay)
```

Implementation:
```python
import time
import random
from functools import wraps

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Decorator that adds retry-with-exponential-backoff to a function.

    Args:
        max_retries: maximum number of retries
        base_delay: base delay in seconds
        max_delay: upper bound on the delay in seconds
        exponential_base: backoff base (usually 2)
        jitter: add random jitter to avoid retry storms
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    error_type = classify_error(e)
                    # Never retry permanent errors
                    if error_type == ErrorType.PERMANENT:
                        raise ValueError(f"Permanent error (not retryable): {e}")
                    # Out of retries
                    if attempt >= max_retries:
                        break
                    # Compute the wait time
                    if error_type == ErrorType.RATE_LIMIT:
                        # Rate limiting: honor Retry-After from the response if present
                        retry_after = getattr(e, 'retry_after', 5)
                        delay = min(retry_after, max_delay)
                    else:
                        # Exponential backoff
                        delay = min(
                            base_delay * (exponential_base ** attempt),
                            max_delay
                        )
                    # Add random jitter (±25%)
                    if jitter:
                        jitter_range = delay * 0.25
                        delay += random.uniform(-jitter_range, jitter_range)
                    print(f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}")
                    print(f"Retrying in {delay:.2f} seconds...")
                    time.sleep(max(0, delay))
            raise Exception(f"Failed after {max_retries} retries. Last error: {last_exception}")
        return wrapper
    return decorator

# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=30.0)
def call_llm_api(prompt: str) -> str:
    """Call the LLM API (retried automatically)."""
    # Simulated API call
    response = make_http_request("https://api.openai.com/v1/chat/completions", ...)
    return response.choices[0].message.content
```
2.3 Circuit Breaker Pattern
When a service keeps failing, continuing to retry only wastes resources. A circuit breaker "trips" once the failure rate crosses a threshold, temporarily blocking calls to that service and giving it time to recover.
```python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # normal: requests pass through
    OPEN = "open"            # tripped: reject requests, fail fast
    HALF_OPEN = "half_open"  # probing: allow a few trial requests

class CircuitBreaker:
    """A circuit breaker.

    How it works:
    1. CLOSED: execute normally, counting failures
    2. Failure count exceeds the threshold → switch to OPEN
    3. OPEN: reject requests immediately during the cooldown period
    4. Cooldown expires → switch to HALF_OPEN
    5. HALF_OPEN: allow probe requests
       - 3 consecutive successes → back to CLOSED (recovered)
       - any failure → back to OPEN (keep cooling down)
    """
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.success_count_in_half_open = 0

    def can_execute(self) -> bool:
        """Decide whether a request is allowed through."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Has the cooldown period elapsed?
            if self.last_failure_time and \
                    (time.time() - self.last_failure_time) > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker: OPEN → HALF_OPEN (testing recovery)")
                return True
            return False
        # HALF_OPEN: allow probe requests
        return True

    def record_success(self):
        """Record a success."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count_in_half_open += 1
            if self.success_count_in_half_open >= 3:  # 3 consecutive successes
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count_in_half_open = 0
                print("Circuit breaker: HALF_OPEN → CLOSED (recovered)")
        elif self.state == CircuitState.CLOSED:
            # A success in CLOSED resets the failure counter
            self.failure_count = 0

    def record_failure(self):
        """Record a failure."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            # A failure while probing trips the breaker again immediately
            self.state = CircuitState.OPEN
            print("Circuit breaker: HALF_OPEN → OPEN (recovery failed)")
        elif self.failure_count >= self.failure_threshold:
            # Too many failures: trip the breaker
            self.state = CircuitState.OPEN
            print(f"Circuit breaker: CLOSED → OPEN (failures: {self.failure_count})")

    def execute(self, func, *args, **kwargs):
        """Run a function under the breaker's protection."""
        if not self.can_execute():
            raise Exception("Circuit breaker is OPEN. Request rejected.")
        try:
            result = func(*args, **kwargs)
            self.record_success()
            return result
        except self.expected_exception:
            self.record_failure()
            raise

# Usage
llm_circuit_breaker = CircuitBreaker(
    failure_threshold=5,    # trip after 5 consecutive failures
    recovery_timeout=60.0   # attempt recovery after a 60-second cooldown
)

def safe_call_llm(prompt: str) -> str:
    return llm_circuit_breaker.execute(call_llm_api, prompt)
```
What the circuit breaker buys us:
- ✅ Prevents cascading failures: no flood of requests crushing an already failing service
- ✅ Fails fast: returns an error immediately when the service is down, instead of waiting for timeouts
- ✅ Recovers automatically: probes the service periodically, no human intervention needed
3. State Management and Rollback: From Checkpoints to Compensating Transactions
In a multi-step task, if step 5 fails we should not start over from step 1; we should resume from the most recent checkpoint. That requires proper state management.
3.1 Checkpointing
Core idea: before and after every critical step, serialize the current state and persist it.
```python
import json
import os
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

class Checkpoint:
    """An execution checkpoint."""
    def __init__(
        self,
        task_id: str,
        step_name: str,
        step_index: int,
        state: Dict[str, Any],
        timestamp: Optional[str] = None
    ):
        self.task_id = task_id
        self.step_name = step_name
        self.step_index = step_index
        self.state = state
        self.timestamp = timestamp or datetime.utcnow().isoformat()
        self.checkpoint_id = str(uuid.uuid4())

    def to_dict(self) -> Dict[str, Any]:
        return {
            "checkpoint_id": self.checkpoint_id,
            "task_id": self.task_id,
            "step_name": self.step_name,
            "step_index": self.step_index,
            "state": self.state,
            "timestamp": self.timestamp
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Checkpoint':
        return cls(
            task_id=data["task_id"],
            step_name=data["step_name"],
            step_index=data["step_index"],
            state=data["state"],
            timestamp=data["timestamp"]
        )

class CheckpointManager:
    """Checkpoint manager: saves and restores task execution state."""
    def __init__(self, storage_backend: str = "memory"):
        """
        Args:
            storage_backend: storage backend (memory/file/database)
        """
        self.storage_backend = storage_backend
        self.checkpoints: Dict[str, Checkpoint] = {}  # task_id -> latest checkpoint

    def save_checkpoint(
        self,
        task_id: str,
        step_name: str,
        step_index: int,
        state: Dict[str, Any]
    ):
        """Save a checkpoint."""
        checkpoint = Checkpoint(
            task_id=task_id,
            step_name=step_name,
            step_index=step_index,
            state=state
        )
        # Serialize the state (filtering out non-serializable objects)
        checkpoint.state = self._make_serializable(state)
        # Store the checkpoint
        if self.storage_backend == "memory":
            self.checkpoints[task_id] = checkpoint
        elif self.storage_backend == "file":
            self._save_to_file(checkpoint)
        elif self.storage_backend == "database":
            self._save_to_database(checkpoint)
        print(f"✓ Checkpoint saved: {step_name} (step {step_index})")
        return checkpoint

    def load_latest_checkpoint(self, task_id: str) -> Optional[Checkpoint]:
        """Load the most recent checkpoint."""
        if self.storage_backend == "memory":
            return self.checkpoints.get(task_id)
        elif self.storage_backend == "file":
            return self._load_from_file(task_id)
        elif self.storage_backend == "database":
            return self._load_from_database(task_id)
        return None

    def clear_checkpoints(self, task_id: str):
        """Clean up checkpoints once the task completes."""
        if self.storage_backend == "memory":
            self.checkpoints.pop(task_id, None)
        elif self.storage_backend == "file":
            self._delete_file(task_id)
        elif self.storage_backend == "database":
            self._delete_from_database(task_id)

    def _make_serializable(self, obj: Any) -> Any:
        """Convert an object into a JSON-serializable form."""
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._make_serializable(item) for item in obj]
        elif isinstance(obj, (str, int, float, bool, type(None))):
            return obj
        else:
            # Non-serializable objects fall back to a string representation
            return f"<{type(obj).__name__}: {str(obj)}>"

    def _save_to_file(self, checkpoint: Checkpoint):
        """Save to a file (simplified)."""
        os.makedirs("checkpoints", exist_ok=True)
        filename = f"checkpoints/{checkpoint.task_id}.json"
        with open(filename, 'w') as f:
            json.dump(checkpoint.to_dict(), f, indent=2)

    def _load_from_file(self, task_id: str) -> Optional[Checkpoint]:
        """Load from a file."""
        filename = f"checkpoints/{task_id}.json"
        try:
            with open(filename, 'r') as f:
                data = json.load(f)
            return Checkpoint.from_dict(data)
        except FileNotFoundError:
            return None

    # Database storage omitted (SQLite/PostgreSQL/Redis would all work)
    def _save_to_database(self, checkpoint: Checkpoint):
        pass

    def _load_from_database(self, task_id: str) -> Optional[Checkpoint]:
        return None

    def _delete_file(self, task_id: str):
        filename = f"checkpoints/{task_id}.json"
        if os.path.exists(filename):
            os.remove(filename)

    def _delete_from_database(self, task_id: str):
        pass
```
3.2 Resuming Execution from a Checkpoint
With checkpoints in place, we can build a task execution engine that supports resuming from where it left off:
```python
class ResumableTaskExecutor:
    """A resumable task executor.

    Features:
    - saves a checkpoint after every step
    - resumes execution from the point of interruption
    - cleans up checkpoints when the task completes
    """
    def __init__(self, checkpoint_manager: CheckpointManager):
        self.checkpoint_manager = checkpoint_manager
        self.task_state: Dict[str, Any] = {}

    def execute_with_resumption(
        self,
        task_id: str,
        steps: list,
        initial_state: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute a task, resuming from a checkpoint if one exists.

        Args:
            task_id: unique task identifier
            steps: list of (step_name, function) tuples
            initial_state: initial state

        Returns:
            The final state.
        """
        # 1. Try to load an earlier checkpoint
        checkpoint = self.checkpoint_manager.load_latest_checkpoint(task_id)
        if checkpoint:
            print(f"↻ Resuming task from checkpoint: {checkpoint.step_name}")
            start_index = checkpoint.step_index + 1
            self.task_state = checkpoint.state
        else:
            print(f"▶ Starting new task: {task_id}")
            start_index = 0
            self.task_state = initial_state or {}
        # 2. Execute from just after the checkpoint
        for i in range(start_index, len(steps)):
            step_name, step_func = steps[i]
            try:
                print(f"\n[{i+1}/{len(steps)}] Executing: {step_name}")
                # Run the step
                result = step_func(self.task_state)
                # Update the state
                self.task_state[f"{step_name}_result"] = result
                self.task_state["last_completed_step"] = step_name
                self.task_state["last_completed_step_index"] = i
                # Save a checkpoint
                self.checkpoint_manager.save_checkpoint(
                    task_id=task_id,
                    step_name=step_name,
                    step_index=i,
                    state=self.task_state.copy()
                )
            except Exception as e:
                print(f"✗ Step '{step_name}' failed: {e}")
                print(f"Task paused. Use task_id '{task_id}' to resume later.")
                raise
        # 3. Task complete: clean up checkpoints
        print(f"\n✓ Task '{task_id}' completed successfully!")
        self.checkpoint_manager.clear_checkpoints(task_id)
        return self.task_state

# Usage
def step_fetch_data(state: Dict) -> Dict:
    """Step 1: fetch data."""
    print("  → Fetching financial data from database...")
    # Simulated database query
    return {"revenue": 1000000, "profit": 200000}

def step_analyze_data(state: Dict) -> Dict:
    """Step 2: analyze data."""
    print("  → Analyzing data with LLM...")
    # Simulated LLM call
    return {"trend": "positive", "growth_rate": 0.15}

def step_generate_report(state: Dict) -> str:
    """Step 3: generate the report."""
    print("  → Generating report...")
    # The executor stores each result under "<step_name>_result"
    analysis = state.get("analyze_data_result", {})
    return f"Report: Growth rate is {analysis.get('growth_rate', 0)*100}%"

def step_send_email(state: Dict) -> bool:
    """Step 4: send the email."""
    print("  → Sending email to CEO...")
    # Simulated email delivery
    return True

# Create the executor
checkpoint_mgr = CheckpointManager(storage_backend="file")
executor = ResumableTaskExecutor(checkpoint_mgr)

# Define the task steps
steps = [
    ("fetch_data", step_fetch_data),
    ("analyze_data", step_analyze_data),
    ("generate_report", step_generate_report),
    ("send_email", step_send_email),
]

# Run the task
task_id = "financial-report-2026-q1"
try:
    result = executor.execute_with_resumption(
        task_id=task_id,
        steps=steps,
        initial_state={"user": "CEO", "quarter": "Q1 2026"}
    )
    print(f"\nFinal result: {result}")
except Exception as e:
    print(f"\nTask interrupted. Run again with task_id='{task_id}' to resume.")
```
What a run looks like:

```
# First run (suppose step 3 fails)
▶ Starting new task: financial-report-2026-q1

[1/4] Executing: fetch_data
  → Fetching financial data from database...
✓ Checkpoint saved: fetch_data (step 0)

[2/4] Executing: analyze_data
  → Analyzing data with LLM...
✓ Checkpoint saved: analyze_data (step 1)

[3/4] Executing: generate_report
  → Generating report...
✗ Step 'generate_report' failed: Report generation timeout
Task paused. Use task_id 'financial-report-2026-q1' to resume later.

# Second run (automatically resumes at step 3)
↻ Resuming task from checkpoint: analyze_data

[3/4] Executing: generate_report
  → Generating report...
✓ Checkpoint saved: generate_report (step 2)

[4/4] Executing: send_email
  → Sending email to CEO...
✓ Checkpoint saved: send_email (step 3)

✓ Task 'financial-report-2026-q1' completed successfully!
```
3.3 Compensating Transactions
In some scenarios, resuming from a checkpoint is not enough. For example:
- Step 1: create a record in the database
- Step 2: charge a payment via the payment API
- Step 3: send a confirmation email (fails)
If Step 3 fails for good and the task must be abandoned, we cannot simply walk away: Step 2 has already charged the customer. We need compensating transactions to undo the operations that already succeeded.
```python
class CompensationHandler:
    """Compensating-transaction handler.

    Undoes operations that completed successfully when a later step fails.
    """
    def __init__(self):
        self.compensation_stack: list = []  # stack of compensation actions

    def register_compensation(self, compensation_func, *args, **kwargs):
        """Register a compensation action."""
        self.compensation_stack.append({
            "func": compensation_func,
            "args": args,
            "kwargs": kwargs
        })

    def execute_all_compensations(self):
        """Run all registered compensations, in reverse order."""
        print("\n⚠ Executing compensations to rollback changes...")
        errors = []
        while self.compensation_stack:
            comp = self.compensation_stack.pop()
            try:
                print(f"  → Rolling back: {comp['func'].__name__}")
                comp["func"](*comp["args"], **comp["kwargs"])
                print(f"  ✓ Compensation successful")
            except Exception as e:
                print(f"  ✗ Compensation failed: {e}")
                errors.append(e)
        if errors:
            raise Exception(f"Some compensations failed: {errors}")
        print("✓ All compensations completed")

    def clear(self):
        """Empty the stack (call after the task completes successfully)."""
        self.compensation_stack.clear()

# Usage
comp_handler = CompensationHandler()

def step_create_order(state: Dict) -> str:
    """Step 1: create the order."""
    order_id = "ORD-12345"
    print(f"  → Creating order {order_id}...")
    # Register a compensation: delete the order if a later step fails
    comp_handler.register_compensation(delete_order, order_id)
    return order_id

def delete_order(order_id: str):
    """Compensation: delete the order."""
    print(f"  [Compensation] Deleting order {order_id}...")
    # Actual deletion logic goes here

def step_charge_payment(state: Dict) -> bool:
    """Step 2: charge the payment."""
    order_id = state["step_create_order_result"]
    print(f"  → Charging payment for order {order_id}...")
    # Register a compensation: refund if a later step fails
    comp_handler.register_compensation(refund_payment, order_id)
    return True

def refund_payment(order_id: str):
    """Compensation: refund the payment."""
    print(f"  [Compensation] Refunding payment for order {order_id}...")
    # Actual refund logic goes here

def step_send_confirmation(state: Dict) -> bool:
    """Step 3: send the confirmation email (may fail)."""
    print("  → Sending confirmation email...")
    raise Exception("Email service unavailable")  # simulated failure

# Run the task with compensation
try:
    state = {}
    state["step_create_order_result"] = step_create_order(state)
    state["step_charge_payment_result"] = step_charge_payment(state)
    step_send_confirmation(state)
    # If we get here, everything succeeded: empty the compensation stack
    comp_handler.clear()
except Exception as e:
    print(f"\n✗ Task failed: {e}")
    # Run the compensations
    comp_handler.execute_all_compensations()
```
Output:

```
  → Creating order ORD-12345...
  → Charging payment for order ORD-12345...
  → Sending confirmation email...

✗ Task failed: Email service unavailable

⚠ Executing compensations to rollback changes...
  → Rolling back: refund_payment
  [Compensation] Refunding payment for order ORD-12345...
  ✓ Compensation successful
  → Rolling back: delete_order
  [Compensation] Deleting order ORD-12345...
  ✓ Compensation successful
✓ All compensations completed
```
Key points for compensating transactions:
- ✅ Reverse order: the last operation performed is the first one compensated (LIFO, like a stack)
- ✅ Idempotency: a compensation must be safe to run more than once
- ✅ Audit logging: record every compensation in detail for later review
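The idempotency point deserves a sketch of its own: the compensation itself may be retried, so running it twice must not refund a customer twice. A minimal sketch, assuming an in-memory ledger (a real system would use a database table or the payment provider's idempotency keys; `refund_payment_idempotent` is an illustrative name):

```python
# Idempotent compensation sketch: refund at most once per order,
# so retried rollbacks are harmless no-ops.
_refunded: set = set()

def refund_payment_idempotent(order_id: str) -> bool:
    """Return True if a refund was performed, False if already compensated."""
    if order_id in _refunded:
        return False  # already compensated: safe no-op on retry
    _refunded.add(order_id)
    # ... the actual refund API call would go here ...
    return True

first = refund_payment_idempotent("ORD-12345")   # performs the refund
second = refund_payment_idempotent("ORD-12345")  # retried rollback: no-op
```

The same pattern applies to `delete_order`: deleting an already-deleted record should succeed silently rather than raise.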
4. Degradation and Fallback Strategies: Failing Gracefully
Even with retries and rollback, some errors remain unrecoverable. For those we need a fallback strategy, so the system can still deliver partially usable results.
4.1 A Multi-Layer Degradation Architecture
A typical degradation chain falls back layer by layer: primary model → cheaper backup model → cached or rule-based results → human intervention. Each layer trades some output quality for availability.
4.2 Implementing the Fallback Chain
```python
from typing import Any, Callable, Optional

class FallbackChain:
    """A fallback chain: try multiple strategies in priority order.

    Use cases:
    - switch to a backup model when the primary LLM API fails
    - fall back to cached data when live data is unavailable
    - fall back to a simpler algorithm when the complex one fails
    """
    def __init__(self):
        self.strategies: list = []

    def add_strategy(
        self,
        strategy_func: Callable,
        name: str,
        priority: int = 0
    ):
        """Register a fallback strategy.

        Args:
            strategy_func: the strategy function
            name: strategy name
            priority: priority (lower number = tried first)
        """
        self.strategies.append({
            "func": strategy_func,
            "name": name,
            "priority": priority
        })
        # Keep strategies sorted by priority
        self.strategies.sort(key=lambda x: x["priority"])

    def execute(self, *args, **kwargs) -> Any:
        """Run the chain: try each strategy until one succeeds."""
        last_error = None
        for strategy in self.strategies:
            try:
                print(f"  → Trying strategy: {strategy['name']}")
                result = strategy["func"](*args, **kwargs)
                print(f"  ✓ Strategy '{strategy['name']}' succeeded")
                return result
            except Exception as e:
                last_error = e
                print(f"  ✗ Strategy '{strategy['name']}' failed: {e}")
                continue
        # Every strategy failed
        raise Exception(
            f"All strategies failed. Last error: {last_error}"
        )

# Usage: multi-layer fallback for LLM calls
fallback_llm = FallbackChain()

def primary_strategy(prompt: str) -> str:
    """Primary: GPT-4."""
    return call_gpt4_api(prompt)

def secondary_strategy(prompt: str) -> str:
    """Secondary: GPT-3.5."""
    return call_gpt35_api(prompt)

def tertiary_strategy(prompt: str) -> str:
    """Last resort: a rule engine."""
    return rule_based_response(prompt)

# Register the strategies
fallback_llm.add_strategy(primary_strategy, "GPT-4", priority=1)
fallback_llm.add_strategy(secondary_strategy, "GPT-3.5", priority=2)
fallback_llm.add_strategy(tertiary_strategy, "Rule Engine", priority=3)

# Run
try:
    response = fallback_llm.execute("Analyze this financial data...")
    print(f"Response: {response}")
except Exception as e:
    print(f"All strategies failed: {e}")
```
4.3 Human-in-the-Loop as the Last Resort
When every automated strategy has failed, the last line of defense is human intervention. That should be more than a bare error message; it should:
- preserve all intermediate results
- describe the problem and its context clearly
- provide an easy entry point for resuming the task
```python
def human_fallback_handler(
    task_id: str,
    error: Exception,
    context: Dict[str, Any],
    notification_channels: tuple = ("email", "slack")
):
    """Human-intervention handler.

    Called when all automated strategies have failed.
    """
    # 1. Preserve the scene
    incident_report = {
        "task_id": task_id,
        "timestamp": datetime.utcnow().isoformat(),
        "error": str(error),
        "error_type": type(error).__name__,
        "context": context,
        "status": "awaiting_human_intervention"
    }
    # Persist to a database or the file system
    save_incident_report(incident_report)
    # 2. Send notifications
    message = f"""
🚨 AI Agent Task Failed - Human Intervention Required
Task ID: {task_id}
Error: {error}
Time: {incident_report['timestamp']}
Context Summary:
- Last completed step: {context.get('last_completed_step', 'N/A')}
- User: {context.get('user', 'N/A')}
Action Required:
1. Review the incident report
2. Manually complete the task or fix the issue
3. Resume the task using: resume_task('{task_id}')
Incident Report: /incidents/{task_id}.json
"""
    for channel in notification_channels:
        if channel == "email":
            send_email(
                to="admin@example.com",
                subject=f"[URGENT] AI Agent Task Failed: {task_id}",
                body=message
            )
        elif channel == "slack":
            send_slack_notification(
                channel="#ai-alerts",
                message=message
            )
    print(f"⚠ Human intervention requested for task: {task_id}")
    return incident_report
```
5. Putting It All Together: An Industrial-Grade Fault-Tolerant Agent Engine
Now we combine everything above into a complete fault-tolerant Agent execution engine.
```python
import time
import json
from datetime import datetime
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass, field

@dataclass
class TaskConfig:
    """Task configuration."""
    task_id: str
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    enable_checkpointing: bool = True
    enable_circuit_breaker: bool = True
    enable_fallback: bool = True
    timeout: Optional[float] = 300.0  # 5-minute timeout
```
```python
class FaultTolerantAgentEngine:
    """An industrial-grade fault-tolerant Agent engine.

    Integrated features:
    - smart retries (exponential backoff + circuit breakers)
    - state management (checkpointing + resumable execution)
    - compensating transactions (rollback on failure)
    - degradation strategies (multi-layer fallbacks)
    - timeout control
    - full logging and monitoring
    """
    def __init__(self, config: TaskConfig):
        self.config = config
        self.checkpoint_mgr = CheckpointManager(storage_backend="file")
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.compensation_handler = CompensationHandler()
        self.execution_log: List[Dict[str, Any]] = []

    def get_circuit_breaker(self, service_name: str) -> CircuitBreaker:
        """Fetch or create a per-service circuit breaker."""
        if service_name not in self.circuit_breakers:
            self.circuit_breakers[service_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=60.0
            )
        return self.circuit_breakers[service_name]

    def execute_step_with_protection(
        self,
        step_name: str,
        step_func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Run a single step under layered protection.

        Protection layers:
        1. circuit-breaker check
        2. timeout control
        3. smart retries
        4. fallback strategy
        """
        service_name = step_func.__name__
        cb = self.get_circuit_breaker(service_name)
        # 1. Circuit-breaker check
        if not cb.can_execute():
            raise Exception(f"Circuit breaker OPEN for {service_name}")
        last_error = None
        # 2. Retry loop
        for attempt in range(self.config.max_retries + 1):
            try:
                # 3. Timeout control
                if self.config.timeout:
                    result = self._execute_with_timeout(
                        step_func,
                        self.config.timeout,
                        *args,
                        **kwargs
                    )
                else:
                    result = step_func(*args, **kwargs)
                # Record the success
                cb.record_success()
                self._log_execution(step_name, attempt, "success", None)
                return result
            except Exception as e:
                last_error = e
                error_type = classify_error(e)
                # Record the failure
                cb.record_failure()
                self._log_execution(step_name, attempt, "failed", str(e))
                # Never retry permanent errors
                if error_type == ErrorType.PERMANENT:
                    raise ValueError(f"Permanent error in {step_name}: {e}")
                # Out of retries
                if attempt >= self.config.max_retries:
                    break
                # Compute the backoff delay
                delay = min(
                    self.config.base_delay * (2 ** attempt),
                    self.config.max_delay
                )
                print(f"  ⏳ Retry {attempt + 1}/{self.config.max_retries} in {delay:.2f}s...")
                time.sleep(delay)
        # 4. All retries exhausted: try a fallback
        if self.config.enable_fallback:
            print(f"  ↓ Attempting fallback for {step_name}...")
            try:
                fallback_result = self._try_fallback(step_name, *args, **kwargs)
                print(f"  ✓ Fallback succeeded")
                return fallback_result
            except Exception as fallback_error:
                print(f"  ✗ Fallback also failed: {fallback_error}")
        # 5. Give up
        raise Exception(
            f"Step '{step_name}' failed after {self.config.max_retries} retries. "
            f"Last error: {last_error}"
        )

    def _execute_with_timeout(
        self,
        func: Callable,
        timeout: float,
        *args,
        **kwargs
    ) -> Any:
        """Execute with a timeout.

        Note: signal-based timeouts only work in the main thread on
        Unix-like systems; use a thread or process pool elsewhere.
        """
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError(f"Execution timed out after {timeout}s")

        # Arm the timeout signal
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(timeout))
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            # Disarm the timeout
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)

    def _try_fallback(
        self,
        step_name: str,
        *args,
        **kwargs
    ) -> Any:
        """Try a registered fallback strategy."""
        fallback_strategies = {
            "call_llm_api": self._fallback_llm,
            "fetch_external_data": self._fallback_cached_data,
            "generate_complex_report": self._fallback_simple_summary,
        }
        fallback_func = fallback_strategies.get(step_name)
        if fallback_func:
            return fallback_func(*args, **kwargs)
        else:
            raise Exception(f"No fallback strategy for {step_name}")

    def _fallback_llm(self, prompt: str, **kwargs) -> str:
        """LLM fallback: use a cheaper model."""
        print("  [Fallback] Using GPT-3.5 instead of GPT-4")
        return call_gpt35_api(prompt)

    def _fallback_cached_data(self, query: str, **kwargs) -> Dict:
        """Data-fetch fallback: use the cache."""
        print("  [Fallback] Using cached data")
        return load_from_cache(query)

    def _fallback_simple_summary(self, data: Dict, **kwargs) -> str:
        """Report fallback: generate a simplified version."""
        print("  [Fallback] Generating simplified summary")
        return f"Simplified summary: {str(data)[:500]}..."

    def execute_workflow(
        self,
        steps: List[tuple],
        initial_state: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run a complete workflow.

        Args:
            steps: list of (step_name, step_func) tuples
            initial_state: initial state

        Returns:
            The final state.
        """
        task_id = self.config.task_id
        state = initial_state or {}
        # Try to resume from a checkpoint
        checkpoint = self.checkpoint_mgr.load_latest_checkpoint(task_id)
        if checkpoint:
            print(f"↻ Resuming from checkpoint: {checkpoint.step_name}")
            start_index = checkpoint.step_index + 1
            state = checkpoint.state
        else:
            print(f"▶ Starting new workflow: {task_id}")
            start_index = 0
        # Execute the steps
        for i in range(start_index, len(steps)):
            step_name, step_func = steps[i]
            try:
                print(f"\n[{i+1}/{len(steps)}] Executing: {step_name}")
                # Run the step under protection
                result = self.execute_step_with_protection(
                    step_name,
                    step_func,
                    state
                )
                # Update the state
                state[f"{step_name}_result"] = result
                state["last_completed_step"] = step_name
                state["last_completed_step_index"] = i
                # Register a compensation action, if the step declares one
                if hasattr(step_func, "__compensation__"):
                    comp_func = step_func.__compensation__
                    self.compensation_handler.register_compensation(
                        comp_func,
                        result
                    )
                # Save a checkpoint
                if self.config.enable_checkpointing:
                    self.checkpoint_mgr.save_checkpoint(
                        task_id=task_id,
                        step_name=step_name,
                        step_index=i,
                        state=state.copy()
                    )
            except Exception as e:
                print(f"\n✗ Workflow failed at step '{step_name}': {e}")
                # Run compensating transactions
                if self.compensation_handler.compensation_stack:
                    try:
                        self.compensation_handler.execute_all_compensations()
                    except Exception as comp_error:
                        print(f"⚠ Compensation also failed: {comp_error}")
                # Escalate to a human
                human_fallback_handler(
                    task_id=task_id,
                    error=e,
                    context=state
                )
                raise
        # Workflow completed successfully
        print(f"\n✓ Workflow '{task_id}' completed successfully!")
        self.compensation_handler.clear()
        self.checkpoint_mgr.clear_checkpoints(task_id)
        # Persist the execution log
        self._save_execution_log()
        return state

    def _log_execution(
        self,
        step_name: str,
        attempt: int,
        status: str,
        error: Optional[str]
    ):
        """Append an entry to the execution log."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "task_id": self.config.task_id,
            "step_name": step_name,
            "attempt": attempt,
            "status": status,
            "error": error
        }
        self.execution_log.append(log_entry)

    def _save_execution_log(self):
        """Persist the execution log to disk."""
        import os
        os.makedirs("logs", exist_ok=True)
        log_filename = f"logs/{self.config.task_id}_execution_log.json"
        with open(log_filename, 'w') as f:
            json.dump(self.execution_log, f, indent=2)
        print(f"📝 Execution log saved to {log_filename}")
```
```python
# ==================== Usage example ====================
def example_financial_analysis_workflow():
    """Example: a financial-analysis workflow."""
    # Define the steps
    def fetch_financial_data(state: Dict) -> Dict:
        """Step 1: fetch the financial data."""
        # Simulated API call
        return {"revenue": 1000000, "profit": 200000, "expenses": 800000}

    def analyze_trends(state: Dict) -> Dict:
        """Step 2: analyze the trends."""
        data = state["fetch_financial_data_result"]
        # Simulated LLM analysis
        return {
            "trend": "positive",
            "growth_rate": 0.15,
            "recommendation": "Continue current strategy"
        }

    def generate_visualization(state: Dict) -> str:
        """Step 3: generate the charts."""
        # Simulated chart generation
        return "chart_q3_2026.png"

    def compile_report(state: Dict) -> str:
        """Step 4: compile the report."""
        analysis = state["analyze_trends_result"]
        chart = state["generate_visualization_result"]
        return f"Q3 Report: {analysis['trend']} growth ({chart})"

    def send_to_stakeholders(state: Dict) -> bool:
        """Step 5: send the report to stakeholders."""
        report = state["compile_report_result"]
        # Simulated email delivery
        print(f"  Sending report: {report}")
        return True

    steps = [
        ("fetch_financial_data", fetch_financial_data),
        ("analyze_trends", analyze_trends),
        ("generate_visualization", generate_visualization),
        ("compile_report", compile_report),
        ("send_to_stakeholders", send_to_stakeholders),
    ]

    # Create the engine
    config = TaskConfig(
        task_id="financial-analysis-q3-2026",
        max_retries=3,
        base_delay=1.0,
        max_delay=30.0,
        enable_checkpointing=True,
        enable_circuit_breaker=True,
        enable_fallback=True,
        timeout=300.0
    )
    engine = FaultTolerantAgentEngine(config)

    # Run the workflow
    try:
        result = engine.execute_workflow(
            steps=steps,
            initial_state={"quarter": "Q3 2026", "user": "CFO"}
        )
        print(f"\n✅ Final Result: {result}")
    except Exception as e:
        print(f"\n❌ Workflow failed: {e}")

# Run the example
if __name__ == "__main__":
    example_financial_analysis_workflow()
```
6. Best Practices for Production
When deploying a fault-tolerant Agent system for real, keep the following in mind.
6.1 Monitoring and Alerting

```python
# Key metrics to monitor
metrics = {
    "task_success_rate": 0.95,          # task success rate
    "average_retry_count": 1.2,         # average retries per task
    "circuit_breaker_trips": 3,         # circuit-breaker trips
    "fallback_usage_rate": 0.05,        # fallback usage rate
    "average_execution_time": 45.3,     # average execution time (seconds)
    "compensation_trigger_rate": 0.02   # compensation trigger rate
}

# Alerting rules
alerts = [
    "Success rate < 90% → P1 alert",
    "Circuit-breaker trips > 10/hour → P2 alert",
    "Fallback usage rate > 20% → P2 alert",
    "Average execution time > 2x baseline → P3 alert",
]
```
6.2 Logging and Tracing
- Structured logging: record every key event in JSON format
- Distributed tracing: give each task a unique trace ID that threads through the entire execution chain
- Audit logging: record every compensating transaction and human intervention
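The first two bullets can be sketched together: emit each event as one JSON line carrying the task's trace ID, so events across retries and fallbacks can be correlated later. The field names below are illustrative, not a fixed schema.

```python
import json
import logging
import uuid
from datetime import datetime, timezone

def make_event_logger(trace_id: str):
    """Return a logging function bound to one task's trace ID."""
    def log_event(event: str, **fields) -> str:
        record = {
            "trace_id": trace_id,
            "ts": datetime.now(timezone.utc).isoformat(),
            "event": event,
            **fields,  # arbitrary structured fields (step, attempt, error, ...)
        }
        line = json.dumps(record, ensure_ascii=False)
        logging.getLogger("agent").info(line)  # one JSON object per log line
        return line

    return log_event

# Every event for this task now carries the same trace_id
log_event = make_event_logger(trace_id=str(uuid.uuid4()))
line = log_event("step_retry", step="call_llm_api", attempt=2, error="503")
```

A log aggregator can then group by `trace_id` to reconstruct the full retry/fallback history of any single task.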
6.3 Performance Optimization
- Asynchronous execution: run independent steps concurrently with asyncio
- Caching: cache LLM responses and external API data
- Resource pooling: reuse database connections and HTTP sessions
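The first bullet can be sketched with `asyncio.gather`. The two fetch functions below are illustrative stand-ins for real I/O-bound steps (API calls, database queries):

```python
import asyncio

async def fetch_data() -> dict:
    await asyncio.sleep(0.01)  # simulate I/O latency
    return {"revenue": 1000000}

async def fetch_news() -> list:
    await asyncio.sleep(0.01)  # simulate I/O latency
    return ["Q3 earnings beat expectations"]

async def run_independent_steps() -> dict:
    # Both coroutines run concurrently; the total wait is roughly the
    # slower of the two, not their sum
    data, news = await asyncio.gather(fetch_data(), fetch_news())
    return {"data": data, "news": news}

result = asyncio.run(run_independent_steps())
```

Dependent steps (analysis after fetching, for instance) still run sequentially; only steps with no data dependency between them should be gathered this way.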
6.4 Testing Strategy

```python
# Chaos-engineering tests
chaos_tests = [
    "Randomly inject network latency",
    "Simulate 503 errors from the LLM API",
    "Force the circuit breaker to trip",
    "Simulate a corrupted checkpoint",
    "Test conflicts between concurrent tasks",
]
```
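Such chaos tests can start from something as small as a fault-injection decorator. A hedged sketch, not a real chaos framework (`failure_rate=1.0` is set here only so the injected failure fires deterministically):

```python
import random
from functools import wraps

def chaos(failure_rate: float, exception: Exception):
    """Wrap a function so it raises `exception` with probability `failure_rate`."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if random.random() < failure_rate:
                raise exception  # injected fault
            return func(*args, **kwargs)
        return wrapper
    return decorator

@chaos(failure_rate=1.0, exception=ConnectionError("injected 503"))
def flaky_llm_call(prompt: str) -> str:
    return "ok"

try:
    flaky_llm_call("hello")
except ConnectionError as e:
    print(f"Injected failure caught: {e}")
```

Wrapping the steps of a test workflow this way exercises exactly the retry, circuit-breaker, and fallback paths that never fire in a happy-path test run.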
Conclusion
Reliability is not an accident; it is the result of design.
In this article we covered the core fault-tolerance techniques behind production-grade AI Agents:
- Smart retries: exponential backoff plus circuit breakers, avoiding retry storms
- State management: checkpointing, with resume-from-interruption support
- Compensating transactions: automatic rollback on failure to keep data consistent
- Degradation strategies: multi-layer fallbacks that handle failure gracefully
- Human-in-the-loop: the last line of defense, so critical tasks are never silently lost
These techniques are not just the core of AI Agents; they are the foundation of every distributed system. As Netflix's Chaos Monkey demonstrated, only systems designed for fault tolerance survive real-world chaos.
This completes Module 4: Action of AI Technology Evolution and Core Algorithms in Practice. From tool invocation with Function Calling, through ReAct's think-act loop, autonomous planning, and workflow orchestration, to industrial-grade fault tolerance, we have given AI a complete "ability to act".
In the next article we enter Module 5: Collaboration and explore multi-agent systems. When a single Agent hits its limits, how can multiple Agents cooperate to produce "collective intelligence"? Stay tuned for "Multi-Agent Collaboration Patterns: Hierarchical, Sequential, and Joint Architectures Compared".
📚 References and Further Reading
- Release It!: Design and Deploy Production-Ready Software (Michael Nygard) - The classic guide to production-system stability, covering circuit breakers, bulkheads, and related patterns.
- The Circuit Breaker Pattern (Martin Fowler) - The authoritative explanation of the circuit breaker pattern, central to microservice fault tolerance.
- Exponential Backoff And Jitter (AWS Architecture Blog) - An in-depth treatment of exponential backoff in theory and practice.
- Saga Pattern for Microservices (Chris Richardson) - The classic pattern behind compensating transactions for long-running workflows.
- Chaos Engineering: System Resiliency in Practice (Casey Rosenthal) - Improving reliability by deliberately injecting failures.
- LangGraph Documentation: Persistence & Checkpointing - An industrial-grade reference for checkpointing and state management.
Coming next: Multi-Agent Collaboration Patterns: Hierarchical, Sequential, and Joint Architectures Compared