Planner, Harness, Skill Evaluation: A Three-Layer Architecture for Industrial-Grade Agent Systems

March 11, 2026 · Tech Blog

Most agent tutorials teach you how to get an LLM to call tools. This post is about how to make an agent system run reliably in production.


Why Naive Agents Blow Up in Production

The simplest possible agent implementation looks like this:

def run(task: str):
    history = []
    while True:
        response = llm(task + str(history))   # one ever-growing prompt carries all state
        action = parse_action(response)
        if action.type == "finish":
            return action.output
        result = execute_tool(action)
        history.append(result)

This thing runs fine in a demo. In production, its problems are:

  1. No explicit task decomposition: all planning hides implicitly in the LLM's context, invisible and uncontrollable
  2. No guardrails in the execution layer: tool timeouts, malformed output, context pollution, and the LLM is left to rescue itself
  3. Failures are unrecoverable: on error you restart the whole run and lose all prior work
  4. No quality signal: "tool call succeeded" does not mean "output is useful"
  5. Ever-growing context: history grows without bound, and the LLM's attention starts to drift

The industrial-grade answer is a three-layer architecture: Planner + Harness + Skill Evaluation.


Layer 1: Planner, Turning the Task into a Schedulable Structure

The core difference: implicit vs. explicit planning

An ordinary agent plans implicitly; the plan lives inside the LLM's head. The core move of a planner agent is to make the plan a first-class data structure:

from dataclasses import dataclass, field
from typing import List, Literal, Any, Optional

@dataclass
class Step:
    id: str
    description: str
    skill_type: str                    # which skill executes this step
    task_type: str                     # semantic task category, used for skill selection
    dependencies: List[str]            # ids of the steps whose outputs this step needs
    success_criteria: str              # what "success" means; consumed by the evaluator
    status: Literal["pending", "running", "done", "failed"] = "pending"
    output: Any = None
    eval_score: float = 0.0
    expected_output: Optional[Any] = None  # optional ground truth, used by the ReferenceEvaluator below

@dataclass
class Plan:
    steps: List[Step]

    def get_ready_steps(self) -> List[Step]:
        """返回所有依赖已完成、自己还没开始的 step"""
        done_ids = {s.id for s in self.steps if s.status == "done"}
        return [
            s for s in self.steps
            if s.status == "pending"
            and all(dep in done_ids for dep in s.dependencies)
        ]

    def is_done(self) -> bool:
        return all(s.status in ("done", "failed") for s in self.steps)

    def get_context(self) -> dict:
        """把已完成 step 的输出整理成 executor 可用的上下文"""
        return {
            s.id: s.output
            for s in self.steps
            if s.status == "done"
        }

    def mark_done(self, step: Step, result: "StepResult"):
        step.status = "done"
        step.output = result.output
        step.eval_score = result.skill_score

    def mark_failed(self, step: Step, reason: str):
        step.status = "failed"
        step.output = reason

A Plan is a directed acyclic graph (DAG). The dependencies field defines the data dependencies between steps, something a naive agent has no notion of at all.
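
A tiny usage sketch (the step ids and descriptions are made up) showing how the dependency graph drives scheduling:

plan = Plan(steps=[
    Step(id="s1", description="fetch data", skill_type="web_search",
         task_type="retrieval", dependencies=[], success_criteria=">= 3 sources"),
    Step(id="s2", description="analyze data", skill_type="data_analysis",
         task_type="analysis", dependencies=["s1"], success_criteria="stats computed"),
])

print([s.id for s in plan.get_ready_steps()])  # ['s1'] — s2 is blocked on s1
plan.steps[0].status = "done"
print([s.id for s in plan.get_ready_steps()])  # ['s2'] — dependency satisfied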

The Planner LLM: it plans, it never executes

import json

class Planner:
    def __init__(self, llm, skill_registry):
        self.llm = llm
        self.skill_registry = skill_registry

    def plan(self, task: str) -> Plan:
        # query the skill registry so the LLM can see historical performance data
        skill_hints = self.skill_registry.get_hints()

        prompt = f"""
You are a task planner. Decompose the task below into executable steps. Every step must specify:
- which prior steps it depends on
- which skill type it uses
- what its success criteria are

Task: {task}

Available skill types and their historical performance:
{skill_hints}

Return JSON in this format:
{{
  "steps": [
    {{
      "id": "step_1",
      "description": "...",
      "skill_type": "web_search | code_gen | data_analysis | ...",
      "task_type": "...",
      "dependencies": [],
      "success_criteria": "..."
    }}
  ]
}}
"""
        raw = self.llm(prompt)
        data = json.loads(raw)
        return Plan(steps=[Step(**s) for s in data["steps"]])

    def replan(self, task: str, plan: Plan, failed_step: Step, reason: str) -> Plan:
        """When a step fails, replan with the failure reason in hand."""
        completed = [s for s in plan.steps if s.status == "done"]

        prompt = f"""
Task: {task}

Completed steps:
{json.dumps([{"id": s.id, "description": s.description, "output_summary": str(s.output)[:200]} for s in completed])}

Failed step:
- description: {failed_step.description}
- failure reason: {reason}

Building on the work already completed, replan the remaining steps. Avoid repeating the approach that just failed.

Return JSON (plan only the remaining steps, same schema as in the initial plan):
"""
        raw = self.llm(prompt)
        data = json.loads(raw)

        # keep the completed steps; replace the failed and pending ones
        new_steps = completed + [Step(**s) for s in data["steps"]]
        return Plan(steps=new_steps)
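
One hardening step worth calling out: json.loads(raw) breaks the moment the LLM wraps its answer in markdown fences. A tolerant parser (my addition, not part of the architecture) that both plan() and replan() could use instead:

import json
import re

def parse_json_loose(raw: str) -> dict:
    """Extract the first {...} block, tolerating markdown fences around it."""
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if match is None:
        raise ValueError(f"no JSON object found in LLM output: {raw[:100]}")
    return json.loads(match.group(0))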

Layer 2: Harness, Making Every Step Execute Reliably

The Planner knows what to do, but not how to do it reliably. That is the Harness's job.

The Harness is not a single function. It is a layered execution infrastructure:

┌─────────────────────────────────┐
│  5. Observability Layer         │  traces, logs, metrics
├─────────────────────────────────┤
│  4. Error & Recovery Layer      │  retry, fallback, replan triggering
├─────────────────────────────────┤
│  3. Context Management Layer    │  what context to inject, how much to compress
├─────────────────────────────────┤
│  2. Tool Routing Layer          │  which executor a step routes to
├─────────────────────────────────┤
│  1. State Management Layer      │  storing and passing step results
└─────────────────────────────────┘

StepResult: the contract between the Harness and the Planner

@dataclass
class StepResult:
    output: Any
    status: Literal["success", "failed"]
    skill_score: float = 0.0          # from the Skill Evaluator
    eval_reason: str = ""             # why this score
    eval_passed: bool = False         # did it clear the quality threshold
    retry_count: int = 0
    duration_ms: int = 0
    error: Optional[str] = None

StepRunner: the five layers made concrete

import time
from typing import Dict, Type

# Error types the retry layer distinguishes (assumed to come from your tool layer)
class ToolTimeoutError(Exception): ...
class OutputFormatError(Exception): ...
class RateLimitError(Exception): ...

class StepRunner:
    def __init__(
        self,
        tool_router: "ToolRouter",
        context_manager: "ContextManager",
        skill_evaluator: "SkillEvaluator",
        state_manager: "StateManager",
        tracer: "Tracer",
    ):
        self.tool_router = tool_router
        self.context_manager = context_manager
        self.skill_evaluator = skill_evaluator
        self.state_manager = state_manager
        self.tracer = tracer

        self.quality_threshold = 0.6    # eval scores below this trigger a replan
        self.max_retries = 3

    def run(self, step: Step, plan_context: dict) -> StepResult:
        start = time.time()

        # === Layer 3: Context Management ===
        enriched_context = self.context_manager.build(step, plan_context)

        # === Layer 5: Observability - begin ===
        trace_id = self.tracer.begin(step, enriched_context)

        try:
            # === Layer 4: Error & Recovery ===
            raw_result = self._execute_with_retry(step, enriched_context)

            # === Skill Evaluation (hooked in after execution, before returning) ===
            eval_result = self.skill_evaluator.evaluate(
                skill_type=step.skill_type,
                step=step,
                context=enriched_context,
                output=raw_result
            )

            result = StepResult(
                output=raw_result,
                status="success" if eval_result.score >= self.quality_threshold else "failed",
                skill_score=eval_result.score,
                eval_reason=eval_result.reason,
                eval_passed=eval_result.score >= self.quality_threshold,
                duration_ms=int((time.time() - start) * 1000)
            )

            # === Layer 1: State Management ===
            self.state_manager.save(step.id, result)

            # === Layer 5: Observability - end ===
            self.tracer.end(trace_id, result)

            return result

        except Exception as e:
            result = StepResult(
                output=None, status="failed",
                error=str(e),
                duration_ms=int((time.time() - start) * 1000)
            )
            self.tracer.end(trace_id, result)
            return result

    def _execute_with_retry(self, step: Step, context: dict, attempt: int = 0):
        """Layer 4: retry logic with error classification"""
        try:
            executor = self.tool_router.get_executor(step.skill_type)  # Layer 2
            return executor.run(step, context)

        except ToolTimeoutError:
            if attempt >= self.max_retries:
                raise
            # exponential backoff
            time.sleep(2 ** attempt)
            return self._execute_with_retry(step, context, attempt + 1)

        except OutputFormatError:
            # a format error won't be fixed by retrying; propagate it to the caller
            raise

        except RateLimitError:
            if attempt >= self.max_retries:
                raise
            time.sleep(10 * (attempt + 1))
            return self._execute_with_retry(step, context, attempt + 1)

ContextManager: controlling what each step gets to see

This layer is easy to overlook and very important:

class ContextManager:
    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens   # overall token budget; _compress enforces a per-output char cap

    def build(self, step: Step, plan_context: dict) -> dict:
        """
        不是把所有 plan_context 都塞给 executor
        只注入这个 step 真正需要的信息
        """
        # 只取 dependencies 指定的上游输出
        relevant_outputs = {
            dep_id: self._compress(plan_context.get(dep_id))
            for dep_id in step.dependencies
        }

        return {
            "task": step.description,
            "success_criteria": step.success_criteria,
            "upstream_outputs": relevant_outputs,
            "skill_type": step.skill_type,
        }

    def _compress(self, output: Any, max_chars: int = 2000) -> Any:
        """防止 context 爆炸"""
        if output is None:
            return None
        text = str(output)
        if len(text) > max_chars:
            return text[:max_chars] + f"\n... [truncated, {len(text)} chars total]"
        return output
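
A quick usage sketch with a hypothetical step and an oversized upstream output, showing the truncation at work:

cm = ContextManager()
step = Step(id="s2", description="summarize findings", skill_type="code_gen",
            task_type="summary", dependencies=["s1"], success_criteria="<= 200 words")
ctx = cm.build(step, plan_context={"s1": "x" * 5000})

# the 5000-char upstream output arrives cut to 2000 chars plus a truncation marker
print(ctx["upstream_outputs"]["s1"][-40:])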

ToolRouter: Layer 2 made concrete

class ToolRouter:
    def __init__(self):
        self._registry: Dict[str, Type["BaseExecutor"]] = {}

    def register(self, skill_type: str, executor_class: Type["BaseExecutor"]):
        self._registry[skill_type] = executor_class

    def get_executor(self, skill_type: str) -> "BaseExecutor":
        if skill_type not in self._registry:
            raise ValueError(f"No executor for skill_type: {skill_type}")
        return self._registry[skill_type]()

# the Executor interface
class BaseExecutor:
    def run(self, step: Step, context: dict) -> Any:
        raise NotImplementedError

class WebSearchExecutor(BaseExecutor):
    def run(self, step: Step, context: dict) -> Any:
        query = context["task"]
        return search_web(query)  # 具体工具调用

class CodeGenExecutor(BaseExecutor):
    def run(self, step: Step, context: dict) -> Any:
        return llm_generate_code(context)  # stand-in for an LLM code-generation call
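
Wiring it up is just registration; get_executor returns a fresh instance per step, so executors stay stateless:

router = ToolRouter()
router.register("web_search", WebSearchExecutor)
router.register("code_gen", CodeGenExecutor)

executor = router.get_executor("web_search")  # a new WebSearchExecutor each call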

Layer 3: Skill Evaluation, Closing the Quality Loop

status == "success" 不等于 output is useful。这是 Skill Evaluation 存在的根本原因。

The three forms an evaluator takes
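
All three forms below return an EvalResult, which the snippets reference without defining; a minimal sketch of it (the missing field matches what the LLM judge prompt asks for):

from dataclasses import dataclass, field
from typing import List

@dataclass
class EvalResult:
    score: float                                      # 0.0-1.0 quality score
    reason: str = ""                                  # one-line justification
    missing: List[str] = field(default_factory=list)  # key information the output lacks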

Form 1: rule-based (lightweight; fits outputs with a clear schema)

class RuleBasedEvaluator:
    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":

        if skill_type == "web_search":
            results = output.get("results", [])
            score = min(len(results) / 5, 1.0)  # base score: result count

            # check keyword coverage
            required_kw = self._extract_keywords(step.description)
            output_text = str(output).lower()
            coverage = sum(1 for kw in required_kw if kw in output_text) / max(len(required_kw), 1)
            score = score * 0.3 + coverage * 0.7

            return EvalResult(score=score, reason=f"result_count={len(results)}, kw_coverage={coverage:.2f}")

        if skill_type == "code_gen":
            code = output.get("code", "")
            try:
                compile(code, "<string>", "exec")
                score = 1.0
            except SyntaxError as e:
                return EvalResult(score=0.0, reason=f"syntax_error: {e}")

            # extra check: does the code have any substance
            if len(code.strip().split("\n")) < 3:
                score = 0.3
            return EvalResult(score=score, reason="syntax_ok")

        return EvalResult(score=0.5, reason="no_rule_for_skill_type")

    def _extract_keywords(self, description: str) -> List[str]:
        # naive stand-in; a real system might use TF-IDF or an LLM call
        return [w for w in description.lower().split() if len(w) > 3]

Form 2: LLM-as-judge (the mainstream option; fits semantic quality assessment)

class LLMEvaluator:
    def __init__(self, judge_llm):
        self.judge_llm = judge_llm

    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":
        prompt = f"""
You are a strict evaluator of agent steps.

Task description: {step.description}
Success criteria: {step.success_criteria}
Actual output: {str(output)[:1500]}

Assess whether this output satisfies the task's success criteria.

Scoring rubric:
- 1.0: fully meets the success criteria
- 0.7-0.9: mostly meets them, with minor flaws
- 0.4-0.6: partially meets them, with clear gaps
- 0.0-0.3: essentially fails the success criteria

Return JSON (JSON only, nothing else):
{{"score": 0.0-1.0, "reason": "one-sentence justification", "missing": ["key information that is absent"]}}
"""
        raw = self.judge_llm(prompt)
        data = json.loads(raw)
        return EvalResult(**data)
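
Judges misbehave: malformed JSON, scores outside [0, 1]. A defensive wrapper (my addition) that evaluate() could call instead of the bare json.loads:

def parse_judge_output(raw: str) -> EvalResult:
    """Clamp the score and fail closed if the judge's JSON doesn't parse."""
    try:
        data = json.loads(raw)
        data["score"] = max(0.0, min(1.0, float(data.get("score", 0.0))))
        return EvalResult(**data)
    except (json.JSONDecodeError, TypeError, ValueError, AttributeError) as e:
        return EvalResult(score=0.0, reason=f"judge_output_unparseable: {e}")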

Form 3: reference-based evaluator (for scientific computing scenarios)

class ReferenceEvaluator:
    """
    Use when ground truth is available.
    Fits scientific AI workloads: molecular property prediction, numerical
    verification, and the like.
    """
    def __init__(self, llm_evaluator: "LLMEvaluator"):
        self.llm_evaluator = llm_evaluator  # fallback judge when no reference exists

    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> "EvalResult":
        if not step.expected_output:
            # no reference available; degrade to the LLM judge
            return self.llm_evaluator.evaluate(skill_type, step, context, output)

        score = self._compute_similarity(output, step.expected_output)
        return EvalResult(
            score=score,
            reason=f"similarity_to_reference={score:.3f}"
        )

    def _compute_similarity(self, output, reference) -> float:
        # pick a similarity metric that matches the output type:
        #   numeric:    1 - abs(output - reference) / abs(reference)
        #   structured: embedding cosine similarity
        #   text:       BLEU / semantic similarity
        if isinstance(output, (int, float)) and isinstance(reference, (int, float)):
            if reference == 0:
                return 1.0 if output == 0 else 0.0
            return max(0.0, 1.0 - abs(output - reference) / abs(reference))
        raise NotImplementedError("non-numeric similarity depends on the domain")

SkillRegistry: making the system improve the longer it runs

Evaluator scores should not be used once and thrown away. They should accumulate into a historical performance record for each skill:

import sqlite3
import time
from statistics import mean
from typing import List

class SkillRegistry:
    """
    记录每个 skill_type 在每类 task_type 上的历史 eval score。
    Planner 在规划时可以查询,选择历史表现最好的 skill。
    """

    def __init__(self, db_path: str = "skill_registry.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS skill_scores (
                skill_type TEXT,
                task_type TEXT,
                score REAL,
                eval_reason TEXT,
                timestamp INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def record(self, skill_type: str, task_type: str, score: float, reason: str = ""):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "INSERT INTO skill_scores VALUES (?, ?, ?, ?, ?)",
            (skill_type, task_type, score, reason, int(time.time()))
        )
        conn.commit()
        conn.close()

    def best_skill_for(self, task_type: str, top_k: int = 3) -> List[dict]:
        """返回在这类任务上历史表现最好的 skill"""
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute("""
            SELECT skill_type, AVG(score) as avg_score, COUNT(*) as sample_count
            FROM skill_scores
            WHERE task_type = ?
            GROUP BY skill_type
            HAVING sample_count >= 3
            ORDER BY avg_score DESC
            LIMIT ?
        """, (task_type, top_k)).fetchall()
        conn.close()

        return [
            {"skill_type": r[0], "avg_score": r[1], "sample_count": r[2]}
            for r in rows
        ]

    def get_hints(self) -> str:
        """给 Planner LLM 的 skill 表现摘要"""
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute("""
            SELECT skill_type, task_type, AVG(score) as avg_score
            FROM skill_scores
            GROUP BY skill_type, task_type
            ORDER BY avg_score DESC
        """).fetchall()
        conn.close()

        if not rows:
            return "暂无历史数据"

        lines = []
        for skill_type, task_type, avg_score in rows:
            lines.append(f"- {skill_type} 用于 {task_type}:平均得分 {avg_score:.2f}")
        return "\n".join(lines)

Putting It Together: the Complete PlannerAgent

Now assemble the three layers:
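
StateManager and Tracer are used above but never defined; here are minimal in-memory stand-ins (assumptions of mine, exposing only the methods StepRunner calls) so the assembly actually runs:

from typing import Dict
from uuid import uuid4

class StateManager:
    """In-memory store for step results; swap for SQLite/Redis in production."""
    def __init__(self):
        self._store: Dict[str, StepResult] = {}

    def save(self, step_id: str, result: StepResult):
        self._store[step_id] = result

class Tracer:
    """Print-based tracer; swap for OpenTelemetry or your own trace system."""
    def begin(self, step: Step, context: dict) -> str:
        trace_id = str(uuid4())
        print(f"[trace {trace_id[:8]}] begin {step.id}")
        return trace_id

    def end(self, trace_id: str, result: StepResult):
        print(f"[trace {trace_id[:8]}] end status={result.status}")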

class PlannerAgent:
    def __init__(
        self,
        planner_llm,
        judge_llm,
        tool_router: ToolRouter,
    ):
        # wire up each layer's components
        self.skill_registry = SkillRegistry()
        self.planner = Planner(planner_llm, self.skill_registry)

        evaluator = LLMEvaluator(judge_llm)
        context_manager = ContextManager(max_tokens=4096)
        state_manager = StateManager()
        tracer = Tracer()

        self.harness = StepRunner(
            tool_router=tool_router,
            context_manager=context_manager,
            skill_evaluator=evaluator,
            state_manager=state_manager,
            tracer=tracer,
        )

        self.max_replan_attempts = 3

    def run(self, task: str) -> dict:
        plan = self.planner.plan(task)
        replan_count = 0

        print(f"[Planner] 生成计划,共 {len(plan.steps)} 个步骤")

        while not plan.is_done():
            ready_steps = plan.get_ready_steps()

            if not ready_steps:
                # pending steps remain but none are ready: the dependency graph is deadlocked
                break

            # ready steps with no mutual dependencies could run in parallel
            # simplified version: execute serially
            for step in ready_steps:
                print(f"[Harness] 执行 step: {step.id} ({step.skill_type})")

                result = self.harness.run(step, plan.get_context())

                if result.eval_passed:
                    # ✅ success: update the plan and record the score in the registry
                    plan.mark_done(step, result)
                    self.skill_registry.record(
                        skill_type=step.skill_type,
                        task_type=step.task_type,
                        score=result.skill_score,
                        reason=result.eval_reason
                    )
                    print(f"[Eval] step {step.id} passed, score={result.skill_score:.2f}")

                else:
                    # ❌ failure: replan with the eval reason in hand
                    # (fall back to the execution error when the evaluator never ran)
                    reason = result.eval_reason or result.error or "unknown failure"
                    print(f"[Eval] step {step.id} failed, reason={reason}")
                    self.skill_registry.record(
                        skill_type=step.skill_type,
                        task_type=step.task_type,
                        score=result.skill_score,
                        reason=reason
                    )

                    if replan_count >= self.max_replan_attempts:
                        print("[Planner] replan budget exhausted, stopping")
                        plan.mark_failed(step, reason)
                        continue

                    # pass the failure reason into replan so the LLM tries a different strategy
                    plan = self.planner.replan(task, plan, step, reason)
                    replan_count += 1
                    print(f"[Planner] replanning (attempt {replan_count})")
                    break  # re-enter the while loop with the new plan

        # summarize the results
        return self._summarize(plan)

    def _summarize(self, plan: Plan) -> dict:
        done_steps = [s for s in plan.steps if s.status == "done"]
        failed_steps = [s for s in plan.steps if s.status == "failed"]
        avg_score = mean([s.eval_score for s in done_steps]) if done_steps else 0.0

        return {
            "completed_steps": len(done_steps),
            "failed_steps": len(failed_steps),
            "average_skill_score": avg_score,
            "outputs": {s.id: s.output for s in done_steps}
        }
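
Invoking it end to end looks like this (my_planner_llm and my_judge_llm stand for any prompt-string-in, completion-string-out callables; router is the one registered earlier):

agent = PlannerAgent(
    planner_llm=my_planner_llm,   # callable: prompt str -> completion str
    judge_llm=my_judge_llm,
    tool_router=router,
)
summary = agent.run("survey recent work on agent harnesses and draft a summary")
print(summary["completed_steps"], summary["average_skill_score"])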

The Complete Data Flow

Task input
   ↓
Planner.plan(task)
   ├─ query SkillRegistry for historical-performance hints
   └─ LLM generates a Plan (a DAG of Steps)
        ↓
plan.get_ready_steps()
   └─ find executable steps from the dependency graph
        ↓
Harness.run(step, plan_context)
   ├─ ContextManager: inject compressed, relevant context
   ├─ ToolRouter: route to the right Executor
   ├─ Executor: perform the actual tool call (with retry)
   ├─ SkillEvaluator: score output quality
   ├─ StateManager: persist the result
   └─ Tracer: record the end-to-end trace
        ↓
StepResult(output, eval_score, eval_passed)
        ↓
   eval_passed?
   ├─ YES → plan.mark_done + SkillRegistry.record(+score)
   │         → back to get_ready_steps()
   └─ NO  → SkillRegistry.record(-score)
             → Planner.replan(task, plan, eval_reason)
                  ↑
       the LLM sees the failure reason and switches skill strategy

Mapping onto Real Engineering Work

This architecture isn't theoretical; it corresponds to concrete engineering modules:

Engineering work you may be doing      | The layer it corresponds to here
WebSocket relay / MCP bridge           | Harness → ToolRouter (Layer 2)
E2B sandbox gateway                    | Harness → ToolRouter + Error Layer (Layers 2, 4)
Pitfall Registry                       | Harness → StateManager, precursor of SkillRegistry (Layer 1)
Task trace system                      | Harness → Tracer (Layer 5)
Scene serialization/deserialization    | Harness → StateManager (Layer 1)

Pitfall Registry and SkillRegistry are one step apart: replace manually written experience entries with scores the evaluator writes automatically. Add an eval_score field to the schema and have the Harness's evaluator call record() after each execution, and the "experience base" becomes a skill performance database. The Planner can then make decisions from data instead of letting the LLM guess.
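
In SQLite terms the upgrade is a one-line migration plus the automatic record() call (table and column names here are purely illustrative):

import sqlite3

# hypothetical one-off migration on the existing registry database
conn = sqlite3.connect("pitfall_registry.db")
conn.execute("ALTER TABLE pitfalls ADD COLUMN eval_score REAL DEFAULT 0.0")
conn.commit()
conn.close()
# from then on, StepRunner's evaluator writes scores back via SkillRegistry.record()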


Some Engineering Trade-offs

Which evaluator should you pick?

  • Output has a clear schema (JSON, code, numbers) → rule-based; cheap and fast
  • Semantic understanding needed (report quality, reasoning correctness) → LLM-as-judge
  • Ground truth available (scientific computing, math verification) → reference evaluator
  • Production systems → combine all three with layered fallback (sketched below)
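
A sketch of that layered fallback, built from the three evaluators above (the routing conditions are illustrative, not prescriptive):

class CompositeEvaluator:
    """Rules when the schema is clear, reference when ground truth exists, judge otherwise."""
    def __init__(self, rule_eval: RuleBasedEvaluator, ref_eval: ReferenceEvaluator, llm_eval: LLMEvaluator):
        self.rule_eval = rule_eval
        self.ref_eval = ref_eval
        self.llm_eval = llm_eval

    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> EvalResult:
        if skill_type in ("web_search", "code_gen"):   # clear schema → cheap rules
            return self.rule_eval.evaluate(skill_type, step, context, output)
        if step.expected_output is not None:           # ground truth → reference
            return self.ref_eval.evaluate(skill_type, step, context, output)
        return self.llm_eval.evaluate(skill_type, step, context, output)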

The SkillRegistry cold-start problem

A fresh system has no history, so best_skill_for() returns nothing. Remedies:

  1. Seed it with human-labeled initial data (see the sketch after this list)
  2. Let the Planner LLM decide fully autonomously during the cold-start period
  3. Enforce a minimum sample threshold like HAVING sample_count >= 3, and offer no recommendation until there is enough data
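
Seeding (option 1) is just a handful of record() calls with hand-assigned scores; the tuples below are invented examples, and registry is the instance from the earlier demo:

seed_scores = [
    ("web_search", "retrieval", 0.80),
    ("code_gen", "data_analysis", 0.65),
    ("data_analysis", "data_analysis", 0.75),
]
for skill_type, task_type, score in seed_scores:
    registry.record(skill_type, task_type, score, reason="manual_seed")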

Termination conditions for replanning

Unbounded replanning burns unbounded money. max_replan_attempts is the hard cap, but a better policy is: if two consecutive replans used the same skill and both failed, terminate immediately and report "this task may be infeasible with available skills".
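
A minimal sketch of that guard, assuming the PlannerAgent keeps a small per-replan log alongside replan_count (the replan_log structure is my invention):

from typing import List, Tuple

def should_stop_replanning(replan_log: List[Tuple[str, bool]]) -> bool:
    """replan_log holds one (skill_type, succeeded) pair per replan attempt.

    Stop when the last two attempts used the same skill and both failed.
    """
    if len(replan_log) < 2:
        return False
    (skill_a, ok_a), (skill_b, ok_b) = replan_log[-2], replan_log[-1]
    return skill_a == skill_b and not ok_a and not ok_b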


Closing Thoughts

The Planner answers "what to do". The Harness answers "how to do each step reliably". Skill Evaluation answers "how well did it go, and how do we keep getting better".

The interfaces between the three layers are simple:

  • Planner → Harness: run(step, context) → StepResult
  • Harness → Evaluator: evaluate(skill_type, step, context, output) → EvalResult
  • Evaluator → SkillRegistry: record(skill_type, task_type, score)
  • SkillRegistry → Planner: get_hints() → str
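
One way to pin those contracts down (my suggestion, using Python's typing.Protocol for structural typing) so each layer can be mocked and swapped:

from typing import Protocol

class Evaluator(Protocol):
    def evaluate(self, skill_type: str, step: Step, context: dict, output: Any) -> EvalResult: ...

class Registry(Protocol):
    def record(self, skill_type: str, task_type: str, score: float, reason: str = "") -> None: ...
    def get_hints(self) -> str: ...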

Stable interfaces mean each layer can iterate independently. That is the precondition for an agent system you can actually maintain.