Dify Agent 源码分析 - Dify(1)
工程能力是体现在细节上的. 比如说,基本agent的工作,在调用工具的时候,是一个一个决定调用的。在处理这些的时候,有没有更好的方法?agent是一个充满想象力的领域。
核心思想:基于“思考 -> 行动 -> 观察”的循环
Dify Agent 的工作模式是由一个精心设计的、围绕大语言模型(LLM)的循环驱动。这个循环严格遵循 ReAct (Reasoning and Acting) 框架,其核心步骤如下:
-
思考 (Thought): LLM 分析当前的目标和所有已知信息(包括用户问题、历史对话、以及上一步工具调用的结果),生成下一步的行动计划。
-
行动 (Action): LLM 提出一个明确的、需要执行的动作。这通常是一个需要调用的工具及其所需参数,并以特定的格式(如 JSON)输出。
-
观察 (Observation): Python 代码执行这个动作(调用工具),并将返回的结果作为“观察”到的新信息。
-
循环 (Loop): 将这个“观察”结果反馈给 LLM,开始新一轮的“思考”,直到任务完成。
这个机制使得 Agent 能够处理复杂任务、使用外部工具并从执行结果中学习和调整。
三大核心组件与代码实现
整个 Agent 的架构由三个关键的 Python 文件协同工作来支撑这个循环。
1. 上下文准备器 (The Secretary)
-
文件:
api/core/agent/cot_chat_agent_runner.py -
职责: 它的核心任务是在每一轮循环开始时,为 LLM 精心准备好所有必需的“会议材料”(Prompt)。
-
核心方法:
_organize_prompt_messages() -
代码细节:
# api/core/agent/cot_chat_agent_runner.py
def _organize_prompt_messages(self) -> list[PromptMessage]:
# 1. 准备系统指令 (System Prompt),包含工具列表和行为准则
system_message = self._organize_system_prompt()
# 2. 准备 Agent 上一轮的“草稿纸” (Thought, Action, Observation)
agent_scratchpad = self._agent_scratchpad
# ... 格式化 scratchpad 的逻辑 ...
# 3. 准备用户的当前查询
query_messages = self._organize_user_query(self._query, [])
# 4. 准备历史对话记录
historic_messages = self._organize_historic_prompt_messages(...)
# 5. 将以上所有部分组合成一个完整的消息列表,发送给 LLM
messages = [system_message, *historic_messages, *query_messages, *assistant_messages, ...]
return messages
2. 意图解析器 (The Translator)
-
文件:
api/core/agent/output_parser/cot_output_parser.py -
职责: 它是 Agent 的“耳朵”,负责实时监听 LLM 的流式输出,并从中精确地识别出代表“行动”的信号,将其从非结构化的文本翻译成结构化的指令。
-
核心方法:
handle_react_stream_output() -
代码细节:
该方法通过一个状态机来工作。
- 定义信号: 它寻找的“暗号”是硬编码的字符串。
# api/core/agent/output_parser/cot_output_parser.py
action_str = "action:"
thought_str = "thought:"
- 捕捉信号: 通过逐字匹配来确认 LLM 的意图。
# api/core/agent/output_parser/cot_output_parser.py
# 伪代码
for char in llm_stream:
if current_sequence + char completes "action:":
# 信号确认!
# 进入“提取JSON”状态
else:
# 作为普通Thought文本处理
- 翻译指令: 提取出 JSON 后,调用
parse_action将其转换为Action对象。
# api/core/agent/output_parser/cot_output_parser.py
def parse_action(action):
# ...
return AgentScratchpadUnit.Action(
action_name=action_name,
action_input=action_input,
)
3. 执行引擎 (The Executor)
-
文件:
api/core/agent/cot_agent_runner.py -
职责: 这是整个 Agent 的心脏和项目经理。它负责驱动“思考->行动->观察”的循环,调用其他组件,并最终执行工具。
-
核心方法:
run() -
代码细节:
run()方法中的while循环是整个流程的核心。
# api/core/agent/cot_agent_runner.py
def run(...):
# ... 初始化 ...
while function_call_state and iteration_step <= max_iteration_steps:
# 1. 准备上下文 (调用 Secretary)
prompt_messages = self._organize_prompt_messages()
# 2. 调用 LLM
chunks = model_instance.invoke_llm(...)
# 3. 解析意图 (调用 Translator)
react_chunks = CotAgentOutputParser.handle_react_stream_output(...)
# 创建一个用于本次循环的、只能容纳单个Action的草稿纸
scratchpad = AgentScratchpadUnit(...)
for chunk in react_chunks:
if isinstance(chunk, AgentScratchpadUnit.Action):
# 将解析出的Action存入草稿纸(若有多个,则会覆盖)
scratchpad.action = chunk
# 4. 执行任务 (如果解析出了Action)
if scratchpad.action and scratchpad.action.action_name != "Final Answer":
tool_invoke_response, ... = self._handle_invoke_action(
action=scratchpad.action, # 传入单个Action
...
)
# 5. 观察:将工具结果存入草稿纸,用于下一轮循环
scratchpad.observation = tool_invoke_response
# ... 循环结束,返回最终答案 ...
两个关键问题的代码溯源
1. Agent 如何以及何时决定调用 Tool?
-
决策者: LLM。它在“思考”后,认为需要外部信息或能力来完成任务时,就会决定调用工具。
-
决策体现: LLM 会生成一段包含
Action:关键字和相应 JSON 载荷的文本。 -
决策识别点:
cot_output_parser.py中的handle_react_stream_output方法通过状态机逐字匹配Action:字符串,一旦匹配成功,就认为决策已被做出,并开始提取 JSON 内容。
2. Agent 是如何调用多个 Tool 的?(一次一个 vs 一次多个)
-
答案: 严格的串行模式,一次只调用一个工具。
-
代码证据:
- 覆盖机制: 在
cot_agent_runner.py的run()方法中,scratchpad.action是一个单一对象。即使 LLM 一次性输出了多个Action,循环for chunk in react_chunks:也只会将最后一个Action保存在scratchpad.action中,前面的都会被覆盖。
# api/core/agent/cot_agent_runner.py (Line ~146)
# 这个赋值操作会覆盖之前的值,确保了单一性
scratchpad.action = action
- 单一执行:
_handle_invoke_action方法的签名明确要求传入一个action对象,而非列表,从设计上就限制了并行执行。
- 工作流: Agent 调用第一个工具 -> 看到结果 (
Observation) -> 将结果作为新信息进行第二轮思考 -> 决定是否需要调用第二个工具。