创世网站建设,动画素材,建设多用户网站,wap免费建站程序简介
本文详细介绍了基于LangGraph 1.0实现的人机协同(HITL)系统#xff0c;重点解析了Interrupt机制的工作原理与实现方法。文章从系统架构设计到前后端代码实现#xff0c;全面展示了如何构建工业级人机交互系统#xff0c;以智能设备维护预测为应用场景#xff0c;实现…简介本文详细介绍了基于LangGraph 1.0实现的人机协同(HITL)系统重点解析了Interrupt机制的工作原理与实现方法。文章从系统架构设计到前后端代码实现全面展示了如何构建工业级人机交互系统以智能设备维护预测为应用场景实现设备监控、故障预测、维护规划及工程师审批的完整流程。项目包含完整源码适合大模型应用开发学习与实践。摘要人机协同Human-in-the-Loop, HITL是现代AI系统中的关键能力它允许在AI工作流的特定节点插入人工干预实现更精准、更可靠的业务流程。本文将深入分析一个基于LangGraph 1.0实现的完整HITL系统从架构设计到代码实现从原理剖析到最佳实践全方位讲解如何在实际项目中构建工业级的人机交互系统。目录HITL系统概述系统架构设计LangGraph Interrupt机制深度解析后端实现工作流与状态管理前端实现状态同步与交互界面关键代码深度剖析开发要点与最佳实践总结项目源代码下载一、HITL系统概述1.1 什么是HITLHuman-in-the-Loop人机协同是一种将人工智能与人类智慧相结合的系统设计模式。在HITL系统中AI系统可以在关键决策点暂停执行等待人类专家的输入、审批或修改然后将人类反馈整合到后续流程中。HITL的核心价值在于结合了AI的效率优势和人类的判断能力特别适用于高风险、高精度要求的业务场景如医疗诊断、金融风控、工业质检等领域。1.2 业务场景本项目以智能设备维护预测为应用场景展示了完整的HITL流程图1智能设备维护预测HITL流程在这个场景中系统会持续监控设备传感器数据通过AI分析预测潜在故障风险。当检测到高风险情况时系统会暂停并等待工程师审批而低风险情况则可以自动执行维护计划。这种设计既保证了高风险决策的可靠性又提高了整体流程的效率。1.3 系统界面展示系统首页待审批页面初始为空智能体流程中断提示人类审批待审批页面显示审批请求人类审批人类审批后智能体流程恢复执行完成针对所有自学遇到困难的同学们我帮大家系统梳理大模型学习脉络将这份LLM大模型资料分享出来包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 有需要的小伙伴可以扫描下方二维码领取↓↓↓二、系统架构设计2.1 整体架构系统采用分层架构设计各层职责明确通过标准化接口通信确保系统的可维护性和可扩展性。核心的HITL逻辑由LangGraph实现负责工作流的定义、执行、中断和恢复。图2系统整体架构图2.2 核心组件说明组件职责技术栈前端界面用户交互、状态展示、实时更新React 18 TypeScript Vite审批API服务器前后端通信桥梁Express.js Node.js工作流引擎业务逻辑执行、HITL控制LangGraph 1.0 FastAPI状态管理工作流状态持久化Checkpointer SQLite通信机制状态同步HTTP Polling架构设计考量系统采用前后端分离架构通过API进行通信使得前后端可以独立开发、测试和部署。核心的工作流逻辑与业务API分离提高了代码的复用性和可维护性。三、 LangGraph Interrupt机制深度解析3.1 什么是InterruptInterrupt是LangGraph 1.0引入的强大机制允许在节点执行过程中暂停工作流等待外部事件如人工审批后再恢复执行。Interrupt机制是实现HITL的核心技术它突破了传统工作流一跑到底的限制使得工作流可以在任意节点暂停和恢复为人类干预提供了可能。3.2 工作原理Interrupt机制的工作流程可以分为以下几个关键步骤工作流执行客户端触发工作流API调用LangGraph的invoke方法开始执行节点处理工作流按定义的节点顺序执行直到遇到中断节点触发中断中断节点调用interrupt()方法工作流暂停执行状态保存Checkpointer自动保存当前工作流状态到持久化存储等待审批前端收到中断状态展示审批界面等待人工输入提交决策人类专家完成审批前端提交审批结果恢复执行API调用带有resume参数的invoke方法工作流从中断处恢复处理结果节点继续执行处理审批结果完成后续流程图3Interrupt/Resume机制时序图深入理解为什么 LangGraph 能精准“冻结”许多开发者会好奇为什么 interrupt() 调用后整个 Python 函数能像按了暂停键一样并在几小时甚至几天后恢复状态快照State Snapshots每当工作流到达一个节点或触发 interrupt 时Checkpointer 会将当前的 State 字典序列化并存入数据库如 SQLite。这个快照包含了当前所有变量的副本。线程隔离与恢复通过 thread_idLangGraph 能在数据库中定位唯一的执行历史。当你调用 resume 时系统并不是重新运行整个函数而是**重放Replay**历史状态并将 resume 的输入直接注入到 interrupt() 的返回值中。不可变性原则这种设计要求 State 中的数据必须是可序列化的 JSON 对象这也就是为什么我们在代码中强调需要 clean_result_for_json 的原因。3.3 关键特性特性1状态自动持久化from langgraph.checkpoint.sqlite import SqliteSaverfrom langgraph.graph import StateGraph# 配置Checkpointercheckpointer SqliteSaver.from_conn_string(sqlite:///state.db)# 构建工作流时指定checkpointerbuilder StateGraph(state_schema)builder.compile( checkpointercheckpointer, interrupt_before[engineer_review], # 可选在指定节点前中断 interrupt_after[engineer_review] # 可选在指定节点后中断)技术解析Checkpointer是LangGraph提供的状态持久化机制支持多种存储后端如SQLite、PostgreSQL、Redis等。当工作流中断时Checkpointer会自动保存当前状态包括当前节点、状态数据、执行历史等关键信息为后续恢复提供完整的上下文。特性2线程级状态隔离# 每次调用都使用唯一线程IDthread_id fworkflow_{equipment_id}_{int(time.time())}# 工作流执行result graph.invoke( initial_state, config{ configurable: {thread_id: thread_id}, # 或者使用resume参数恢复 resume: human_decision })技术解析thread_id是工作流实例的唯一标识类似于会话ID。通过为每个工作流实例分配唯一的thread_idCheckpointer可以区分不同的工作流状态避免状态混淆。在分布式环境中thread_id还可以用于跨服务节点定位工作流实例。特性3可中断节点from langgraph.types import Commanddef engineer_review_node(self, state: Dict[str, Any]) - Command: 工程师审查节点 - 可中断 # ... 业务逻辑 ... # 触发中断等待人工审批 human_decision interrupt(review_request) # 工作流恢复时从此处继续 # human_decision包含审批结果 # 返回下一个节点 return Command( gotoexecution_feedback, resumehuman_decision )设计优势在节点内部触发中断的方式提供了最大的灵活性可以根据业务逻辑动态决定是否需要中断而不是静态配置在节点边界。这使得系统可以实现更复杂的HITL逻辑如基于风险评估结果动态决定是否需要人工干预。四、后端实现工作流与状态管理4.1 工作流定义文件backend/app/agents/graph_builder.pyfrom langgraph.graph import StateGraph, ENDfrom langgraph.types import Command, interruptfrom typing import Dict, Any, Literalclass MaintenanceGraph: 维护预测工作流图 def __init__(self): self.builder StateGraph(dict) self._setup_nodes() self._setup_edges() self.graph self.builder.compile( checkpointerSqliteSaver.from_conn_string(sqlite:///./state.db) ) def _setup_nodes(self): 注册所有节点 self.builder.add_node(equipment_monitor, self.equipment_monitor_node) self.builder.add_node(fault_prediction, self.fault_prediction_node) self.builder.add_node(maintenance_planning, self.maintenance_planning_node) self.builder.add_node(engineer_review, self.engineer_review_node) # 可中断节点 self.builder.add_node(execution_feedback, self.execution_feedback_node) self.builder.add_node(knowledge_update, self.knowledge_update_node) def engineer_review_node(self, state: Dict[str, Any]) - Command: 工程师审查节点 - HITL核心实现 这个节点是整个HITL系统的关键 1. 构建审查请求 2. 通知前端审批API 3. 触发interrupt暂停工作流 4. 等待人工审批 5. 恢复后处理审批结果 equipment_id state.get(equipment_id) maintenance_plan state.get(maintenance_plan) # 构建审查请求 review_request { type: maintenance_plan_review, equipment_id: equipment_id, plan: maintenance_plan, options: { accept: 批准此维护计划, edit: 修改计划内容, feedback: 提供改进建议 } } # 通知前端审批API可选 try: self._notify_frontend(review_request) except Exception as e: print(f⚠️ 通知前端失败: {e}) # ⭐ 核心触发中断等待人工审批 # 工作流将暂停在此处直到通过resume API恢复 human_decision interrupt(review_request) # ⭐ 工作流恢复后的处理逻辑 # human_decision包含审批结果 decision_type human_decision.get(type) if decision_type edit: # 处理编辑 edited_plan human_decision.get(edited_plan) state[maintenance_plan] edited_plan state[decision] edited elif decision_type feedback: # 处理反馈 feedback human_decision.get(feedback) state[feedback] feedback state[decision] feedback_provided else: # accept # 批准 state[decision] approved # 返回下一个节点 return Command( gotoexecution_feedback, resumehuman_decision ) def _notify_frontend(self, review_request: Dict[str, Any]): 通知前端审批系统 import requests response requests.post( http://localhost:3001/api/approval/request, json{ approvalId: fAPP_{review_request[equipment_id]}_{int(time.time())}, equipmentId: review_request[equipment_id], plan: review_request[plan], interruptData: review_request, notificationUrl: fhttp://localhost:5173/approval }, timeout5 ) print(f✅ 已通知前端审批服务器)工作流设计解析MaintenanceGraph类定义了一个包含6个节点的完整工作流其中engineer_review节点是实现HITL的核心。这个节点的实现包含以下关键步骤构建审查请求收集设备ID、维护计划等关键信息构建结构化的审查请求数据通知前端主动通知前端系统有新的审批请求可通过WebSocket或HTTP请求实现触发中断调用interrupt()方法暂停工作流传递审查请求数据等待并处理审批结果工作流恢复后根据审批类型批准/编辑/反馈处理结果返回下一个节点通过Command对象指定下一个要执行的节点节点间的数据传递通过state对象实现这是一个字典结构可以在工作流执行过程中持续累积和修改数据。这种设计使得节点间的通信简单直接同时保持了节点的独立性。架构视角Command 对象的妙用在早期的 LangGraph 版本中节点通常只返回更新后的状态。而在 1.0 版本中引入了 Command 对象控制权显式化通过 Command(goto“…”)节点不再仅仅是数据处理器它拥有了显式的路由控制权。Resume 语义Command 结构中的 resume 字段是专门为 HITL 设计的。它告诉工作流引擎“我已经拿到了人工干预的结果请将其作为上次中断的补偿输入并继续流转到下一个目标。”。解耦节点逻辑这种方式避免了在状态机中写死复杂的 if/else 跳转让每个节点的逻辑更加内聚。4.2 API端点实现文件backend/app/api/agent.pyfrom fastapi import APIRouter, HTTPExceptionfrom typing import Dict, Anyfrom datetime import datetimeimport jsonrouter APIRouter()# 存储工作流状态的内存字典# 生产环境应使用数据库workflow_states {}router.post(/agent/workflow/{thread_id}/resume)async def resume_workflow( thread_id: str, human_input: Dict[str, Any]): 恢复被中断的工作流 这是HITL系统的核心API 1. 接收审批决策 2. 通过Command(resume...)恢复工作流 3. 返回最终结果 try: print(f[RESUME] 收到恢复请求 - 线程ID: {thread_id}) print(f[RESUME] 人机输入: {human_input}) if thread_id not in workflow_states: raise HTTPException(status_code404, detail工作流不存在) state workflow_states[thread_id] print(f[RESUME] 当前状态: {state.get(status)}) if state.get(status) ! awaiting_human_input: raise HTTPException(status_code400, detail工作流未中断或已恢复) # 更新工作流状态 state[status] resuming state[human_input] human_input state[resumed_at] datetime.utcnow().isoformat() # ⭐ 关键使用Command(resume...)恢复工作流 # resume_value将作为interrupt()的返回值 maintenance_graph get_maintenance_graph() # 获取当前状态 current_state state.get(initial_state, {}) # 恢复工作流 # 第一个参数是当前状态 # config中的resume参数是恢复时的输入 result await maintenance_graph.graph.ainvoke( current_state, config{ configurable: {thread_id: thread_id}, resume: human_input # ⭐ 关键传递给interrupt() } ) # 更新工作流状态为完成 workflow_states[thread_id][status] completed workflow_states[thread_id][completed_at] datetime.utcnow().isoformat() # 清理结果以便JSON序列化 cleaned_result clean_result_for_json(result) return { success: True, message: 工作流已恢复并完成, data: { thread_id: thread_id, status: completed, workflow_result: cleaned_result } } except HTTPException: raise except Exception as e: error_msg f恢复工作流失败: {str(e)} print(f❌ {error_msg}) raise HTTPException(status_code500, detailerror_msg)def clean_result_for_json(result): 清理结果中的不可序列化对象 这是生产环境必须的处理 LangGraph返回的结果可能包含复杂对象如Interrupt 需要转换为JSON可序列化的格式 if result is None: return {} if not isinstance(result, dict): return result cleaned {} for key, value in result.items(): try: if key __interrupt__: # 特殊处理Interrupt对象 cleaned[key] [{type: interrupt, handled: True}] elif hasattr(value, __dict__) or Interrupt in str(type(value)): # 特殊处理所有对象类型 cleaned[key] str(value) else: # 尝试JSON序列化以验证可序列化 json.dumps(value) cleaned[key] value except (TypeError, ValueError) as e: print(f⚠️ 键 {key} 不可序列化转换为字符串: {e}) cleaned[key] str(value) return cleanedrouter.get(/agent/workflow/status_by_thread/{thread_id})async def get_workflow_status_by_thread(thread_id: str): 查询工作流状态 前端轮询此API来检测工作流是否完成 try: if thread_id not in workflow_states: return { success: True, data: { thread_id: thread_id, status: not_found, message: 工作流不存在 } } state workflow_states[thread_id] status { thread_id: thread_id, equipment_id: state.get(equipment_id), current_step: state.get(current_step), status: state.get(status), # 关键completed/awaiting_human_input created_at: state.get(created_at), awaiting_human_input: state.get(status) awaiting_human_input, completed_at: state.get(completed_at), resumed_at: state.get(resumed_at) } return { success: True, data: status } except Exception as e: raise HTTPException(status_code500, detailf获取工作流状态失败: {str(e)})API设计解析后端API层提供了两个核心接口用于实现HITL功能1. 工作流恢复API (/agent/workflow/{thread_id}/resume)该API负责接收人工审批决策并恢复被中断的工作流。其核心逻辑包括验证工作流状态确保工作流存在且处于awaiting_human_input状态更新工作流状态为resuming调用LangGraph的ainvoke方法通过config中的resume参数传递审批决策工作流完成后更新状态为completed清理结果数据确保可以JSON序列化返回执行结果核心技术点通过在config中设置resume参数LangGraph会自动将该值作为interrupt()方法的返回值从而实现工作流从中断处恢复执行。这种设计使得工作流恢复逻辑与正常执行逻辑无缝集成简化了系统设计。 :::2. 状态查询API (/agent/workflow/status_by_thread/{thread_id})该API提供工作流状态查询功能支持前端轮询获取最新状态。返回的状态信息包括工作流ID (thread_id)设备ID (equipment_id)当前步骤 (current_step)状态 (status) - 如completed、awaiting_human_input等时间戳信息 - 创建时间、恢复时间、完成时间是否等待人工输入 (awaiting_human_input)生产环境考虑示例代码中使用内存字典(workflow_states)存储工作流状态这仅适用于开发环境。在生产环境中应使用分布式缓存如Redis或数据库来存储状态确保系统的可扩展性和可靠性。针对所有自学遇到困难的同学们我帮大家系统梳理大模型学习脉络将这份LLM大模型资料分享出来包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 有需要的小伙伴可以扫描下方二维码领取↓↓↓五、前端实现状态同步与交互界面5.1 状态管理文件frontend/smart-maintenance-frontend/src/pages/AgentInterface.tsximport React, { useState, useEffect, useRef } from react;interface AgentMessage { role: user | assistant | system; content: string; timestamp: string;}const AgentInterface: React.FC () { // 核心状态 const [messages, setMessages] useStateAgentMessage[]([]); const [currentStep, setCurrentStep] useState(); const [agentStatus, setAgentStatus] useStateidle | processing | completed(idle); const [currentThreadId, setCurrentThreadId] useStatestring | null(null); // ⭐ 状态持久化自动保存到sessionStorage const saveWorkflowState (newMessages?: AgentMessage[], newCurrentStep?: string) { if (newMessages) { sessionStorage.setItem(workflowMessages, JSON.stringify(newMessages)); } if (newCurrentStep ! undefined) { sessionStorage.setItem(workflowCurrentStep, newCurrentStep); } }; // ⭐ 状态恢复从sessionStorage读取 const restoreWorkflowState () { const savedMessages sessionStorage.getItem(workflowMessages); const savedCurrentStep sessionStorage.getItem(workflowCurrentStep); const savedThreadId sessionStorage.getItem(currentThreadId); const savedAgentStatus sessionStorage.getItem(agentStatus); let restored false; if (savedMessages) { try { const messages JSON.parse(savedMessages); setMessages(messages); restored true; } catch (e) { console.error(解析保存的消息失败:, e); } } if (savedCurrentStep) { setCurrentStep(savedCurrentStep); restored true; } if (savedThreadId savedAgentStatus) { setCurrentThreadId(savedThreadId); setAgentStatus(savedAgentStatus as any); restored true; } return restored; }; // 页面加载时恢复状态 useEffect(() { const hasSavedState restoreWorkflowState(); if (!hasSavedState) { initializeAgent(); } }, []); // 状态变化时自动保存 useEffect(() { if (messages.length 0) { saveWorkflowState(messages); } }, [messages]);前端状态管理解析前端状态管理是HITL系统用户体验的关键直接影响用户对工作流状态的感知和操作。本实现通过以下机制确保状态的可靠性和一致性1. 核心状态定义定义了四个核心状态变量messages- 存储工作流交互消息列表currentStep- 当前工作流步骤agentStatus- 工作流状态idle/processing/completedcurrentThreadId- 当前工作流ID2. 状态持久化与恢复通过sessionStorage实现状态的持久化存储和恢复saveWorkflowState- 将消息列表和当前步骤保存到sessionStoragerestoreWorkflowState- 从sessionStorage读取并恢复状态用户体验优化状态持久化确保用户在页面刷新或意外关闭后仍能恢复到之前的工作状态这对于需要长时间审批或处理的工作流尤为重要。实现时需要注意处理JSON解析错误确保系统的健壮性。3. 自动保存机制通过React的useEffect钩子实现状态变化时的自动保存// 状态变化时自动保存useEffect(() { if (messages.length 0) { saveWorkflowState(messages); }}, [messages]);这种设计确保状态变化时能够及时保存避免数据丢失。5.2 轮询机制// ⭐ 核心轮询检测工作流状态const pollWorkflowStatus async (threadId: string) { console.log( [FRONTEND] 开始轮询工作流状态 - 线程ID: ${threadId}); // 每秒检查一次 const checkInterval setInterval(async () { try { const statusResponse await agentAPI.getWorkflowStatusByThread(threadId); const status statusResponse.data; console.log( [FRONTEND] 工作流状态响应:, status); if (status?.status completed) { // ⭐ 检测到工作流完成 console.log(✅ [FRONTEND] 检测到工作流已完成); clearInterval(checkInterval); setAgentStatus(completed); // 添加完成消息 const completedMessage: AgentMessage { role: assistant, content: ✅ **工作流已完成**\n\n 工作流已恢复并执行完毕。\n\n所有步骤\n• 设备监控\n• 故障预测\n• 维护规划\n• 工程师审查\n• 执行反馈\n• 知识更新\n\n 维护预测流程完成, timestamp: new Date().toISOString() }; // 更新消息列表 setMessages(prev { const newMessages [...prev, completedMessage]; // ⭐ 立即保存状态 saveWorkflowState(newMessages); return newMessages; }); toast.success(工作流已完成); setCurrentStep(); } else if (status?.status awaiting_human_input) { // 等待审批中 setCurrentStep(${status?.current_step} - 等待工程师审批...); } else if (status?.status running || status?.status resuming) { // 执行中 setCurrentStep(status?.current_step || ); } } catch (error) { console.error(❌ 轮询工作流状态出错:, error); // 继续轮询不中断 } }, 1000); // 60秒后超时 setTimeout(() { console.log(⏰ 轮询超时停止检查工作流状态); clearInterval(checkInterval); }, 60000);};轮询机制解析轮询机制是前端获取工作流状态更新的关键其核心逻辑包括使用setInterval设置每秒执行一次状态查询调用状态查询API获取最新状态根据返回状态执行不同逻辑completed- 工作流完成更新UI显示完成消息清除轮询定时器awaiting_human_input- 等待人工输入更新当前步骤提示running/resuming- 工作流执行中更新当前步骤设置60秒超时机制避免无限轮询错误处理 - 记录错误但不中断轮询技术选型考量示例中使用轮询机制实现状态同步这是一种简单可靠的方案适用于中小规模应用。对于大规模应用或对实时性要求更高的场景可以考虑使用WebSocket或Server-Sent Events (SSE)替代轮询减少服务器负载并提高实时性。六、关键代码深度剖析6.1 Interrupt机制的核心代码分析关键点1中断触发def engineer_review_node(self, state: Dict[str, Any]) - Command: 理解interrupt()的行为 1. 当执行到interrupt()时工作流立即暂停 2. interrupt()的参数会被保存并在恢复时返回 3. 函数在此处冻结直到工作流被恢复 4. 恢复时代码从此处继续执行 # 构建要传递给审批系统的数据 review_request { type: maintenance_plan_review, equipment_id: state.get(equipment_id), plan: state.get(maintenance_plan), risk_level: state.get(risk_level) } # ⭐ 触发中断 - 这是关键 # 工作流将暂停在此处等待外部恢复 human_decision interrupt(review_request) # ⭐ 工作流恢复后从此处继续执行 # human_decision包含审批结果 print(f收到审批决策: {human_decision}) # 处理审批结果 if human_decision.get(type) accept: state[approved] True elif human_decision.get(type) edit: state[approved] True state[modified_plan] human_decision.get(edited_plan) elif human_decision.get(type) feedback: state[feedback] human_decision.get(feedback) # 返回下一个节点 return Command( gotoexecution_feedback, # 下一个要执行的节点 resumehuman_decision # 可选传递数据给下一个节点 )深度解析interrupt()方法是实现HITL的核心。当调用此方法时LangGraph会执行以下操作保存当前节点的执行状态包括函数调用栈和局部变量将interrupt()的参数保存到持久化存储通过Checkpointer返回一个特殊的中断标记通知工作流引擎暂停执行工作流引擎返回当前状态给API调用者当工作流恢复时LangGraph会从持久化存储中加载保存的状态恢复函数执行环境将resume参数作为interrupt()方法的返回值从interrupt()调用处继续执行代码关键点2工作流恢复async def resume_workflow(thread_id: str, human_input: Dict[str, Any]): 恢复工作流的核心逻辑 1. 接收人工审批决策 2. 使用相同的thread_id 3. 通过config中的resume参数传递决策数据 4. LangGraph会自动从interrupt()处恢复执行 # 获取当前工作流状态 current_state get_current_state(thread_id) # ⭐ 关键使用resume参数恢复工作流 # 第一个参数是当前状态 # config中的resume是恢复时的输入 result await graph.ainvoke( current_state, # 当前状态 config{ configurable: {thread_id: thread_id}, resume: human_input # ⭐ 传递给interrupt()的返回值 } ) # 返回结果 return result技术解析工作流恢复的关键在于使用与中断时相同的thread_id并在config中提供resume参数。LangGraph通过以下机制实现恢复通过thread_id定位到被中断的工作流实例从Checkpointer加载保存的工作流状态将resume参数作为interrupt()的返回值恢复工作流执行从interrupt()调用处继续执行执行完成后返回最终结果这种设计使得工作流恢复逻辑与正常执行逻辑无缝集成开发者无需编写额外的恢复处理代码。关键点3前端轮询检测const pollWorkflowStatus async (threadId: string) { const checkInterval setInterval(async () { // 查询工作流状态 const status await agentAPI.getWorkflowStatusByThread(threadId); if (status.data.status completed) { // ⭐ 检测到完成添加完成消息 const completedMessage { role: assistant, content: ✅ 工作流已完成, timestamp: new Date().toISOString() }; setMessages(prev [...prev, completedMessage]); clearInterval(checkInterval); // 停止轮询 } }, 1000); // 每秒检查一次};实时性与性能平衡轮询间隔的选择需要在实时性和服务器负载之间取得平衡。示例中使用1秒间隔这对于大多数企业应用已经足够。对于实时性要求更高的场景可以考虑实现自适应轮询 - 根据工作流状态动态调整轮询间隔使用WebSocket实现真正的实时通信采用长轮询Long Polling技术减少无效请求无论采用何种技术都需要实现超时机制避免无限轮询导致的资源浪费。6.2 状态持久化机制// 自动保存状态useEffect(() { // 消息变化时保存 if (messages.length 0) { sessionStorage.setItem(workflowMessages, JSON.stringify(messages)); }}, [messages]);useEffect(() { // 状态变化时保存 if (currentStep) { sessionStorage.setItem(workflowCurrentStep, currentStep); }}, [currentStep]);// 页面加载时恢复状态const restoreWorkflowState () { const savedMessages sessionStorage.getItem(workflowMessages); const savedCurrentStep sessionStorage.getItem(workflowCurrentStep); if (savedMessages) { const messages JSON.parse(savedMessages); setMessages(messages); // 恢复消息列表 } if (savedCurrentStep) { setCurrentStep(savedCurrentStep); // 恢复当前步骤 }};状态管理策略前端状态持久化是提升用户体验的关键功能实现时需要考虑以下几点存储选择sessionStorage适用于临时会话状态localStorage适用于长期保存的数据数据结构确保存储的数据可以被JSON序列化和解析性能优化避免过于频繁的存储操作可以使用防抖debounce技术优化错误处理处理JSON解析错误和存储容量限制等异常情况安全考虑避免在客户端存储敏感信息七、开发要点与最佳实践7.1 Interrupt开发要点要点1正确配置Checkpointer# ✅ 正确必须配置Checkpointerbuilder.compile( checkpointerSqliteSaver.from_conn_string(sqlite:///./state.db))# ❌ 错误没有配置Checkpointerbuilder.compile() # 无法使用interrupt!重要提示Checkpointer是使用Interrupt机制的前提条件。没有配置Checkpointer的工作流无法实现状态持久化因此也无法支持中断和恢复功能。在开发环境中可以使用SQLiteSaver而在生产环境中应考虑使用更强大的数据库如PostgreSQL或分布式存储如Redis。️工业级补充审批环节的安全防范虽然示例代码为了简洁未加入权限校验但在生产环境中HITL 必须考虑身份校验在 resume_workflow API 中必须验证提交决策的用户是否有权限操作该设备。输入验证人类是不可靠的。对 editedPlan 的回传必须进行严格的 Schema 校验防止由于错误的 JSON 格式导致工作流在后续节点崩溃。审计日志每一次 interrupt 到 resume 的过程都应记录谁在什么时间、基于什么数据、做了什么决定。要点2使用唯一的线程ID# ✅ 正确每次调用使用唯一线程IDthread_id fworkflow_{equipment_id}_{int(time.time())}result graph.invoke(initial_state, config{configurable: {thread_id: thread_id}})# ❌ 错误使用相同线程ID会导致状态冲突result graph.invoke(initial_state) # 使用默认线程ID最佳实践thread_id的设计应确保唯一性避免不同工作流实例之间的状态冲突。推荐的thread_id生成策略包括结合业务ID如设备ID、用户ID和时间戳使用UUID/GUID结合分布式ID生成器如Snowflake算法同时thread_id应包含足够的业务信息便于问题排查和监控。要点3合理使用interrupt_before/after# 在指定节点前中断builder.compile( checkpointercheckpointer, interrupt_before[engineer_review] # 在engineer_review节点前中断)# 在指定节点后中断builder.compile( checkpointercheckpointer, interrupt_after[engineer_review] # 在engineer_review节点后中断)# 在节点内手动控制中断推荐def engineer_review_node(self, state): # 业务逻辑... human_decision interrupt(data) # 手动控制中断时机 # 恢复后逻辑... return Command(gotonext)技术选型建议静态配置interrupt_before/interrupt_after适用于固定需要人工干预的节点配置简单动态中断节点内调用interrupt()适用于需要根据业务逻辑动态决定是否中断的场景灵活性更高实际项目中推荐使用动态中断方式因为它允许根据实时业务数据如风险评估结果决定是否需要人工干预更符合HITL的核心价值。7.2 错误处理最佳实践实践1JSON序列化保护def clean_result_for_json(result): 清理不可序列化对象 cleaned {} for key, value in result.items(): try: # 尝试JSON序列化 json.dumps(value) cleaned[key] value except (TypeError, ValueError): # 转换不可序列化的对象 cleaned[key] str(value) return cleaned最佳实践LangGraph工作流返回的结果可能包含复杂对象这些对象无法直接序列化为JSON。在API返回结果前进行清理处理可以避免序列化错误。关键措施包括识别并特殊处理不可序列化的对象类型提供友好的错误提示和日志记录将复杂对象转换为字符串或简单字典表示在开发阶段进行充分的序列化测试实践2超时处理// 前端轮询超时setTimeout(() { clearInterval(checkInterval); console.log(⏰ 轮询超时);}, 60000); // 60秒超时超时处理是确保系统稳定性的重要措施应在以下场景实现API调用设置合理的请求超时时间轮询机制设置最大轮询时长避免无限轮询工作流执行设置工作流整体执行超时时间人工审批设置审批超时自动处理机制超时时间的设置应根据业务场景调整太短可能导致正常流程被中断太长则可能影响用户体验或导致资源泄漏。实践3异常捕获async def resume_workflow(thread_id: str, human_input: Dict[str, Any]): try: # 恢复工作流 result await graph.ainvoke(...) return {success: True, data: result} except Exception as e: # 记录错误 print(f❌ 恢复工作流失败: {e}) # 返回友好错误 raise HTTPException(status_code500, detailstr(e))异常处理最佳实践分层处理在API层统一处理异常避免将内部错误直接暴露给客户端详细日志记录异常堆栈信息便于问题排查指标监控记录异常指标支持监控和告警用户友好返回清晰的错误信息指导用户如何处理区分异常类型对已知异常和未知异常分别处理7.3 性能优化优化1减少轮询频率// ✅ 合理每秒轮询一次setInterval(async () { const status await checkStatus();}, 1000);// ❌ 过于频繁每100ms轮询setInterval(async () { const status await checkStatus();}, 100); // 可能导致服务器压力过大轮询频率优化建议默认轮询间隔1-5秒根据业务实时性要求调整自适应轮询根据工作流状态动态调整间隔如等待审批状态可增大间隔批量请求一次请求获取多个工作流状态减少请求次数长轮询服务器在有数据更新时才返回响应减少无效请求轮询优化可以显著减少服务器负载和网络流量特别是在系统规模扩大时效果更为明显。优化2使用WebSocket替代轮询可选// WebSocket实现更高效const ws new WebSocket(ws://localhost:8000/ws);ws.onmessage (event) { const status JSON.parse(event.data); if (status.type workflow_completed) { // 处理完成 }};WebSocket相比轮询的优势实时性更高服务器可以主动推送状态更新减少网络流量无需频繁发送重复的请求头降低服务器负载减少无效请求处理更好的用户体验状态更新无延迟适用场景用户数量适中、实时性要求高的应用。对于大规模应用需要考虑WebSocket服务器的扩展性和负载均衡问题。优化3状态批量更新// ✅ 批量更新减少重渲染setMessages(prev { const newMessages [...prev, msg1, msg2, msg3]; saveWorkflowState(newMessages); // 一次性保存 return newMessages;});前端性能优化建议批量更新合并多个状态更新操作减少React重渲染防抖处理对频繁触发的操作如输入、滚动使用防抖优化懒加载对大型组件或数据采用懒加载策略不可变数据使用不可变数据结构优化React渲染性能虚拟滚动对长列表使用虚拟滚动只渲染可见区域八、总结8.1 项目总结本项目成功实现了一个完整的HITL人机协同系统展示了以下核心技术LangGraph Interrupt机制标准的中断/恢复流程状态自动持久化线程级状态隔离前后端解耦架构HTTP API通信实时状态轮询状态持久化工业级质量完整的错误处理类型安全设计性能优化下一步行动如果你想将这套系统应用到自己的项目中建议从以下三个步骤开始定义你的中断点梳理业务流程找出哪些地方 AI 容易出错或法律合规性要求必须人工参与。实现持久化层配置好 SQLite 或 PostgreSQL 的 Checkpointer。构建 UI 闭环不仅要能显示状态还要能让用户直观地看到“AI 为什么要请你帮忙”例如显示 AI 的置信度分数。8.2 应用场景HITL系统适用于多种场景金融风控- 大额交易需要人工审批医疗诊断- AI辅助诊断需要医生确认内容审核- 用户生成内容需要人工审核工业质检- 异常检测需要工程师确认法律文档- AI生成的法律文档需要律师审查九、如何学习AI大模型大模型时代火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业”“谁的饭碗又将不保了”等问题热议不断。不如成为「掌握AI工具的技术人」毕竟AI时代谁先尝试谁就能占得先机想正式转到一些新兴的 AI 行业不仅需要系统的学习AI大模型。同时也要跟已有的技能结合辅助编程提效或上手实操应用增加自己的职场竞争力。但是LLM相关的内容很多现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学学习成本和门槛很高那么针对所有自学遇到困难的同学们我帮大家系统梳理大模型学习脉络将这份LLM大模型资料分享出来包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 有需要的小伙伴可以扫描下方二维码领取↓↓↓学习路线第一阶段 从大模型系统设计入手讲解大模型的主要方法第二阶段 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用第三阶段 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统第四阶段 大模型知识库应用开发以LangChain框架为例构建物流行业咨询智能问答系统第五阶段 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型第六阶段 以SD多模态大模型为主搭建了文生图小程序案例第七阶段 以大模型平台应用与开发为主通过星火大模型文心大模型等成熟大模型构建大模型行业应用。学会后的收获• 基于大模型全栈工程实现前端、后端、产品经理、设计、数据分析等通过这门课可获得不同能力• 能够利用大模型解决相关实际项目需求 大数据时代越来越多的企业和机构需要处理海量数据利用大模型技术可以更好地处理这些数据提高数据分析和决策的准确性。因此掌握大模型应用开发技能可以让程序员更好地应对实际项目需求• 基于大模型和企业数据AI应用开发实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能 学会Fine-tuning垂直训练大模型数据准备、数据蒸馏、大模型部署一站式掌握• 能够完成时下热门大模型垂直领域模型训练能力提高程序员的编码能力 大模型应用开发需要掌握机器学习算法、深度学习框架等技术这些技术的掌握可以提高程序员的编码能力和分析能力让程序员更加熟练地编写高质量的代码。1.AI大模型学习路线图2.100套AI大模型商业化落地方案3.100集大模型视频教程4.200本大模型PDF书籍5.LLM面试题合集6.AI产品经理资源合集获取方式有需要的小伙伴可以保存图片到wx扫描二v码免费领取【保证100%免费】