# Planner, Harness, Skill Evaluation: A Three-Layer Architecture for Industrial-Grade Agent Systems
March 11, 2026 · Tech Blog
Most agent tutorials teach you how to make an LLM call tools. This post is about making an agent system run reliably in production.
## Why Naive Agents Blow Up in Production
The simplest possible agent implementation looks like this:
```python
def run(task: str):
    history = []
    while True:
        response = llm(task + str(history))
        action = parse_action(response)
        if action.type == "finish":
            return action.output
        result = execute_tool(action)
        history.append(result)
```
This works in a demo. In production, its problems are:
- No explicit task decomposition: all planning hides implicitly in the LLM's context, invisible and uncontrollable
- No guardrails at the execution layer: tool timeouts, malformed output, context pollution; the LLM is left to rescue itself
- Failures are unrecoverable: when something breaks, the only option is to start over from scratch, losing all prior work
- No quality signal: "tool call succeeded" does not mean "output is useful"
- Unbounded context growth: history grows forever and the LLM's attention starts to drift
The industrial-grade answer is a three-layer architecture: Planner + Harness + Skill Evaluation.
## Layer 1: Planner - Turning the Task into a Schedulable Structure

### The core difference: implicit vs. explicit planning

An ordinary agent plans implicitly: the plan lives inside the LLM's head. The whole point of a planner agent is to make the plan a first-class data structure.
```python
from dataclasses import dataclass, field
from typing import List, Literal, Any, Optional

@dataclass
class Step:
    id: str
    description: str
    skill_type: str            # which skill executes this step
    task_type: str             # semantic type of the task, used for skill selection
    dependencies: List[str]    # which steps' outputs this step depends on
    success_criteria: str      # what "success" means, consumed by the evaluator
    status: Literal["pending", "running", "done", "failed"] = "pending"
    output: Any = None
    eval_score: float = 0.0

@dataclass
class Plan:
    steps: List[Step]

    def get_ready_steps(self) -> List[Step]:
        """Return every step whose dependencies are all done and which hasn't started yet."""
        done_ids = {s.id for s in self.steps if s.status == "done"}
        return [
            s for s in self.steps
            if s.status == "pending"
            and all(dep in done_ids for dep in s.dependencies)
        ]

    def is_done(self) -> bool:
        return all(s.status in ("done", "failed") for s in self.steps)

    def get_context(self) -> dict:
        """Collect the outputs of completed steps into a context the executor can use."""
        return {
            s.id: s.output
            for s in self.steps
            if s.status == "done"
        }

    def mark_done(self, step: Step, result: "StepResult"):
        step.status = "done"
        step.output = result.output
        step.eval_score = result.skill_score

    def mark_failed(self, step: Step, reason: str):
        step.status = "failed"
        step.output = reason
```
A Plan is a directed acyclic graph (DAG). The dependencies field defines the data dependencies between steps, something a plain agent simply doesn't have.
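To make the scheduling concrete, here is a minimal sketch of a three-step plan (the step contents are invented for illustration): two independent searches feed one summary step.

```python
plan = Plan(steps=[
    Step(id="s1", description="Search recent papers on topic X",
         skill_type="web_search", task_type="research",
         dependencies=[], success_criteria="at least 5 relevant results"),
    Step(id="s2", description="Search engineering blogs on topic X",
         skill_type="web_search", task_type="research",
         dependencies=[], success_criteria="at least 5 relevant results"),
    Step(id="s3", description="Write a combined summary",
         skill_type="code_gen", task_type="writing",
         dependencies=["s1", "s2"], success_criteria="covers both sources"),
])

# s1 and s2 are ready immediately; s3 unlocks only once both are done
assert {s.id for s in plan.get_ready_steps()} == {"s1", "s2"}
```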
### The Planner LLM: planning only, no execution
```python
import json

class Planner:
    def __init__(self, llm, skill_registry):
        self.llm = llm
        self.skill_registry = skill_registry

    def plan(self, task: str) -> Plan:
        # Query the skill registry so the LLM sees historical performance data
        skill_hints = self.skill_registry.get_hints()
        prompt = f"""
You are a task planner. Decompose the following task into executable steps.
Each step must explicitly specify:
- which prior steps it depends on
- which skill type to use
- what its success criteria are

Task: {task}

Available skill types and their historical performance:
{skill_hints}

Return JSON:
{{
  "steps": [
    {{
      "id": "step_1",
      "description": "...",
      "skill_type": "web_search | code_gen | data_analysis | ...",
      "task_type": "...",
      "dependencies": [],
      "success_criteria": "..."
    }}
  ]
}}
"""
        raw = self.llm(prompt)
        data = json.loads(raw)
        return Plan(steps=[Step(**s) for s in data["steps"]])

    def replan(self, task: str, plan: Plan, failed_step: Step, reason: str) -> Plan:
        """When a step fails, replan with the failure reason in hand."""
        completed = [s for s in plan.steps if s.status == "done"]
        prompt = f"""
Task: {task}

Completed steps:
{json.dumps([{"id": s.id, "description": s.description, "output_summary": str(s.output)[:200]} for s in completed])}

Failed step:
- description: {failed_step.description}
- failure reason: {reason}

Building on the completed work, replan the remaining steps. Avoid repeating the approach that failed.

Return JSON (plan only the unfinished part):
"""
        raw = self.llm(prompt)
        data = json.loads(raw)
        # Keep the completed steps; replace the failed and pending ones
        new_steps = completed + [Step(**s) for s in data["steps"]]
        return Plan(steps=new_steps)
```
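One practical caveat: json.loads(raw) assumes the model returned bare JSON, but many models wrap it in markdown fences or add commentary, and a parse failure here shouldn't masquerade as a planning failure. A small defensive parser (my sketch, not part of the interface above) that you could call in place of json.loads:

```python
import json
import re

def parse_llm_json(raw: str) -> dict:
    """Best-effort extraction of a JSON object from an LLM response."""
    # Strip a markdown code fence if the model added one (`{3} = three backticks)
    fenced = re.search(r"`{3}(?:json)?\s*(.*?)`{3}", raw, re.DOTALL)
    if fenced:
        raw = fenced.group(1)
    # Fall back to the outermost {...} span
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end == -1:
        raise ValueError(f"no JSON object found in response: {raw[:100]!r}")
    return json.loads(raw[start:end + 1])
```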
## Layer 2: Harness - Making Every Step Execute Reliably

The Planner knows what to do, but not how to do it reliably. That is the Harness's job.

The Harness is not a single function; it is a layered stack of execution infrastructure:
```
┌─────────────────────────────────┐
│ 5. Observability Layer          │  traces, logs, metrics
├─────────────────────────────────┤
│ 4. Error & Recovery Layer       │  retry, fallback, replan triggers
├─────────────────────────────────┤
│ 3. Context Management Layer     │  what context to inject, how much to compress
├─────────────────────────────────┤
│ 2. Tool Routing Layer           │  which executor a step routes to
├─────────────────────────────────┤
│ 1. State Management Layer       │  storing and passing step results
└─────────────────────────────────┘
```
### StepResult: the contract between Harness and Planner
```python
@dataclass
class StepResult:
    output: Any
    status: Literal["success", "failed"]
    skill_score: float = 0.0      # from the Skill Evaluator
    eval_reason: str = ""         # why this score
    eval_passed: bool = False     # whether the quality threshold was met
    retry_count: int = 0
    duration_ms: int = 0
    error: Optional[str] = None
```
### StepRunner: the five layers made concrete
```python
import time
from typing import Dict, Type

class StepRunner:
    def __init__(
        self,
        tool_router: "ToolRouter",
        context_manager: "ContextManager",
        skill_evaluator: "SkillEvaluator",
        state_manager: "StateManager",
        tracer: "Tracer",
    ):
        self.tool_router = tool_router
        self.context_manager = context_manager
        self.skill_evaluator = skill_evaluator
        self.state_manager = state_manager
        self.tracer = tracer
        self.quality_threshold = 0.6  # eval scores below this trigger a replan
        self.max_retries = 3

    def run(self, step: Step, plan_context: dict) -> StepResult:
        start = time.time()

        # === Layer 3: Context Management ===
        enriched_context = self.context_manager.build(step, plan_context)

        # === Layer 5: Observability - begin ===
        trace_id = self.tracer.begin(step, enriched_context)

        try:
            # === Layer 4: Error & Recovery ===
            raw_result = self._execute_with_retry(step, enriched_context)

            # === Skill Evaluation (hooked in after execution, before return) ===
            eval_result = self.skill_evaluator.evaluate(
                skill_type=step.skill_type,
                step=step,
                context=enriched_context,
                output=raw_result
            )

            result = StepResult(
                output=raw_result,
                status="success" if eval_result.score >= self.quality_threshold else "failed",
                skill_score=eval_result.score,
                eval_reason=eval_result.reason,
                eval_passed=eval_result.score >= self.quality_threshold,
                duration_ms=int((time.time() - start) * 1000)
            )

            # === Layer 1: State Management ===
            self.state_manager.save(step.id, result)

            # === Layer 5: Observability - end ===
            self.tracer.end(trace_id, result)
            return result

        except Exception as e:
            result = StepResult(
                output=None, status="failed",
                error=str(e),
                duration_ms=int((time.time() - start) * 1000)
            )
            self.tracer.end(trace_id, result)
            return result

    def _execute_with_retry(self, step: Step, context: dict, attempt: int = 0):
        """Layer 4: retry logic with error classification."""
        try:
            executor = self.tool_router.get_executor(step.skill_type)  # Layer 2
            return executor.run(step, context)
        except ToolTimeoutError:
            if attempt >= self.max_retries:
                raise
            # Exponential backoff
            time.sleep(2 ** attempt)
            return self._execute_with_retry(step, context, attempt + 1)
        except OutputFormatError:
            # A format error won't fix itself on retry; propagate immediately
            raise
        except RateLimitError:
            if attempt >= self.max_retries:
                raise
            time.sleep(10 * (attempt + 1))
            return self._execute_with_retry(step, context, attempt + 1)
```
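StepRunner leans on a few collaborators the code references but this post never defines: the tool-layer exception types, the StateManager (Layer 1), and the Tracer (Layer 5). Minimal stand-in sketches so the block above runs; a real deployment would persist to a store and emit spans to a tracing backend:

```python
import uuid
from typing import Dict, Optional

# Tool-layer exception types assumed by _execute_with_retry
class ToolTimeoutError(Exception): ...
class OutputFormatError(Exception): ...
class RateLimitError(Exception): ...

class StateManager:
    """Layer 1 stand-in: in-memory; swap for SQLite/Redis in production."""
    def __init__(self):
        self._results: Dict[str, "StepResult"] = {}

    def save(self, step_id: str, result: "StepResult"):
        self._results[step_id] = result

    def load(self, step_id: str) -> Optional["StepResult"]:
        return self._results.get(step_id)

class Tracer:
    """Layer 5 stand-in: prints; swap for OpenTelemetry spans in production."""
    def begin(self, step: "Step", context: dict) -> str:
        trace_id = str(uuid.uuid4())
        print(f"[trace {trace_id[:8]}] begin {step.id} ({step.skill_type})")
        return trace_id

    def end(self, trace_id: str, result: "StepResult"):
        print(f"[trace {trace_id[:8]}] end status={result.status} "
              f"score={result.skill_score:.2f} duration={result.duration_ms}ms")
```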
### ContextManager: controlling what each step sees

This layer is easy to overlook but critically important:
```python
class ContextManager:
    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens

    def build(self, step: Step, plan_context: dict) -> dict:
        """
        Don't dump the entire plan_context on the executor.
        Inject only the information this step actually needs.
        """
        # Take only the upstream outputs named in dependencies
        relevant_outputs = {
            dep_id: self._compress(plan_context.get(dep_id))
            for dep_id in step.dependencies
        }
        return {
            "task": step.description,
            "success_criteria": step.success_criteria,
            "upstream_outputs": relevant_outputs,
            "skill_type": step.skill_type,
        }

    def _compress(self, output: Any, max_chars: int = 2000) -> Any:
        """Keep the context from exploding."""
        if output is None:
            return None
        text = str(output)
        if len(text) > max_chars:
            return text[:max_chars] + f"\n... [truncated, {len(text)} chars total]"
        return output
```
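The effect is easy to see in isolation: no matter how large the plan grows, an executor only ever receives the upstream outputs its step declared. A tiny check (step and context values invented for illustration):

```python
cm = ContextManager()
step = Step(id="s3", description="Write a combined summary",
            skill_type="code_gen", task_type="writing",
            dependencies=["s1", "s2"], success_criteria="covers both sources")
plan_context = {"s1": "paper findings ...", "s2": "blog findings ...", "s9": "unrelated"}

ctx = cm.build(step, plan_context)
assert set(ctx["upstream_outputs"]) == {"s1", "s2"}  # "s9" never reaches the executor
```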
### ToolRouter: Layer 2 made concrete
```python
from typing import Dict, Type

class ToolRouter:
    def __init__(self):
        self._registry: Dict[str, Type["BaseExecutor"]] = {}

    def register(self, skill_type: str, executor_class: Type["BaseExecutor"]):
        self._registry[skill_type] = executor_class

    def get_executor(self, skill_type: str) -> "BaseExecutor":
        if skill_type not in self._registry:
            raise ValueError(f"No executor for skill_type: {skill_type}")
        return self._registry[skill_type]()

# Executor interface
class BaseExecutor:
    def run(self, step: Step, context: dict) -> Any:
        raise NotImplementedError

class WebSearchExecutor(BaseExecutor):
    def run(self, step: Step, context: dict) -> Any:
        query = context["task"]
        return search_web(query)  # the actual tool call

class CodeGenExecutor(BaseExecutor):
    def run(self, step: Step, context: dict) -> Any:
        return llm_generate_code(context)
```
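Wiring is a one-time registration at startup; the mapping from skill_type strings to executor classes is exactly what the Planner's skill_type field indexes into:

```python
router = ToolRouter()
router.register("web_search", WebSearchExecutor)
router.register("code_gen", CodeGenExecutor)

# The harness resolves executors lazily, one fresh instance per step
executor = router.get_executor("web_search")
```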
## Layer 3: Skill Evaluation - Closing the Quality Loop

`status == "success"` does not mean the output is useful. That is the fundamental reason Skill Evaluation exists.
### Three forms of evaluator
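All three forms share one return contract. The post consumes EvalResult in the harness without ever defining it; a minimal definition consistent with that usage (score plus reason, and the missing list the LLM judge returns):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class EvalResult:
    score: float                                      # 0.0-1.0, compared against the harness threshold
    reason: str = ""                                  # human-readable justification, stored in the registry
    missing: List[str] = field(default_factory=list)  # gaps flagged by the judge, if any
```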
**Form 1: Rule-based** (lightweight; suited to outputs with a well-defined schema)
```python
import re

class RuleBasedEvaluator:
    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":
        if skill_type == "web_search":
            results = output.get("results", [])
            score = min(len(results) / 5, 1.0)  # base score: result count

            # Check keyword coverage
            required_kw = self._extract_keywords(step.description)
            output_text = str(output).lower()
            coverage = sum(1 for kw in required_kw if kw in output_text) / max(len(required_kw), 1)
            score = score * 0.3 + coverage * 0.7
            return EvalResult(score=score, reason=f"result_count={len(results)}, kw_coverage={coverage:.2f}")

        if skill_type == "code_gen":
            code = output.get("code", "")
            try:
                compile(code, "<string>", "exec")
            except SyntaxError as e:
                return EvalResult(score=0.0, reason=f"syntax_error: {e}")

            # Extra check: does the code have any substance?
            if len(code.strip().split("\n")) < 3:
                return EvalResult(score=0.3, reason="syntax_ok_but_trivial")
            return EvalResult(score=1.0, reason="syntax_ok")

        return EvalResult(score=0.5, reason="no_rule_for_skill_type")

    def _extract_keywords(self, text: str) -> List[str]:
        # Naive keyword extraction; swap in something smarter if needed
        return [w for w in re.findall(r"[a-zA-Z0-9_]+", text.lower()) if len(w) > 3]
```
**Form 2: LLM-as-judge** (the mainstream choice; suited to semantic quality assessment)
```python
import json

class LLMEvaluator:
    def __init__(self, judge_llm):
        self.judge_llm = judge_llm

    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":
        prompt = f"""
You are a strict evaluator of agent steps.

Task description: {step.description}
Success criteria: {step.success_criteria}
Actual output: {str(output)[:1500]}

Assess whether this output satisfies the task's success criteria.

Scoring guide:
- 1.0: fully satisfies the success criteria
- 0.7-0.9: mostly satisfies, with minor flaws
- 0.4-0.6: partially satisfies, with clear gaps
- 0.0-0.3: largely fails the success criteria

Return JSON (JSON only, nothing else):
{{"score": 0.0-1.0, "reason": "one-sentence justification", "missing": ["key information that is absent"]}}
"""
        raw = self.judge_llm(prompt)
        data = json.loads(raw)
        return EvalResult(**data)
```
**Form 3: Reference-based evaluator** (for scientific computing scenarios)
```python
class ReferenceEvaluator:
    """
    Use when ground truth is available.
    Suited to scientific-AI scenarios: molecular property prediction,
    numerical verification, and the like.
    """
    def __init__(self, llm_evaluator: "LLMEvaluator"):
        self.llm_evaluator = llm_evaluator  # fallback when no reference exists

    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":
        # expected_output is an optional extension to Step, so read it defensively
        reference = getattr(step, "expected_output", None)
        if reference is None:
            # No reference: degrade to the LLM judge
            return self.llm_evaluator.evaluate(skill_type, step, context, output)
        score = self._compute_similarity(output, reference)
        return EvalResult(
            score=score,
            reason=f"similarity_to_reference={score:.3f}"
        )

    def _compute_similarity(self, output, reference) -> float:
        # Pick a similarity measure appropriate to the output modality:
        # numeric:    1 - |output - reference| / |reference|
        # structured: embedding cosine similarity
        # text:       BLEU / semantic similarity
        if isinstance(output, (int, float)) and isinstance(reference, (int, float)):
            if reference == 0:
                return 1.0 if output == 0 else 0.0
            return max(0.0, 1.0 - abs(output - reference) / abs(reference))
        return 0.0  # other modalities: plug in your own metric
```
### SkillRegistry: a system that improves the longer it runs

Scores produced by the evaluator shouldn't be used once and thrown away; they should accumulate into historical performance data for each skill:
```python
import sqlite3
import time
from statistics import mean

class SkillRegistry:
    """
    Records the historical eval score of each skill_type on each task_type.
    The Planner queries it at planning time to pick the historically best skill.
    """
    def __init__(self, db_path: str = "skill_registry.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS skill_scores (
                skill_type TEXT,
                task_type TEXT,
                score REAL,
                eval_reason TEXT,
                timestamp INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def record(self, skill_type: str, task_type: str, score: float, reason: str = ""):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "INSERT INTO skill_scores VALUES (?, ?, ?, ?, ?)",
            (skill_type, task_type, score, reason, int(time.time()))
        )
        conn.commit()
        conn.close()

    def best_skill_for(self, task_type: str, top_k: int = 3) -> List[dict]:
        """Return the skills with the best track record on this task type."""
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute("""
            SELECT skill_type, AVG(score) as avg_score, COUNT(*) as sample_count
            FROM skill_scores
            WHERE task_type = ?
            GROUP BY skill_type
            HAVING sample_count >= 3
            ORDER BY avg_score DESC
            LIMIT ?
        """, (task_type, top_k)).fetchall()
        conn.close()
        return [
            {"skill_type": r[0], "avg_score": r[1], "sample_count": r[2]}
            for r in rows
        ]

    def get_hints(self) -> str:
        """Skill performance summary for the Planner LLM."""
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute("""
            SELECT skill_type, task_type, AVG(score) as avg_score
            FROM skill_scores
            GROUP BY skill_type, task_type
            ORDER BY avg_score DESC
        """).fetchall()
        conn.close()
        if not rows:
            return "no historical data yet"
        lines = []
        for skill_type, task_type, avg_score in rows:
            lines.append(f"- {skill_type} on {task_type}: avg score {avg_score:.2f}")
        return "\n".join(lines)
```
## Assembly: the Complete PlannerAgent

Now wire the three layers together:
```python
class PlannerAgent:
    def __init__(
        self,
        planner_llm,
        judge_llm,
        tool_router: ToolRouter,
    ):
        # Wire up the components of each layer
        self.skill_registry = SkillRegistry()
        self.planner = Planner(planner_llm, self.skill_registry)

        evaluator = LLMEvaluator(judge_llm)
        context_manager = ContextManager(max_tokens=4096)
        state_manager = StateManager()
        tracer = Tracer()

        self.harness = StepRunner(
            tool_router=tool_router,
            context_manager=context_manager,
            skill_evaluator=evaluator,
            state_manager=state_manager,
            tracer=tracer,
        )
        self.max_replan_attempts = 3

    def run(self, task: str) -> dict:
        plan = self.planner.plan(task)
        replan_count = 0
        print(f"[Planner] Plan generated: {len(plan.steps)} steps")

        while not plan.is_done():
            ready_steps = plan.get_ready_steps()
            if not ready_steps:
                # Pending steps exist but none are ready → deadlock
                break

            # Ready steps with no mutual dependencies could run in parallel;
            # simplified version: run them serially
            for step in ready_steps:
                print(f"[Harness] Running step: {step.id} ({step.skill_type})")
                result = self.harness.run(step, plan.get_context())

                if result.eval_passed:
                    # ✅ Success: update the plan + record to the registry
                    plan.mark_done(step, result)
                    self.skill_registry.record(
                        skill_type=step.skill_type,
                        task_type=step.task_type,
                        score=result.skill_score,
                        reason=result.eval_reason
                    )
                    print(f"[Eval] step {step.id} passed, score={result.skill_score:.2f}")
                else:
                    # ❌ Failure: replan, carrying the eval reason (or the raw error)
                    reason = result.eval_reason or result.error or "unknown failure"
                    print(f"[Eval] step {step.id} failed, reason={reason}")
                    self.skill_registry.record(
                        skill_type=step.skill_type,
                        task_type=step.task_type,
                        score=result.skill_score,
                        reason=reason
                    )
                    if replan_count >= self.max_replan_attempts:
                        print("[Planner] Max replan attempts reached, giving up on this step")
                        plan.mark_failed(step, reason)
                        continue

                    # Pass the failure reason into replan so the LLM tries a different strategy
                    plan = self.planner.replan(task, plan, step, reason)
                    replan_count += 1
                    print(f"[Planner] Replanning (attempt {replan_count})")
                    break  # re-enter the while loop with the new plan

        # Aggregate the results
        return self._summarize(plan)

    def _summarize(self, plan: Plan) -> dict:
        done_steps = [s for s in plan.steps if s.status == "done"]
        failed_steps = [s for s in plan.steps if s.status == "failed"]
        avg_score = mean([s.eval_score for s in done_steps]) if done_steps else 0.0
        return {
            "completed_steps": len(done_steps),
            "failed_steps": len(failed_steps),
            "average_skill_score": avg_score,
            "outputs": {s.id: s.output for s in done_steps}
        }
```
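End to end, the wiring looks like this. The two llm arguments are placeholders for whatever model client you use (any prompt-in, string-out callable):

```python
router = ToolRouter()
router.register("web_search", WebSearchExecutor)
router.register("code_gen", CodeGenExecutor)

agent = PlannerAgent(
    planner_llm=planner_llm,  # placeholder: any callable prompt -> str
    judge_llm=judge_llm,      # often a cheaper model than the planner
    tool_router=router,
)
report = agent.run("Research topic X and write a short summary")
print(report["completed_steps"], report["average_skill_score"])
```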
## The Complete Data Flow

```
Task input
    ↓
Planner.plan(task)
    ├─ query SkillRegistry for historical performance hints
    └─ LLM generates a Plan (a Step DAG)
    ↓
plan.get_ready_steps()
    └─ find executable steps from the dependency graph
    ↓
Harness.run(step, plan_context)
    ├─ ContextManager: inject compressed, relevant context
    ├─ ToolRouter: route to the right Executor
    ├─ Executor: perform the actual tool call (with retry)
    ├─ SkillEvaluator: score the output quality
    ├─ StateManager: persist the result
    └─ Tracer: record the end-to-end trace
    ↓
StepResult(output, eval_score, eval_passed)
    ↓
eval_passed?
    ├─ YES → plan.mark_done + SkillRegistry.record(+score)
    │        → back to get_ready_steps()
    └─ NO  → SkillRegistry.record(-score)
             → Planner.replan(task, plan, eval_reason)
                  ↑
             the LLM sees the failure reason and switches strategy
```
## Mapping to Real Engineering Work

This architecture isn't theory; it corresponds to concrete engineering modules:

| Engineering work you may already be doing | Layer it corresponds to |
|---|---|
| WebSocket relay / MCP bridge | Harness → ToolRouter (Layer 2) |
| E2B sandbox gateway | Harness → ToolRouter + Error Layer (Layers 2, 4) |
| Pitfall Registry | Harness → StateManager, a precursor of SkillRegistry (Layer 1) |
| Task trace system | Harness → Tracer (Layer 5) |
| Scene serialization/deserialization | Harness → StateManager (Layer 1) |

A Pitfall Registry and the SkillRegistry are one step apart: replace manually written experience entries with scores the evaluator writes automatically. Add an eval_score field to the schema and let the Harness's evaluator call record() after each execution, and the "experience base" upgrades into a "skill performance database". The Planner can then decide based on data instead of letting the LLM guess.
## Some Engineering Trade-offs

### Which evaluator should you pick?
- Output has a well-defined schema (JSON, code, numeric values) → rule-based; cheap and fast
- Semantic judgment needed (report quality, reasoning correctness) → LLM-as-judge
- Ground truth available (scientific computing, math verification) → reference evaluator
- Production systems → combine all three with a layered fallback (see the sketch below)
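A layered fallback can be as simple as an ordered chain: trust the cheap evaluator when it has an opinion, and escalate otherwise. A sketch assuming the three evaluator classes above:

```python
class CompositeEvaluator:
    """Try evaluators in cost order; escalate when the cheaper one abstains."""
    def __init__(self, rule_based, llm_judge, reference=None):
        self.rule_based = rule_based
        self.llm_judge = llm_judge
        self.reference = reference

    def evaluate(self, skill_type, step, context, output) -> "EvalResult":
        # Ground truth wins whenever it exists
        if self.reference and getattr(step, "expected_output", None) is not None:
            return self.reference.evaluate(skill_type, step, context, output)
        result = self.rule_based.evaluate(skill_type, step, context, output)
        if result.reason != "no_rule_for_skill_type":
            return result  # a rule matched; accept the cheap verdict
        return self.llm_judge.evaluate(skill_type, step, context, output)
```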
### The SkillRegistry cold-start problem

A fresh system has no historical data, so best_skill_for() returns nothing. Remedies:

- Seed it with manually labeled initial data
- Let the Planner LLM decide fully on its own during the cold-start period
- Enforce a minimum-sample gate such as `HAVING sample_count >= 3`, and give no recommendations until there is enough data
### When to stop replanning

Unbounded replanning spends unbounded money. max_replan_attempts is the hard cap, but a better policy is: if two consecutive replans used the same skill and both failed, terminate immediately and report "this task may be infeasible with available skills". A sketch of that check follows.
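The check is a few lines on top of PlannerAgent.run (failure_log is a hypothetical list the run loop would append to on each failed attempt):

```python
from typing import List

def should_stop_replanning(failure_log: List[str]) -> bool:
    """failure_log holds the skill_type of each failed attempt, in order."""
    return len(failure_log) >= 2 and failure_log[-1] == failure_log[-2]

# Inside the replan branch of PlannerAgent.run:
#   failure_log.append(step.skill_type)
#   if should_stop_replanning(failure_log):
#       raise RuntimeError("this task may be infeasible with available skills")
```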
## Closing Thoughts

The Planner answers "what to do", the Harness answers "how to do each step reliably", and Skill Evaluation answers "how well was it done, and how do we keep getting better".

The interfaces between the three layers are simple:

- Planner → Harness: `run(step, plan_context) → StepResult`
- Harness → Evaluator: `evaluate(skill_type, step, context, output) → EvalResult`
- Evaluator → SkillRegistry: `record(skill_type, task_type, score)`
- SkillRegistry → Planner: `get_hints() → str`

Stable interfaces let each layer iterate independently. That is the precondition for an agent system that is genuinely maintainable.