feat: 增加工作流合法性校验功能

This commit is contained in:
2026-03-04 19:56:42 +08:00
parent a79718b03b
commit ae9bb2c53f
12 changed files with 1755 additions and 38 deletions

View File

@@ -0,0 +1,70 @@
package tech.easyflow.ai.easyagentsflow.entity;
public class WorkflowCheckIssue {
private String code;
private String level;
private String message;
private String nodeId;
private String edgeId;
private String nodeName;
public WorkflowCheckIssue() {
}
public WorkflowCheckIssue(String code, String level, String message, String nodeId, String edgeId, String nodeName) {
this.code = code;
this.level = level;
this.message = message;
this.nodeId = nodeId;
this.edgeId = edgeId;
this.nodeName = nodeName;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getEdgeId() {
return edgeId;
}
public void setEdgeId(String edgeId) {
this.edgeId = edgeId;
}
public String getNodeName() {
return nodeName;
}
public void setNodeName(String nodeName) {
this.nodeName = nodeName;
}
}

View File

@@ -0,0 +1,46 @@
package tech.easyflow.ai.easyagentsflow.entity;
import java.util.ArrayList;
import java.util.List;
public class WorkflowCheckResult {
private boolean passed;
private WorkflowCheckStage stage;
private int issueCount;
private List<WorkflowCheckIssue> issues = new ArrayList<>();
public WorkflowCheckResult() {
}
public boolean isPassed() {
return passed;
}
public void setPassed(boolean passed) {
this.passed = passed;
}
public WorkflowCheckStage getStage() {
return stage;
}
public void setStage(WorkflowCheckStage stage) {
this.stage = stage;
}
public int getIssueCount() {
return issueCount;
}
public void setIssueCount(int issueCount) {
this.issueCount = issueCount;
}
public List<WorkflowCheckIssue> getIssues() {
return issues;
}
public void setIssues(List<WorkflowCheckIssue> issues) {
this.issues = issues == null ? new ArrayList<>() : issues;
}
}

View File

@@ -0,0 +1,21 @@
package tech.easyflow.ai.easyagentsflow.entity;
import tech.easyflow.common.web.exceptions.BusinessException;
import java.util.Locale;
public enum WorkflowCheckStage {
SAVE,
PRE_EXECUTE;
public static WorkflowCheckStage from(String value) {
if (value == null || value.trim().isEmpty()) {
throw new BusinessException("校验阶段不能为空");
}
try {
return WorkflowCheckStage.valueOf(value.trim().toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new BusinessException("不支持的校验阶段: " + value);
}
}
}

View File

@@ -0,0 +1,692 @@
package tech.easyflow.ai.easyagentsflow.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.easyagents.flow.core.parser.ChainParser;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckIssue;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckResult;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.service.WorkflowService;
import tech.easyflow.common.web.exceptions.BusinessException;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class WorkflowCheckService {
private static final String LEVEL_ERROR = "ERROR";
private static final String TYPE_START = "startNode";
private static final String TYPE_END = "endNode";
private static final String TYPE_LOOP = "loopNode";
private static final String TYPE_WORKFLOW = "workflow-node";
@Resource
private WorkflowService workflowService;
@Resource
private ChainParser chainParser;
public WorkflowCheckResult checkWorkflow(BigInteger workflowId, WorkflowCheckStage stage) {
if (workflowId == null) {
throw new BusinessException("工作流ID不能为空");
}
Workflow workflow = workflowService.getById(workflowId);
if (workflow == null) {
throw new BusinessException("工作流不存在: " + workflowId);
}
return checkContent(workflow.getContent(), stage, workflowId);
}
public WorkflowCheckResult checkContent(String content, WorkflowCheckStage stage, BigInteger currentWorkflowId) {
if (stage == null) {
throw new BusinessException("校验阶段不能为空");
}
List<WorkflowCheckIssue> issues = new ArrayList<>();
Set<String> issueKeys = new LinkedHashSet<>();
ParsedWorkflow parsedWorkflow = parseAndCheckBase(content, issues, issueKeys);
if (stage == WorkflowCheckStage.PRE_EXECUTE && parsedWorkflow != null) {
runStrictChecks(content, parsedWorkflow, currentWorkflowId, issues, issueKeys);
}
return buildResult(stage, issues);
}
public void checkOrThrow(String content, WorkflowCheckStage stage, BigInteger currentWorkflowId) {
WorkflowCheckResult result = checkContent(content, stage, currentWorkflowId);
throwIfFailed(result);
}
public void checkOrThrow(BigInteger workflowId, WorkflowCheckStage stage) {
WorkflowCheckResult result = checkWorkflow(workflowId, stage);
throwIfFailed(result);
}
private ParsedWorkflow parseAndCheckBase(String content, List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
if (!StringUtils.hasText(content)) {
addIssue(issues, issueKeys, "INVALID_JSON", "工作流内容不能为空", null, null, null);
return null;
}
Object parsed;
try {
parsed = JSON.parse(content);
} catch (Exception e) {
addIssue(issues, issueKeys, "INVALID_JSON", "工作流内容不是合法JSON: " + shortError(e), null, null, null);
return null;
}
if (!(parsed instanceof JSONObject)) {
addIssue(issues, issueKeys, "INVALID_JSON_OBJECT", "工作流内容必须是JSON对象", null, null, null);
return null;
}
JSONObject root = (JSONObject) parsed;
JSONArray nodesArray = getArrayField(root, "nodes", "NODES_NOT_ARRAY", "nodes 必须是数组", issues, issueKeys);
JSONArray edgesArray = getArrayField(root, "edges", "EDGES_NOT_ARRAY", "edges 必须是数组", issues, issueKeys);
List<NodeView> nodes = new ArrayList<>();
Map<String, NodeView> nodeMap = new LinkedHashMap<>();
Set<String> nodeIds = new HashSet<>();
Map<String, ?> parserMap = chainParser.getNodeParserMap() == null ? Collections.emptyMap() : chainParser.getNodeParserMap();
for (int i = 0; i < nodesArray.size(); i++) {
Object nodeObject = nodesArray.get(i);
if (!(nodeObject instanceof JSONObject)) {
addIssue(issues, issueKeys, "NODE_INVALID", "" + (i + 1) + " 个节点不是对象", null, null, null);
continue;
}
JSONObject nodeJson = (JSONObject) nodeObject;
NodeView node = new NodeView();
node.id = trimToNull(nodeJson.getString("id"));
node.type = trimToNull(nodeJson.getString("type"));
node.parentId = trimToNull(nodeJson.getString("parentId"));
node.data = nodeJson.getJSONObject("data");
node.name = extractNodeName(nodeJson, node.data, node.id);
if (!StringUtils.hasText(node.id)) {
addIssue(issues, issueKeys, "NODE_ID_EMPTY", "存在节点缺少 id", null, null, node.name);
continue;
}
if (!nodeIds.add(node.id)) {
addIssue(issues, issueKeys, "NODE_ID_DUPLICATE", "节点ID重复: " + node.id, node.id, null, node.name);
}
if (!StringUtils.hasText(node.type) || !parserMap.containsKey(node.type)) {
addIssue(issues, issueKeys, "NODE_TYPE_UNKNOWN", "节点类型无法识别: " + safe(node.type), node.id, null, node.name);
}
if (StringUtils.hasText(node.parentId) && node.parentId.equals(node.id)) {
addIssue(issues, issueKeys, "NODE_PARENT_SELF", "节点不能引用自己作为父节点", node.id, null, node.name);
}
nodes.add(node);
nodeMap.put(node.id, node);
}
for (NodeView node : nodes) {
if (StringUtils.hasText(node.parentId) && !nodeMap.containsKey(node.parentId)) {
addIssue(issues, issueKeys, "NODE_PARENT_NOT_FOUND",
"父节点不存在: " + node.parentId, node.id, null, node.name);
}
}
List<EdgeView> edges = new ArrayList<>();
Set<String> edgeIds = new HashSet<>();
for (int i = 0; i < edgesArray.size(); i++) {
Object edgeObject = edgesArray.get(i);
if (!(edgeObject instanceof JSONObject)) {
addIssue(issues, issueKeys, "EDGE_INVALID", "" + (i + 1) + " 条连线不是对象", null, null, null);
continue;
}
JSONObject edgeJson = (JSONObject) edgeObject;
EdgeView edge = new EdgeView();
edge.id = trimToNull(edgeJson.getString("id"));
edge.source = trimToNull(edgeJson.getString("source"));
edge.target = trimToNull(edgeJson.getString("target"));
if (!StringUtils.hasText(edge.id)) {
addIssue(issues, issueKeys, "EDGE_ID_EMPTY", "存在连线缺少 id", null, null, null);
continue;
}
if (!edgeIds.add(edge.id)) {
addIssue(issues, issueKeys, "EDGE_ID_DUPLICATE", "连线ID重复: " + edge.id, null, edge.id, null);
}
if (!StringUtils.hasText(edge.source)) {
addIssue(issues, issueKeys, "EDGE_SOURCE_EMPTY", "连线 source 不能为空", null, edge.id, null);
}
if (!StringUtils.hasText(edge.target)) {
addIssue(issues, issueKeys, "EDGE_TARGET_EMPTY", "连线 target 不能为空", null, edge.id, null);
}
edges.add(edge);
}
for (EdgeView edge : edges) {
if (StringUtils.hasText(edge.source) && !nodeMap.containsKey(edge.source)) {
addIssue(issues, issueKeys, "EDGE_SOURCE_NOT_FOUND",
"连线 source 不存在: " + edge.source, edge.source, edge.id, null);
}
if (StringUtils.hasText(edge.target) && !nodeMap.containsKey(edge.target)) {
addIssue(issues, issueKeys, "EDGE_TARGET_NOT_FOUND",
"连线 target 不存在: " + edge.target, edge.target, edge.id, null);
}
}
ParsedWorkflow parsedWorkflow = new ParsedWorkflow();
parsedWorkflow.nodes = nodes;
parsedWorkflow.edges = edges;
parsedWorkflow.nodeMap = nodeMap;
return parsedWorkflow;
}
private void runStrictChecks(String content, ParsedWorkflow parsed, BigInteger currentWorkflowId,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
if (parsed.nodes.isEmpty()) {
addIssue(issues, issueKeys, "NODES_EMPTY", "预执行校验失败nodes 不能为空", null, null, null);
}
if (parsed.edges.isEmpty()) {
addIssue(issues, issueKeys, "EDGES_EMPTY", "预执行校验失败edges 不能为空", null, null, null);
}
try {
Object definition = chainParser.parse(content);
if (definition == null) {
addIssue(issues, issueKeys, "PARSE_NULL", "预执行校验失败:节点配置错误,请检查", null, null, null);
}
} catch (Exception e) {
addIssue(issues, issueKeys, "PARSE_FAILED", "预执行校验失败:解析流程失败 - " + shortError(e), null, null, null);
}
List<NodeView> startNodes = parsed.nodes.stream()
.filter(node -> TYPE_START.equals(node.type))
.collect(Collectors.toList());
List<NodeView> endNodes = parsed.nodes.stream()
.filter(node -> TYPE_END.equals(node.type))
.collect(Collectors.toList());
if (startNodes.isEmpty()) {
addIssue(issues, issueKeys, "START_NODE_MISSING", "预执行校验失败:至少需要一个 startNode", null, null, null);
}
if (endNodes.isEmpty()) {
addIssue(issues, issueKeys, "END_NODE_MISSING", "预执行校验失败:至少需要一个 endNode", null, null, null);
}
List<NodeView> rootNodes = parsed.nodes.stream()
.filter(NodeView::isRootLevel)
.collect(Collectors.toList());
Map<String, Integer> rootInDegree = new HashMap<>();
for (NodeView rootNode : rootNodes) {
rootInDegree.put(rootNode.id, 0);
}
for (EdgeView edge : parsed.edges) {
NodeView source = parsed.nodeMap.get(edge.source);
NodeView target = parsed.nodeMap.get(edge.target);
if (source == null || target == null) {
continue;
}
if (source.isRootLevel() && target.isRootLevel() && isSameParent(source, target)) {
rootInDegree.put(target.id, rootInDegree.getOrDefault(target.id, 0) + 1);
}
}
List<NodeView> rootEntries = new ArrayList<>();
for (NodeView rootNode : rootNodes) {
if (rootInDegree.getOrDefault(rootNode.id, 0) == 0) {
rootEntries.add(rootNode);
}
}
for (NodeView entry : rootEntries) {
if (!TYPE_START.equals(entry.type)) {
addIssue(issues, issueKeys, "ROOT_ENTRY_NOT_START",
"根级入度为0的节点必须是 startNode", entry.id, null, entry.name);
}
}
detectExplicitCycle(parsed, issues, issueKeys);
Map<String, Set<String>> runtimeGraph = buildRuntimeGraph(parsed);
Set<String> entryNodeIds = rootEntries.stream()
.map(node -> node.id)
.collect(Collectors.toCollection(LinkedHashSet::new));
if (entryNodeIds.isEmpty()) {
for (NodeView startNode : startNodes) {
entryNodeIds.add(startNode.id);
}
}
Set<String> reachable = bfsReachable(runtimeGraph, entryNodeIds);
for (String nodeId : reachable) {
NodeView node = parsed.nodeMap.get(nodeId);
if (node == null || TYPE_END.equals(node.type)) {
continue;
}
if (runtimeGraph.getOrDefault(nodeId, Collections.emptySet()).isEmpty()) {
addIssue(issues, issueKeys, "DEAD_END_NODE",
"存在无法继续执行的死路节点(非 end 且无后继)", node.id, null, node.name);
}
}
if (!endNodes.isEmpty()) {
Set<String> canReachEnd = reverseReachable(runtimeGraph, endNodes.stream()
.map(node -> node.id)
.collect(Collectors.toSet()));
for (String nodeId : reachable) {
if (!canReachEnd.contains(nodeId)) {
NodeView node = parsed.nodeMap.get(nodeId);
if (node != null) {
addIssue(issues, issueKeys, "END_UNREACHABLE",
"该节点无法到达任一 endNode", node.id, null, node.name);
}
}
}
}
checkWorkflowReferences(parsed, currentWorkflowId, content, issues, issueKeys);
}
private void checkWorkflowReferences(ParsedWorkflow parsed, BigInteger currentWorkflowId, String currentContent,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
Map<String, String> contentCache = new HashMap<>();
String currentWorkflowIdString = currentWorkflowId == null ? null : currentWorkflowId.toString();
if (StringUtils.hasText(currentWorkflowIdString)) {
contentCache.put(currentWorkflowIdString, currentContent);
}
for (NodeView node : parsed.nodes) {
if (!TYPE_WORKFLOW.equals(node.type)) {
continue;
}
String workflowId = getWorkflowIdInNode(node);
if (!StringUtils.hasText(workflowId)) {
addIssue(issues, issueKeys, "WORKFLOW_REF_EMPTY", "子流程节点缺少 workflowId", node.id, null, node.name);
continue;
}
if (StringUtils.hasText(currentWorkflowIdString) && currentWorkflowIdString.equals(workflowId)) {
addIssue(issues, issueKeys, "WORKFLOW_REF_CYCLE", "子流程递归引用:工作流不能引用自身", node.id, null, node.name);
continue;
}
String workflowContent = loadWorkflowContent(workflowId, currentWorkflowIdString, currentContent, contentCache);
if (!StringUtils.hasText(workflowContent)) {
addIssue(issues, issueKeys, "WORKFLOW_REF_NOT_FOUND",
"子流程引用不存在: " + workflowId, node.id, null, node.name);
}
}
if (!StringUtils.hasText(currentWorkflowIdString)) {
return;
}
detectWorkflowReferenceCycle(currentWorkflowIdString, currentWorkflowIdString, currentContent, contentCache, issues, issueKeys);
}
private void detectWorkflowReferenceCycle(String rootWorkflowId, String currentWorkflowId, String currentContent,
Map<String, String> contentCache,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
Set<String> visited = new HashSet<>();
LinkedHashSet<String> visiting = new LinkedHashSet<>();
dfsWorkflowGraph(rootWorkflowId, currentWorkflowId, currentContent, contentCache, visited, visiting, issues, issueKeys);
}
private void dfsWorkflowGraph(String workflowId, String currentWorkflowId, String currentContent,
Map<String, String> contentCache,
Set<String> visited, LinkedHashSet<String> visiting,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
if (visited.contains(workflowId)) {
return;
}
if (visiting.contains(workflowId)) {
addIssue(issues, issueKeys, "WORKFLOW_REF_CYCLE",
"检测到子流程递归引用: " + formatCyclePath(visiting, workflowId), null, null, null);
return;
}
String content = loadWorkflowContent(workflowId, currentWorkflowId, currentContent, contentCache);
if (!StringUtils.hasText(content)) {
visited.add(workflowId);
return;
}
visiting.add(workflowId);
Set<String> referencedWorkflowIds = extractWorkflowRefIds(content);
for (String refWorkflowId : referencedWorkflowIds) {
if (!StringUtils.hasText(refWorkflowId)) {
continue;
}
if (visiting.contains(refWorkflowId)) {
addIssue(issues, issueKeys, "WORKFLOW_REF_CYCLE",
"检测到子流程递归引用: " + formatCyclePath(visiting, refWorkflowId), null, null, null);
continue;
}
String refContent = loadWorkflowContent(refWorkflowId, currentWorkflowId, currentContent, contentCache);
if (!StringUtils.hasText(refContent)) {
continue;
}
dfsWorkflowGraph(refWorkflowId, currentWorkflowId, currentContent, contentCache, visited, visiting, issues, issueKeys);
}
visiting.remove(workflowId);
visited.add(workflowId);
}
private String loadWorkflowContent(String workflowId, String currentWorkflowId, String currentContent, Map<String, String> contentCache) {
if (!StringUtils.hasText(workflowId)) {
return null;
}
if (StringUtils.hasText(currentWorkflowId) && currentWorkflowId.equals(workflowId)) {
return currentContent;
}
if (contentCache.containsKey(workflowId)) {
return contentCache.get(workflowId);
}
Workflow workflow = workflowService.getById(workflowId);
String content = workflow == null ? null : workflow.getContent();
contentCache.put(workflowId, content);
return content;
}
private Set<String> extractWorkflowRefIds(String content) {
Set<String> refs = new LinkedHashSet<>();
if (!StringUtils.hasText(content)) {
return refs;
}
try {
Object parsed = JSON.parse(content);
if (!(parsed instanceof JSONObject)) {
return refs;
}
JSONArray nodes = ((JSONObject) parsed).getJSONArray("nodes");
if (nodes == null) {
return refs;
}
for (int i = 0; i < nodes.size(); i++) {
JSONObject node = nodes.getJSONObject(i);
if (node == null) {
continue;
}
if (!TYPE_WORKFLOW.equals(trimToNull(node.getString("type")))) {
continue;
}
JSONObject data = node.getJSONObject("data");
String workflowId = trimToNull(data == null ? null : data.getString("workflowId"));
if (StringUtils.hasText(workflowId)) {
refs.add(workflowId);
}
}
} catch (Exception ignored) {
// ignore
}
return refs;
}
private String formatCyclePath(LinkedHashSet<String> visiting, String cycleStart) {
List<String> chain = new ArrayList<>();
boolean started = false;
for (String id : visiting) {
if (id.equals(cycleStart)) {
started = true;
}
if (started) {
chain.add(id);
}
}
chain.add(cycleStart);
return String.join(" -> ", chain);
}
private String getWorkflowIdInNode(NodeView node) {
if (node == null || node.data == null) {
return null;
}
return trimToNull(node.data.getString("workflowId"));
}
private void detectExplicitCycle(ParsedWorkflow parsed, List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
Map<String, List<String>> graph = new HashMap<>();
for (NodeView node : parsed.nodes) {
graph.put(node.id, new ArrayList<>());
}
for (EdgeView edge : parsed.edges) {
if (!StringUtils.hasText(edge.source) || !StringUtils.hasText(edge.target)) {
continue;
}
if (!parsed.nodeMap.containsKey(edge.source) || !parsed.nodeMap.containsKey(edge.target)) {
continue;
}
if (edge.source.equals(edge.target)) {
addIssue(issues, issueKeys, "GRAPH_CYCLE", "检测到自环连线", edge.source, edge.id, parsed.nodeMap.get(edge.source).name);
}
graph.get(edge.source).add(edge.target);
}
Map<String, Integer> state = new HashMap<>();
Deque<String> path = new ArrayDeque<>();
for (NodeView node : parsed.nodes) {
if (state.getOrDefault(node.id, 0) != 0) {
continue;
}
if (dfsCycle(node.id, graph, state, path, parsed, issues, issueKeys)) {
return;
}
}
}
private boolean dfsCycle(String current, Map<String, List<String>> graph, Map<String, Integer> state,
Deque<String> path, ParsedWorkflow parsed, List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
state.put(current, 1);
path.addLast(current);
for (String next : graph.getOrDefault(current, Collections.emptyList())) {
int nextState = state.getOrDefault(next, 0);
if (nextState == 0) {
if (dfsCycle(next, graph, state, path, parsed, issues, issueKeys)) {
return true;
}
} else if (nextState == 1) {
String cyclePath = formatNodeCyclePath(path, next);
NodeView node = parsed.nodeMap.get(next);
addIssue(issues, issueKeys, "GRAPH_CYCLE", "检测到环路: " + cyclePath, next, null, node == null ? null : node.name);
return true;
}
}
path.removeLast();
state.put(current, 2);
return false;
}
private String formatNodeCyclePath(Deque<String> path, String cycleStart) {
List<String> list = new ArrayList<>(path);
int index = list.indexOf(cycleStart);
if (index < 0) {
return String.join(" -> ", list);
}
List<String> cycle = new ArrayList<>(list.subList(index, list.size()));
cycle.add(cycleStart);
return String.join(" -> ", cycle);
}
private Map<String, Set<String>> buildRuntimeGraph(ParsedWorkflow parsed) {
Map<String, Set<String>> graph = new HashMap<>();
for (NodeView node : parsed.nodes) {
graph.put(node.id, new LinkedHashSet<>());
}
for (EdgeView edge : parsed.edges) {
if (!StringUtils.hasText(edge.source) || !StringUtils.hasText(edge.target)) {
continue;
}
NodeView source = parsed.nodeMap.get(edge.source);
NodeView target = parsed.nodeMap.get(edge.target);
if (source == null || target == null) {
continue;
}
if (TYPE_END.equals(source.type)) {
continue;
}
if (isSameParent(source, target)
|| (TYPE_LOOP.equals(source.type) && source.id.equals(target.parentId))) {
graph.get(source.id).add(target.id);
}
}
for (NodeView node : parsed.nodes) {
if (TYPE_END.equals(node.type)) {
continue;
}
Set<String> nextNodes = graph.getOrDefault(node.id, new LinkedHashSet<>());
if (nextNodes.isEmpty() && StringUtils.hasText(node.parentId) && parsed.nodeMap.containsKey(node.parentId)) {
nextNodes.add(node.parentId);
}
graph.put(node.id, nextNodes);
}
return graph;
}
private Set<String> bfsReachable(Map<String, Set<String>> graph, Set<String> starts) {
Set<String> visited = new LinkedHashSet<>();
Deque<String> queue = new ArrayDeque<>();
for (String start : starts) {
if (StringUtils.hasText(start) && graph.containsKey(start)) {
queue.add(start);
visited.add(start);
}
}
while (!queue.isEmpty()) {
String current = queue.pollFirst();
for (String next : graph.getOrDefault(current, Collections.emptySet())) {
if (visited.add(next)) {
queue.addLast(next);
}
}
}
return visited;
}
private Set<String> reverseReachable(Map<String, Set<String>> graph, Set<String> endNodes) {
Map<String, Set<String>> reverseGraph = new HashMap<>();
for (String source : graph.keySet()) {
reverseGraph.computeIfAbsent(source, key -> new LinkedHashSet<>());
for (String target : graph.getOrDefault(source, Collections.emptySet())) {
reverseGraph.computeIfAbsent(target, key -> new LinkedHashSet<>()).add(source);
}
}
Set<String> visited = new LinkedHashSet<>();
Deque<String> queue = new ArrayDeque<>();
for (String endNode : endNodes) {
if (StringUtils.hasText(endNode) && reverseGraph.containsKey(endNode)) {
visited.add(endNode);
queue.add(endNode);
}
}
while (!queue.isEmpty()) {
String current = queue.pollFirst();
for (String prev : reverseGraph.getOrDefault(current, Collections.emptySet())) {
if (visited.add(prev)) {
queue.addLast(prev);
}
}
}
return visited;
}
private boolean isSameParent(NodeView source, NodeView target) {
return Objects.equals(trimToNull(source.parentId), trimToNull(target.parentId));
}
private JSONArray getArrayField(JSONObject root, String fieldName, String issueCode, String issueMessage,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
Object value = root.get(fieldName);
if (value == null) {
return new JSONArray();
}
if (!(value instanceof JSONArray)) {
addIssue(issues, issueKeys, issueCode, issueMessage, null, null, null);
return new JSONArray();
}
return (JSONArray) value;
}
private WorkflowCheckResult buildResult(WorkflowCheckStage stage, List<WorkflowCheckIssue> issues) {
WorkflowCheckResult result = new WorkflowCheckResult();
result.setStage(stage);
result.setIssues(issues);
result.setIssueCount(issues.size());
result.setPassed(issues.isEmpty());
return result;
}
private void throwIfFailed(WorkflowCheckResult result) {
if (result == null || result.isPassed()) {
return;
}
String summary = result.getIssues().stream()
.limit(5)
.map(WorkflowCheckIssue::getMessage)
.collect(Collectors.joining(""));
if (result.getIssueCount() > 5) {
summary = summary + ";等";
}
throw new BusinessException("工作流校验未通过(" + result.getStage() + "),共 " + result.getIssueCount() + " 项:" + summary);
}
private void addIssue(List<WorkflowCheckIssue> issues, Set<String> issueKeys, String code,
String message, String nodeId, String edgeId, String nodeName) {
String key = code + "|" + safe(nodeId) + "|" + safe(edgeId) + "|" + message;
if (!issueKeys.add(key)) {
return;
}
issues.add(new WorkflowCheckIssue(code, LEVEL_ERROR, message, nodeId, edgeId, nodeName));
}
private String extractNodeName(JSONObject nodeJson, JSONObject data, String fallback) {
String name = null;
if (data != null) {
name = trimToNull(data.getString("title"));
}
if (!StringUtils.hasText(name) && nodeJson != null) {
name = trimToNull(nodeJson.getString("label"));
}
return StringUtils.hasText(name) ? name : fallback;
}
private String trimToNull(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
return value.trim();
}
private String safe(String value) {
return value == null ? "" : value;
}
private String shortError(Throwable throwable) {
if (throwable == null) {
return "unknown";
}
String message = throwable.getMessage();
if (!StringUtils.hasText(message)) {
return throwable.getClass().getSimpleName();
}
return message.length() > 120 ? message.substring(0, 120) + "..." : message;
}
private static class ParsedWorkflow {
private List<NodeView> nodes = new ArrayList<>();
private List<EdgeView> edges = new ArrayList<>();
private Map<String, NodeView> nodeMap = new HashMap<>();
}
private static class NodeView {
private String id;
private String type;
private String parentId;
private String name;
private JSONObject data;
private boolean isRootLevel() {
return !StringUtils.hasText(parentId);
}
}
private static class EdgeView {
private String id;
private String source;
private String target;
}
}

View File

@@ -0,0 +1,310 @@
package tech.easyflow.ai.easyagentsflow.service;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.easyagents.flow.core.parser.ChainParser;
import org.junit.Assert;
import org.junit.Test;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckResult;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.node.WorkflowNodeParser;
import tech.easyflow.ai.service.WorkflowService;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
public class WorkflowCheckServiceTest {
@Test
public void testSaveShouldPassForValidDraft() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("start-1", "startNode", null, data("开始")),
node("code-1", "codeNode", null, data("处理中"))
),
new JSONArray()
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.SAVE, null);
Assert.assertTrue(result.isPassed());
Assert.assertEquals(0, result.getIssueCount());
}
@Test
public void testSaveShouldBlockInvalidJson() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
WorkflowCheckResult result = service.checkContent("{invalid-json", WorkflowCheckStage.SAVE, null);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "INVALID_JSON");
}
@Test
public void testSaveShouldBlockUnknownNodeType() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(node("n1", "unknownNodeType", null, data("未知节点"))),
new JSONArray()
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.SAVE, null);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "NODE_TYPE_UNKNOWN");
}
@Test
public void testSaveShouldBlockEdgeWithMissingNode() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(node("n1", "startNode", null, data("开始"))),
array(edge("e1", "n1", "n2"))
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.SAVE, null);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "EDGE_TARGET_NOT_FOUND");
}
@Test
public void testPreExecuteShouldBlockMissingStartOrEnd() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("c1", "codeNode", null, data("处理")),
node("c2", "codeNode", null, data("处理2"))
),
array(edge("e1", "c1", "c2"))
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "START_NODE_MISSING");
assertHasCode(result, "END_NODE_MISSING");
}
@Test
public void testPreExecuteShouldBlockRootEntryNotStart() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("s1", "startNode", null, data("开始")),
node("x1", "codeNode", null, data("孤立入口")),
node("e1", "endNode", null, data("结束"))
),
array(edge("e-1", "s1", "e1"))
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "ROOT_ENTRY_NOT_START");
}
@Test
public void testPreExecuteShouldBlockGraphCycle() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("s1", "startNode", null, data("开始")),
node("c1", "codeNode", null, data("处理")),
node("e1", "endNode", null, data("结束"))
),
array(
edge("e1", "s1", "c1"),
edge("e2", "c1", "s1"),
edge("e3", "c1", "e1")
)
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "GRAPH_CYCLE");
}
@Test
public void testPreExecuteShouldBlockDeadEndAndUnreachableEnd() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("s1", "startNode", null, data("开始")),
node("c1", "codeNode", null, data("死路节点")),
node("e1", "endNode", null, data("结束"))
),
array(
edge("e1", "s1", "c1"),
edge("e2", "s1", "e1")
)
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "DEAD_END_NODE");
assertHasCode(result, "END_UNREACHABLE");
}
@Test
public void testPreExecuteShouldBlockWorkflowRecursiveReference() throws Exception {
Map<String, String> workflowStore = new HashMap<>();
workflowStore.put("2", workflowJson(
array(
node("s2", "startNode", null, data("开始2")),
workflowNode("w2", null, "1"),
node("e2", "endNode", null, data("结束2"))
),
array(
edge("e2-1", "s2", "w2"),
edge("e2-2", "w2", "e2")
)
));
WorkflowCheckService service = newService(workflowStore);
String rootContent = workflowJson(
array(
node("s1", "startNode", null, data("开始1")),
workflowNode("w1", null, "2"),
node("e1", "endNode", null, data("结束1"))
),
array(
edge("e1-1", "s1", "w1"),
edge("e1-2", "w1", "e1")
)
);
WorkflowCheckResult result = service.checkContent(rootContent, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "WORKFLOW_REF_CYCLE");
}
@Test
public void testPreExecuteShouldPassForExecutableWorkflow() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("s1", "startNode", null, data("开始")),
node("c1", "codeNode", null, data("处理")),
node("e1", "endNode", null, data("结束"))
),
array(
edge("e1", "s1", "c1"),
edge("e2", "c1", "e1")
)
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertTrue(result.isPassed());
Assert.assertEquals(0, result.getIssueCount());
}
private static WorkflowCheckService newService(Map<String, String> workflowStore) throws Exception {
WorkflowCheckService service = new WorkflowCheckService();
ChainParser parser = ChainParser.builder()
.withDefaultParsers(true)
.build();
parser.addNodeParser("workflow-node", new WorkflowNodeParser());
setField(service, "chainParser", parser);
setField(service, "workflowService", mockWorkflowService(workflowStore));
return service;
}
private static WorkflowService mockWorkflowService(Map<String, String> workflowStore) {
return (WorkflowService) Proxy.newProxyInstance(
WorkflowService.class.getClassLoader(),
new Class[]{WorkflowService.class},
(proxy, method, args) -> {
String methodName = method.getName();
if ("getById".equals(methodName)) {
if (args == null || args.length == 0 || args[0] == null) {
return null;
}
String id = String.valueOf(args[0]);
if (!workflowStore.containsKey(id)) {
return null;
}
Workflow workflow = new Workflow();
try {
workflow.setId(new BigInteger(id));
} catch (Exception ignored) {
workflow.setId(null);
}
workflow.setContent(workflowStore.get(id));
workflow.setTitle("workflow-" + id);
return workflow;
}
if ("equals".equals(methodName)) {
return proxy == args[0];
}
if ("hashCode".equals(methodName)) {
return System.identityHashCode(proxy);
}
if (method.getReturnType() == boolean.class) {
return false;
}
if (method.getReturnType() == int.class) {
return 0;
}
if (method.getReturnType() == long.class) {
return 0L;
}
return null;
});
}
private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = WorkflowCheckService.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}
private static void assertHasCode(WorkflowCheckResult result, String code) {
boolean exists = result.getIssues().stream().anyMatch(issue -> code.equals(issue.getCode()));
Assert.assertTrue("missing issue code: " + code, exists);
}
private static String workflowJson(JSONArray nodes, JSONArray edges) {
JSONObject root = new JSONObject();
root.put("nodes", nodes);
root.put("edges", edges);
return root.toJSONString();
}
private static JSONArray array(JSONObject... objects) {
JSONArray array = new JSONArray();
for (JSONObject object : objects) {
array.add(object);
}
return array;
}
private static JSONObject node(String id, String type, String parentId, JSONObject data) {
JSONObject node = new JSONObject();
node.put("id", id);
node.put("type", type);
if (parentId != null) {
node.put("parentId", parentId);
}
node.put("data", data);
return node;
}
private static JSONObject workflowNode(String id, String parentId, String workflowId) {
JSONObject data = data("子流程");
data.put("workflowId", workflowId);
return node(id, "workflow-node", parentId, data);
}
private static JSONObject data(String title) {
JSONObject data = new JSONObject();
data.put("title", title);
return data;
}
private static JSONObject edge(String id, String source, String target) {
JSONObject edge = new JSONObject();
edge.put("id", id);
edge.put("source", source);
edge.put("target", target);
return edge;
}
}