feat: 工作流适配数据中枢查询节点

- 新增查询数据与写入数据节点并移除旧数据中心节点入口

- 将查询数据节点切换为连接服务加 SQL 的执行模型

- 同步更新工作流校验、提示词上下文与设计器交互
This commit is contained in:
2026-04-02 18:56:34 +08:00
parent 798effbd5b
commit 1ecc28e498
40 changed files with 1973 additions and 692 deletions

View File

@@ -4,6 +4,7 @@ import com.easyagents.flow.core.chain.ChainDefinition;
import com.easyagents.flow.core.chain.repository.ChainDefinitionRepository;
import com.easyagents.flow.core.parser.ChainParser;
import org.springframework.stereotype.Component;
import tech.easyflow.ai.easyagentsflow.service.WorkflowDatacenterContentService;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.service.WorkflowService;
@@ -16,11 +17,13 @@ public class ChainDefinitionRepositoryImpl implements ChainDefinitionRepository
private WorkflowService workflowService;
@Resource
private ChainParser chainParser;
@Resource
private WorkflowDatacenterContentService workflowDatacenterContentService;
@Override
public ChainDefinition getChainDefinitionById(String id) {
Workflow workflow = workflowService.getById(id);
String json = workflow.getContent();
String json = workflowDatacenterContentService.prepareContent(workflow.getContent());
ChainDefinition chainDefinition = chainParser.parse(json);
chainDefinition.setId(workflow.getId().toString());
chainDefinition.setName(workflow.getEnglishName());

View File

@@ -65,14 +65,12 @@ public class TinyFlowConfigService {
MakeFileNodeParser makeFileNodeParser = new MakeFileNodeParser();
// 插件
PluginToolNodeParser pluginToolNodeParser = new PluginToolNodeParser();
// SQL查询
SqlNodeParser sqlNodeParser = new SqlNodeParser();
// 下载文件节点
DownloadNodeParser downloadNodeParser = new DownloadNodeParser();
// 保存数据节点
SaveToDatacenterNodeParser saveDaveParser = new SaveToDatacenterNodeParser();
SaveDatasetNodeParser saveDatasetNodeParser = new SaveDatasetNodeParser();
// 查询数据节点
SearchDatacenterNodeParser searchDatacenterNodeParser = new SearchDatacenterNodeParser();
SearchDatasetNodeParser searchDatasetNodeParser = new SearchDatasetNodeParser();
// 工作流节点
WorkflowNodeParser workflowNodeParser = new WorkflowNodeParser();
// 条件判断节点
@@ -81,10 +79,9 @@ public class TinyFlowConfigService {
chainParser.addNodeParser(docNodeParser.getNodeName(), docNodeParser);
chainParser.addNodeParser(makeFileNodeParser.getNodeName(), makeFileNodeParser);
chainParser.addNodeParser(pluginToolNodeParser.getNodeName(), pluginToolNodeParser);
chainParser.addNodeParser(sqlNodeParser.getNodeName(), sqlNodeParser);
chainParser.addNodeParser(downloadNodeParser.getNodeName(), downloadNodeParser);
chainParser.addNodeParser(saveDaveParser.getNodeName(), saveDaveParser);
chainParser.addNodeParser(searchDatacenterNodeParser.getNodeName(), searchDatacenterNodeParser);
chainParser.addNodeParser(saveDatasetNodeParser.getNodeName(), saveDatasetNodeParser);
chainParser.addNodeParser(searchDatasetNodeParser.getNodeName(), searchDatasetNodeParser);
chainParser.addNodeParser(workflowNodeParser.getNodeName(), workflowNodeParser);
chainParser.addNodeParser(conditionNodeParser.getNodeName(), conditionNodeParser);
}

View File

@@ -12,15 +12,30 @@ import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.service.WorkflowService;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.execution.model.DatacenterSchemaResponse;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.*;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@Service
public class WorkflowCheckService {
private static final String LEVEL_ERROR = "ERROR";
private static final String LEVEL_WARNING = "WARNING";
private static final String TYPE_START = "startNode";
private static final String TYPE_END = "endNode";
private static final String TYPE_LOOP = "loopNode";
@@ -30,6 +45,8 @@ public class WorkflowCheckService {
private WorkflowService workflowService;
@Resource
private ChainParser chainParser;
@Resource
private WorkflowDatacenterContentService workflowDatacenterContentService;
public WorkflowCheckResult checkWorkflow(BigInteger workflowId, WorkflowCheckStage stage) {
if (workflowId == null) {
@@ -177,9 +194,91 @@ public class WorkflowCheckService {
parsedWorkflow.nodes = nodes;
parsedWorkflow.edges = edges;
parsedWorkflow.nodeMap = nodeMap;
checkDatacenterNodes(parsedWorkflow, issues, issueKeys);
return parsedWorkflow;
}
private void checkDatacenterNodes(ParsedWorkflow parsed, List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
for (NodeView node : parsed.nodes) {
if (node == null) {
continue;
}
if (workflowDatacenterContentService.isSearchDatasetNode(node.type)) {
checkSearchDatasetNode(node, issues, issueKeys);
continue;
}
if (workflowDatacenterContentService.isSaveDatasetNode(node.type)) {
checkSaveDatasetNode(node, issues, issueKeys);
continue;
}
if (workflowDatacenterContentService.isLlmNode(node.type)) {
checkLlmQueryContext(node, parsed, issues, issueKeys);
}
}
}
private void checkSearchDatasetNode(NodeView node,
List<WorkflowCheckIssue> issues,
Set<String> issueKeys) {
try {
workflowDatacenterContentService.requireSearchDatasetRef(node.data);
} catch (BusinessException e) {
addIssue(issues, issueKeys, "SEARCH_DATASET_INVALID", e.getMessage(), node.id, null, node.name);
} catch (Exception e) {
addIssue(issues, issueKeys, "SEARCH_DATASET_INVALID",
"查询数据节点校验失败: " + shortError(e), node.id, null, node.name);
}
}
private void checkSaveDatasetNode(NodeView node,
List<WorkflowCheckIssue> issues,
Set<String> issueKeys) {
try {
workflowDatacenterContentService.requireSaveDatasetRef(node.data);
} catch (BusinessException e) {
addIssue(issues, issueKeys, "SAVE_DATASET_INVALID", e.getMessage(), node.id, null, node.name);
} catch (Exception e) {
addIssue(issues, issueKeys, "SAVE_DATASET_INVALID",
"写入数据节点校验失败: " + shortError(e), node.id, null, node.name);
}
}
private void checkLlmQueryContext(NodeView node,
ParsedWorkflow parsed,
List<WorkflowCheckIssue> issues,
Set<String> issueKeys) {
if (node.data == null) {
return;
}
Object rawNodeIds = node.data.get("queryContextNodeIds");
if (rawNodeIds == null) {
return;
}
if (!(rawNodeIds instanceof JSONArray nodeIds)) {
addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_INVALID",
"查询上下文配置无效,请重新选择查询数据节点", node.id, null, node.name);
return;
}
for (int i = 0; i < nodeIds.size(); i++) {
String refNodeId = trimToNull(nodeIds.getString(i));
if (!StringUtils.hasText(refNodeId)) {
addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_EMPTY",
"查询上下文配置无效,请重新选择查询数据节点", node.id, null, node.name);
continue;
}
NodeView refNode = parsed.nodeMap.get(refNodeId);
if (refNode == null) {
addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_NOT_FOUND",
"查询上下文节点不存在: " + refNodeId, node.id, null, node.name);
continue;
}
if (!workflowDatacenterContentService.isSearchDatasetNode(refNode.type)) {
addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_TYPE_INVALID",
"查询上下文只能选择查询数据节点", node.id, null, node.name);
}
}
}
private void runStrictChecks(String content, ParsedWorkflow parsed, BigInteger currentWorkflowId,
List<WorkflowCheckIssue> issues, Set<String> issueKeys) {
if (parsed.nodes.isEmpty()) {
@@ -190,7 +289,8 @@ public class WorkflowCheckService {
}
try {
Object definition = chainParser.parse(content);
String preparedContent = workflowDatacenterContentService.prepareContent(content);
Object definition = chainParser.parse(preparedContent);
if (definition == null) {
addIssue(issues, issueKeys, "PARSE_NULL", "预执行校验失败:节点配置错误,请检查", null, null, null);
}
@@ -606,31 +706,47 @@ public class WorkflowCheckService {
result.setStage(stage);
result.setIssues(issues);
result.setIssueCount(issues.size());
result.setPassed(issues.isEmpty());
result.setPassed(issues.stream().noneMatch(issue -> LEVEL_ERROR.equalsIgnoreCase(issue.getLevel())));
return result;
}
private void throwIfFailed(WorkflowCheckResult result) {
if (result == null || result.isPassed()) {
if (result == null) {
return;
}
String summary = result.getIssues().stream()
List<WorkflowCheckIssue> errorIssues = result.getIssues().stream()
.filter(issue -> LEVEL_ERROR.equalsIgnoreCase(issue.getLevel()))
.collect(Collectors.toList());
if (errorIssues.isEmpty()) {
return;
}
String summary = errorIssues.stream()
.limit(5)
.map(WorkflowCheckIssue::getMessage)
.collect(Collectors.joining(""));
if (result.getIssueCount() > 5) {
if (errorIssues.size() > 5) {
summary = summary + ";等";
}
throw new BusinessException("工作流校验未通过(" + result.getStage() + "),共 " + result.getIssueCount() + " 项:" + summary);
throw new BusinessException("工作流校验未通过(" + result.getStage() + "),共 " + errorIssues.size() + " 项:" + summary);
}
private void addIssue(List<WorkflowCheckIssue> issues, Set<String> issueKeys, String code,
String message, String nodeId, String edgeId, String nodeName) {
addIssue(issues, issueKeys, code, LEVEL_ERROR, message, nodeId, edgeId, nodeName);
}
private void addWarning(List<WorkflowCheckIssue> issues, Set<String> issueKeys, String code,
String message, String nodeId, String edgeId, String nodeName) {
addIssue(issues, issueKeys, code, LEVEL_WARNING, message, nodeId, edgeId, nodeName);
}
private void addIssue(List<WorkflowCheckIssue> issues, Set<String> issueKeys, String code, String level,
String message, String nodeId, String edgeId, String nodeName) {
String key = code + "|" + safe(nodeId) + "|" + safe(edgeId) + "|" + message;
if (!issueKeys.add(key)) {
return;
}
issues.add(new WorkflowCheckIssue(code, LEVEL_ERROR, message, nodeId, edgeId, nodeName));
issues.add(new WorkflowCheckIssue(code, level, message, nodeId, edgeId, nodeName));
}
private String extractNodeName(JSONObject nodeJson, JSONObject data, String fallback) {

View File

@@ -0,0 +1,349 @@
package tech.easyflow.ai.easyagentsflow.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import tech.easyflow.common.constant.enums.EnumFieldType;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.entity.DatacenterTable;
import tech.easyflow.datacenter.entity.DatacenterTableField;
import tech.easyflow.datacenter.execution.model.DatacenterSchemaResponse;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService;
import tech.easyflow.datacenter.meta.entity.DatacenterCatalog;
import tech.easyflow.datacenter.meta.entity.DatacenterSource;
import tech.easyflow.datacenter.meta.service.DatacenterDatasetRegistryService;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Service
public class WorkflowDatacenterContentService {
public static final String SEARCH_NODE_TYPE = "search-dataset-node";
public static final String SAVE_NODE_TYPE = "save-dataset-node";
public static final String LLM_NODE_TYPE = "llmNode";
public static final String QUERY_DATA_CONTEXT = "queryDataContext";
public static final String SEARCH_SOURCE_MISSING_MESSAGE = "查询数据节点未选择连接服务";
public static final String SEARCH_SQL_MISSING_MESSAGE = "查询数据节点未设置 SQL";
public static final String SAVE_EXPIRED_MESSAGE = "写入数据节点配置已过期,请重新选择已接入表";
public static final String INVALID_QUERY_CONTEXT_MESSAGE = "查询上下文配置无效,请重新选择查询数据节点";
private static final String QUERY_CONTEXT_PROMPT = """
你是为工作流中的查询数据节点生成只读 SQL 的生成器,你的职责是返回可直接执行的 SQL并且你只能输出 SQL。
必须严格遵守以下规则:
1. 只能从下面给出的连接摘要中选择最合适的表。
2. 只能使用摘要中给出的字段名,不要虚构表名或字段名。
3. 只输出 SQL不要输出解释、注释、Markdown、JSON 或多余文本。
4. 只能生成只读 SELECT SQL允许 WITH、JOIN、子查询、聚合、分组、排序。
5. 不要生成 INSERT、UPDATE、DELETE、DDL、多语句、存储过程调用。
6. 优先使用逻辑表名和逻辑字段名不要输出物理表名、JDBC、驱动信息。
7. 如果存在重名表,请使用 catalog.table 形式消除歧义。
以下是可用的连接摘要:
""";
@Resource
private DatacenterDatasetQueryService queryService;
@Resource
private DatacenterDatasetRegistryService registryService;
public boolean isSearchDatasetNode(String nodeType) {
return SEARCH_NODE_TYPE.equals(nodeType);
}
public boolean isSaveDatasetNode(String nodeType) {
return SAVE_NODE_TYPE.equals(nodeType);
}
public boolean isLlmNode(String nodeType) {
return LLM_NODE_TYPE.equals(nodeType);
}
public String prepareContent(String content) {
if (!StringUtils.hasText(content)) {
throw new BusinessException("工作流内容不能为空");
}
Object parsed;
try {
parsed = JSON.parse(content);
} catch (Exception e) {
throw new BusinessException("工作流内容不是合法JSON: " + shortError(e));
}
if (!(parsed instanceof JSONObject root)) {
throw new BusinessException("工作流内容必须是JSON对象");
}
JSONObject copy = JSON.parseObject(root.toJSONString());
prepareRoot(copy);
return copy.toJSONString();
}
public JSONObject prepareRoot(JSONObject root) {
if (root == null) {
throw new BusinessException("工作流内容不能为空");
}
JSONArray nodes = root.getJSONArray("nodes");
if (nodes == null || nodes.isEmpty()) {
return root;
}
Map<String, JSONObject> nodeMap = buildNodeMap(nodes);
for (int i = 0; i < nodes.size(); i++) {
JSONObject node = nodes.getJSONObject(i);
if (node == null) {
continue;
}
String nodeType = node.getString("type");
JSONObject data = node.getJSONObject("data");
if (isSearchDatasetNode(nodeType)) {
requireSearchDatasetRef(data);
} else if (isSaveDatasetNode(nodeType)) {
requireSaveDatasetRef(data);
}
}
for (int i = 0; i < nodes.size(); i++) {
JSONObject node = nodes.getJSONObject(i);
if (node == null || !isLlmNode(node.getString("type"))) {
continue;
}
injectQueryDataContext(node.getJSONObject("data"), nodeMap);
}
return root;
}
public boolean hasSaveLegacyFields(JSONObject data) {
return hasAnyField(data, "tableId", "sourceId", "catalogId", "versionId");
}
public DatasetRef readDatasetRef(JSONObject data) {
return data == null ? null : data.getObject("datasetRef", DatasetRef.class);
}
public DatasetRef requireSearchDatasetRef(JSONObject data) {
DatasetRef datasetRef = readDatasetRef(data);
if (datasetRef == null || datasetRef.getSourceId() == null) {
throw new BusinessException(SEARCH_SOURCE_MISSING_MESSAGE);
}
String querySql = data == null ? null : trimToNull(data.getString("querySql"));
if (!StringUtils.hasText(querySql)) {
throw new BusinessException(SEARCH_SQL_MISSING_MESSAGE);
}
return datasetRef;
}
public DatasetRef requireSaveDatasetRef(JSONObject data) {
if (hasSaveLegacyFields(data)) {
throw new BusinessException(SAVE_EXPIRED_MESSAGE);
}
DatasetRef datasetRef = readDatasetRef(data);
if (datasetRef == null || datasetRef.getTableId() == null) {
throw new BusinessException(SAVE_EXPIRED_MESSAGE);
}
return datasetRef;
}
public DatacenterSchemaResponse loadSchema(DatasetRef datasetRef, Map<BigInteger, DatacenterSchemaResponse> schemaCache) {
if (datasetRef == null || datasetRef.getTableId() == null) {
throw new BusinessException("缺少已接入表配置");
}
BigInteger tableId = datasetRef.getTableId();
if (schemaCache != null && schemaCache.containsKey(tableId)) {
return schemaCache.get(tableId);
}
DatacenterSchemaResponse schema = queryService.getSchema(copyDatasetRef(datasetRef));
if (schemaCache != null) {
schemaCache.put(tableId, schema);
}
return schema;
}
public JSONObject buildSourceSummary(DatasetRef datasetRef) {
if (datasetRef == null || datasetRef.getSourceId() == null) {
throw new BusinessException(SEARCH_SOURCE_MISSING_MESSAGE);
}
DatacenterSource source = registryService.getSourceRequired(datasetRef.getSourceId());
List<DatacenterTable> managedTables = registryService.listManagedTables(datasetRef.getSourceId(), datasetRef.getCatalogId());
managedTables.sort(Comparator.comparing(table -> table.getTableName() == null ? "" : table.getTableName()));
JSONObject sourceSummary = new JSONObject();
sourceSummary.put("sourceName", source.getSourceName());
sourceSummary.put("sourceType", source.getSourceType());
JSONArray tables = new JSONArray();
for (DatacenterTable table : managedTables) {
DatacenterTable fullTable = registryService.getTableWithFields(table.getId());
DatacenterCatalog catalog = registryService.getCatalogById(fullTable.getCatalogId());
if (StringUtils.hasText(datasetRef.getCatalogName())
&& (catalog == null || !datasetRef.getCatalogName().equals(catalog.getCatalogName()))) {
continue;
}
JSONObject tableSummary = new JSONObject();
tableSummary.put("catalogName", catalog == null ? null : catalog.getCatalogName());
tableSummary.put("tableName", fullTable.getTableName());
tableSummary.put("tableDesc", fullTable.getTableDesc());
JSONArray fields = new JSONArray();
if (fullTable.getFields() != null) {
for (DatacenterTableField field : fullTable.getFields()) {
JSONObject fieldSummary = new JSONObject();
fieldSummary.put("fieldName", field.getFieldName());
fieldSummary.put("fieldDesc", field.getFieldDesc());
fieldSummary.put("fieldType", resolveFieldType(field));
fields.add(fieldSummary);
}
}
tableSummary.put("fields", fields);
tables.add(tableSummary);
}
sourceSummary.put("tables", tables);
return sourceSummary;
}
private void injectQueryDataContext(JSONObject data, Map<String, JSONObject> nodeMap) {
if (data == null) {
return;
}
JSONArray nodeIds = data.getJSONArray("queryContextNodeIds");
if (nodeIds == null || nodeIds.isEmpty()) {
removeQueryDataContextParameter(data);
return;
}
Map<BigInteger, JSONObject> sourceSummaries = new LinkedHashMap<>();
Set<String> visitedNodeIds = new LinkedHashSet<>();
for (int i = 0; i < nodeIds.size(); i++) {
String nodeId = trimToNull(nodeIds.getString(i));
if (!StringUtils.hasText(nodeId) || !visitedNodeIds.add(nodeId)) {
continue;
}
JSONObject targetNode = nodeMap.get(nodeId);
if (targetNode == null || !isSearchDatasetNode(targetNode.getString("type"))) {
throw new BusinessException(INVALID_QUERY_CONTEXT_MESSAGE);
}
DatasetRef datasetRef = requireSearchDatasetRef(targetNode.getJSONObject("data"));
sourceSummaries.putIfAbsent(datasetRef.getSourceId(), buildSourceSummary(datasetRef));
}
String contextValue = QUERY_CONTEXT_PROMPT + "\n" + JSON.toJSONString(new ArrayList<>(sourceSummaries.values()));
upsertQueryDataContextParameter(data, contextValue);
}
private String resolveFieldType(DatacenterTableField field) {
if (field == null) {
return null;
}
if (StringUtils.hasText(field.getJdbcType())) {
return field.getJdbcType();
}
try {
EnumFieldType enumFieldType = EnumFieldType.getByCode(field.getFieldType());
if (enumFieldType != null) {
return enumFieldType.getText();
}
} catch (Exception ignored) {
}
return field.getFieldType() == null ? null : String.valueOf(field.getFieldType());
}
private void upsertQueryDataContextParameter(JSONObject data, String contextValue) {
JSONArray parameters = data.getJSONArray("parameters");
if (parameters == null) {
parameters = new JSONArray();
data.put("parameters", parameters);
}
for (int i = parameters.size() - 1; i >= 0; i--) {
JSONObject parameter = parameters.getJSONObject(i);
if (parameter != null && QUERY_DATA_CONTEXT.equals(parameter.getString("name"))) {
parameters.remove(i);
}
}
JSONObject parameter = new JSONObject();
parameter.put("id", QUERY_DATA_CONTEXT);
parameter.put("name", QUERY_DATA_CONTEXT);
parameter.put("title", "查询上下文");
parameter.put("description", "数据查询规则与连接表摘要");
parameter.put("dataType", "String");
parameter.put("refType", "fixed");
parameter.put("value", contextValue);
parameter.put("required", false);
parameter.put("nameDisabled", true);
parameter.put("dataTypeDisabled", true);
parameter.put("deleteDisabled", true);
parameters.add(parameter);
}
private void removeQueryDataContextParameter(JSONObject data) {
if (data == null) {
return;
}
JSONArray parameters = data.getJSONArray("parameters");
if (parameters == null || parameters.isEmpty()) {
return;
}
for (int i = parameters.size() - 1; i >= 0; i--) {
JSONObject parameter = parameters.getJSONObject(i);
if (parameter != null && QUERY_DATA_CONTEXT.equals(parameter.getString("name"))) {
parameters.remove(i);
}
}
}
private Map<String, JSONObject> buildNodeMap(JSONArray nodes) {
Map<String, JSONObject> nodeMap = new LinkedHashMap<>();
for (int i = 0; i < nodes.size(); i++) {
JSONObject node = nodes.getJSONObject(i);
if (node == null) {
continue;
}
String nodeId = trimToNull(node.getString("id"));
if (StringUtils.hasText(nodeId)) {
nodeMap.put(nodeId, node);
}
}
return nodeMap;
}
private DatasetRef copyDatasetRef(DatasetRef datasetRef) {
DatasetRef copy = new DatasetRef();
copy.setSourceId(datasetRef.getSourceId());
copy.setCatalogId(datasetRef.getCatalogId());
copy.setCatalogName(datasetRef.getCatalogName());
copy.setTableId(datasetRef.getTableId());
copy.setTableName(datasetRef.getTableName());
copy.setVersionId(datasetRef.getVersionId());
return copy;
}
private boolean hasAnyField(JSONObject data, String... fieldNames) {
if (data == null || fieldNames == null) {
return false;
}
for (String fieldName : fieldNames) {
if (data.containsKey(fieldName)) {
return true;
}
}
return false;
}
private String trimToNull(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
return value.trim();
}
private String shortError(Throwable throwable) {
if (throwable == null) {
return "unknown";
}
String message = throwable.getMessage();
if (!StringUtils.hasText(message)) {
return throwable.getClass().getSimpleName();
}
return message.length() > 120 ? message.substring(0, 120) + "..." : message;
}
}

View File

@@ -0,0 +1,75 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.easyagents.flow.core.chain.Chain;
import com.easyagents.flow.core.node.BaseNode;
import com.mybatisflex.core.tenant.TenantManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.easyflow.ai.utils.WorkFlowUtil;
import tech.easyflow.common.entity.LoginAccount;
import tech.easyflow.common.util.SpringContextUtil;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetWriteService;
import java.util.HashMap;
import java.util.Map;
public class SaveDatasetNode extends BaseNode {
private static final Logger log = LoggerFactory.getLogger(SaveDatasetNode.class);
private DatasetRef datasetRef;
public SaveDatasetNode() {
}
public SaveDatasetNode(DatasetRef datasetRef) {
this.datasetRef = datasetRef;
}
@Override
public Map<String, Object> execute(Chain chain) {
Map<String, Object> state = chain.getState().resolveParameters(this);
JSONObject payload = new JSONObject(state);
JSONArray saveList = payload.getJSONArray("saveList");
if (saveList == null || saveList.isEmpty()) {
throw new RuntimeException("saveList 不能为空");
}
LoginAccount account = WorkFlowUtil.getOperator(chain);
DatacenterDatasetWriteService writeService = SpringContextUtil.getBean(DatacenterDatasetWriteService.class);
DatacenterDatasetQueryService queryService = SpringContextUtil.getBean(DatacenterDatasetQueryService.class);
int successRows = 0;
try {
TenantManager.ignoreTenantCondition();
for (Object item : saveList) {
JSONObject row = item instanceof JSONObject json ? json : JSONObject.from(item);
writeService.saveRow(datasetRef, row, account);
successRows++;
}
var schema = queryService.getSchema(datasetRef);
Map<String, Object> result = new HashMap<>();
result.put("successRows", successRows);
result.put("source", schema.getSource());
result.put("catalog", schema.getCatalog());
result.put("table", schema.getTable());
result.put("version", datasetRef.getVersionId());
return result;
} catch (Exception ex) {
log.error("工作流保存数据到统一数据集失败datasetRef={}", datasetRef, ex);
throw ex;
} finally {
TenantManager.restoreTenantCondition();
}
}
public DatasetRef getDatasetRef() {
return datasetRef;
}
public void setDatasetRef(DatasetRef datasetRef) {
this.datasetRef = datasetRef;
}
}

View File

@@ -0,0 +1,41 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson.JSONObject;
import com.easyagents.flow.core.node.BaseNode;
import com.easyagents.flow.core.parser.BaseNodeParser;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.execution.model.DatasetRef;
public class SaveDatasetNodeParser extends BaseNodeParser {
private static final String EXPIRED_MESSAGE = "写入数据节点配置已过期,请重新选择已接入表";
@Override
protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) {
if (data == null) {
throw new BusinessException(EXPIRED_MESSAGE);
}
if (hasLegacyFields(data)) {
throw new BusinessException(EXPIRED_MESSAGE);
}
DatasetRef datasetRef = data.getObject("datasetRef", DatasetRef.class);
if (datasetRef == null || datasetRef.getTableId() == null) {
throw new BusinessException(EXPIRED_MESSAGE);
}
return new SaveDatasetNode(datasetRef);
}
public String getNodeName() {
return "save-dataset-node";
}
private boolean hasLegacyFields(JSONObject data) {
if (data == null) {
return false;
}
return data.containsKey("tableId")
|| data.containsKey("sourceId")
|| data.containsKey("catalogId")
|| data.containsKey("versionId");
}
}

View File

@@ -1,75 +0,0 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.mybatisflex.core.tenant.TenantManager;
import com.easyagents.flow.core.chain.Chain;
import com.easyagents.flow.core.node.BaseNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.easyflow.ai.utils.WorkFlowUtil;
import tech.easyflow.common.entity.LoginAccount;
import tech.easyflow.common.util.SpringContextUtil;
import tech.easyflow.datacenter.service.DatacenterTableService;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
public class SaveToDatacenterNode extends BaseNode {
private static final Logger log = LoggerFactory.getLogger(SaveToDatacenterNode.class);
private BigInteger tableId;
public SaveToDatacenterNode() {
}
public SaveToDatacenterNode(BigInteger tableId) {
this.tableId = tableId;
}
@Override
public Map<String, Object> execute(Chain chain) {
Map<String, Object> map = chain.getState().resolveParameters(this);
JSONObject json = new JSONObject(map);
Map<String, Object> res = new HashMap<>();
// 默认为未知来源
LoginAccount account = WorkFlowUtil.getOperator(chain);
DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class);
JSONArray saveList = json.getJSONArray("saveList");
int successRows = 0;
for (Object object : saveList) {
JSONObject obj = new JSONObject((com.alibaba.fastjson.JSONObject) object);
obj.put("table_id", tableId);
try {
TenantManager.ignoreTenantCondition();
service.saveValue(tableId, obj, account);
} catch (Exception e) {
log.error("工作流保存数据到数据中枢失败表ID{},具体值:{}", tableId, obj, e);
throw e;
} finally {
TenantManager.restoreTenantCondition();
}
successRows++;
}
res.put("successRows", successRows);
return res;
}
public BigInteger getTableId() {
return tableId;
}
public void setTableId(BigInteger tableId) {
this.tableId = tableId;
}
}

View File

@@ -1,23 +0,0 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson.JSONObject;
import com.easyagents.flow.core.node.BaseNode;
import com.easyagents.flow.core.parser.BaseNodeParser;
import java.math.BigInteger;
public class SaveToDatacenterNodeParser extends BaseNodeParser {
@Override
protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) {
BigInteger tableId = data.getBigInteger("tableId");
if (tableId == null) {
throw new RuntimeException("请选择数据表");
}
return new SaveToDatacenterNode(tableId);
}
public String getNodeName() {
return "save-to-datacenter-node";
}
}

View File

@@ -1,143 +0,0 @@
package tech.easyflow.ai.node;
import com.easyagents.core.util.StringUtil;
import com.mybatisflex.core.paginate.Page;
import com.mybatisflex.core.row.Row;
import com.mybatisflex.core.tenant.TenantManager;
import com.easyagents.flow.core.chain.Chain;
import com.easyagents.flow.core.chain.Parameter;
import com.easyagents.flow.core.node.BaseNode;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.easyflow.common.entity.DatacenterQuery;
import tech.easyflow.common.util.SpringContextUtil;
import tech.easyflow.datacenter.entity.DatacenterTableField;
import tech.easyflow.datacenter.service.DatacenterTableService;
import tech.easyflow.datacenter.utils.WhereConditionSecurityChecker;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class SearchDatacenterNode extends BaseNode {
private static final Logger log = LoggerFactory.getLogger(SearchDatacenterNode.class);
private BigInteger tableId;
private String where;
private Long limit;
public SearchDatacenterNode() {
}
public SearchDatacenterNode(BigInteger tableId, String where, Long limit) {
this.tableId = tableId;
this.where = where;
this.limit = limit;
}
@Override
public Map<String, Object> execute(Chain chain) {
Map<String, Object> map = chain.getState().resolveParameters(this);
Map<String, Object> res = new HashMap<>();
long limitNum = 10;
if (limit != null) {
limitNum = Long.parseLong(limit.toString());
}
DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class);
DatacenterQuery condition = new DatacenterQuery();
condition.setTableId(tableId);
condition.setPageNumber(1L);
condition.setPageSize(limitNum);
// 组合查询条件
if (where != null) {
setCondition(where, condition, map);
}
try {
TenantManager.ignoreTenantCondition();
Page<Row> pageData = service.getPageData(condition);
String key = "rows";
List<Parameter> outputDefs = getOutputDefs();
if (outputDefs != null && !outputDefs.isEmpty()) {
String defName = outputDefs.get(0).getName();
if (StringUtil.hasText(defName)) key = defName;
}
res.put(key, pageData.getRecords());
} finally {
TenantManager.restoreTenantCondition();
}
return res;
}
public BigInteger getTableId() {
return tableId;
}
public void setTableId(BigInteger tableId) {
this.tableId = tableId;
}
public String getWhere() {
return where;
}
public void setWhere(String where) {
this.where = where;
}
public Long getLimit() {
return limit;
}
public void setLimit(Long limit) {
this.limit = limit;
}
private void setCondition(String where, DatacenterQuery condition, Map<String, Object> params) {
// 条件封装
Pattern pattern = Pattern.compile("\\{\\{(.+?)\\}\\}");
Matcher matcher = pattern.matcher(where);
StringBuffer result = new StringBuffer();
while (matcher.find()) {
String key = matcher.group(1);
Object value = params.get(key);
if (value == null) {
throw new RuntimeException("参数" + key + "不存在");
}
String replacement = value.toString();
matcher.appendReplacement(result, "'" + replacement + "'");
}
matcher.appendTail(result);
try {
Expression expression = CCJSqlParserUtil.parseCondExpression(result.toString());
if (expression != null) {
WhereConditionSecurityChecker checker = new WhereConditionSecurityChecker();
DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class);
List<DatacenterTableField> fields = service.getFields(tableId);
Set<String> columns = fields.stream().map(DatacenterTableField::getFieldName).collect(Collectors.toSet());
columns.add("id");
columns.add("created");
columns.add("modified");
columns.add("created_by");
columns.add("modified_by");
checker.checkConditionSafety(expression, columns);
condition.setWhere(expression.toString());
}
} catch (Exception e) {
log.error("WHERE SQL解析错误", e);
throw new RuntimeException(e);
}
}
}

View File

@@ -1,25 +0,0 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson.JSONObject;
import com.easyagents.flow.core.node.BaseNode;
import com.easyagents.flow.core.parser.BaseNodeParser;
import java.math.BigInteger;
public class SearchDatacenterNodeParser extends BaseNodeParser {
@Override
protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) {
BigInteger tableId = data.getBigInteger("tableId");
String where = data.getString("where");
Long limit = data.getLong("limit");
if (tableId == null) {
throw new RuntimeException("请选择数据表");
}
return new SearchDatacenterNode(tableId,where,limit);
}
public String getNodeName() {
return "search-datacenter-node";
}
}

View File

@@ -0,0 +1,120 @@
package tech.easyflow.ai.node;
import com.easyagents.core.util.StringUtil;
import com.easyagents.flow.core.chain.Chain;
import com.easyagents.flow.core.chain.Parameter;
import com.easyagents.flow.core.node.BaseNode;
import com.mybatisflex.core.row.Row;
import com.mybatisflex.core.tenant.TenantManager;
import tech.easyflow.common.util.SpringContextUtil;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.execution.model.DatacenterSqlQueryRequest;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SearchDatasetNode extends BaseNode {
private static final Pattern PARAM_PATTERN = Pattern.compile("\\{\\{(.+?)\\}\\}");
private DatasetRef datasetRef;
private String querySql;
public SearchDatasetNode() {
}
public SearchDatasetNode(DatasetRef datasetRef) {
this.datasetRef = datasetRef;
}
public SearchDatasetNode(DatasetRef datasetRef, String querySql) {
this.datasetRef = datasetRef;
this.querySql = querySql;
}
@Override
public Map<String, Object> execute(Chain chain) {
Map<String, Object> params = chain.getState().resolveParameters(this);
DatacenterDatasetQueryService queryService = SpringContextUtil.getBean(DatacenterDatasetQueryService.class);
DatacenterSqlQueryRequest request = buildRuntimeRequest(params);
Map<String, Object> result = new HashMap<>();
try {
TenantManager.ignoreTenantCondition();
List<Row> rows = queryService.queryBySql(request);
result.put(resolveOutputKey("data"), rows);
return result;
} finally {
TenantManager.restoreTenantCondition();
}
}
private DatacenterSqlQueryRequest buildRuntimeRequest(Map<String, Object> params) {
DatacenterSqlQueryRequest request = new DatacenterSqlQueryRequest();
request.setDatasetRef(copyDatasetRef());
request.setSql(resolveQuerySql(params));
return request;
}
private String resolveQuerySql(Map<String, Object> params) {
String sql = resolveTemplateString(querySql, params);
if (!StringUtil.hasText(sql)) {
throw new BusinessException("查询数据节点未设置 SQL");
}
return sql.trim();
}
private DatasetRef copyDatasetRef() {
DatasetRef copy = new DatasetRef();
copy.setSourceId(datasetRef == null ? null : datasetRef.getSourceId());
copy.setCatalogId(datasetRef == null ? null : datasetRef.getCatalogId());
copy.setCatalogName(datasetRef == null ? null : datasetRef.getCatalogName());
copy.setTableId(null);
copy.setTableName(null);
copy.setVersionId(null);
return copy;
}
private String resolveTemplateString(String text, Map<String, Object> params) {
if (!StringUtil.hasText(text) || params == null || params.isEmpty()) {
return text;
}
Matcher matcher = PARAM_PATTERN.matcher(text);
StringBuffer buffer = new StringBuffer();
while (matcher.find()) {
Object replacement = params.get(matcher.group(1));
matcher.appendReplacement(buffer, replacement == null ? "" : Matcher.quoteReplacement(String.valueOf(replacement)));
}
matcher.appendTail(buffer);
return buffer.toString();
}
private String resolveOutputKey(String defaultName) {
List<Parameter> outputDefs = getOutputDefs();
if (outputDefs == null || outputDefs.isEmpty()) {
return defaultName;
}
String name = outputDefs.get(0).getName();
return StringUtil.hasText(name) ? name : defaultName;
}
public DatasetRef getDatasetRef() {
return datasetRef;
}
public void setDatasetRef(DatasetRef datasetRef) {
this.datasetRef = datasetRef;
}
public String getQuerySql() {
return querySql;
}
public void setQuerySql(String querySql) {
this.querySql = querySql;
}
}

View File

@@ -0,0 +1,33 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson.JSONObject;
import com.easyagents.flow.core.node.BaseNode;
import com.easyagents.flow.core.parser.BaseNodeParser;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.execution.model.DatasetRef;
public class SearchDatasetNodeParser extends BaseNodeParser {
private static final String SOURCE_MISSING_MESSAGE = "查询数据节点未选择连接服务";
private static final String SQL_MISSING_MESSAGE = "查询数据节点未设置 SQL";
@Override
protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) {
if (data == null) {
throw new BusinessException(SOURCE_MISSING_MESSAGE);
}
DatasetRef datasetRef = data.getObject("datasetRef", DatasetRef.class);
if (datasetRef == null || datasetRef.getSourceId() == null) {
throw new BusinessException(SOURCE_MISSING_MESSAGE);
}
String querySql = data.getString("querySql");
if (querySql == null || querySql.isBlank()) {
throw new BusinessException(SQL_MISSING_MESSAGE);
}
return new SearchDatasetNode(datasetRef, querySql);
}
public String getNodeName() {
return "search-dataset-node";
}
}

View File

@@ -1,149 +0,0 @@
package tech.easyflow.ai.node;
import com.mybatisflex.core.row.Db;
import com.mybatisflex.core.row.Row;
import com.easyagents.flow.core.chain.Chain;
import com.easyagents.flow.core.node.BaseNode;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.Select;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import tech.easyflow.common.web.exceptions.BusinessException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SQL查询节点
*
* @author tao
* @date 2025-05-21
*/
public class SqlNode extends BaseNode {
private String sql;
private static final Logger logger = LoggerFactory.getLogger(SqlNode.class);
public SqlNode() {
}
public SqlNode(String sql) {
this.sql = sql;
}
@Override
public Map<String, Object> execute(Chain chain) {
Map<String, Object> map = chain.getState().resolveParameters(this);
Map<String, Object> res = new HashMap<>();
Map<String, Object> formatSqlMap = formatSql(sql, map);
String formatSql = (String) formatSqlMap.get("replacedSql");
Statement statement = null;
try {
statement = CCJSqlParserUtil.parse(formatSql);
} catch (JSQLParserException e) {
logger.error("sql 解析报错:", e);
throw new BusinessException("SQL解析失败请确认SQL语法无误");
}
if (!(statement instanceof Select)) {
logger.error("sql 解析报错statement instanceof Select 结果为false");
throw new BusinessException("仅支持查询语句!");
}
List<String> paramNames = (List<String>) formatSqlMap.get("paramNames");
List<Object> paramValues = new ArrayList<>();
paramNames.forEach(paramName -> {
Object o = map.get(paramName);
paramValues.add(o);
});
List<Row> rows = Db.selectListBySql(formatSql, paramValues.toArray());
if (rows == null || rows.isEmpty()) {
return Collections.emptyMap();
}
res.put("queryData", rows);
return res;
}
private Map<String, Object> formatSql(String rawSql, Map<String, Object> paramMap) {
if (!StringUtils.hasLength(rawSql)) {
logger.error("sql解析报错sql为空");
throw new BusinessException("sql 不能为空!");
}
// 匹配 {{?...}} 表示可用占位符的参数
Pattern paramPattern = Pattern.compile("\\{\\{\\?([^}]+)}}");
// 匹配 {{...}} 表示直接替换的参数(非占位符)
Pattern directPattern = Pattern.compile("\\{\\{([^}?][^}]*)}}");
List<String> paramNames = new ArrayList<>();
StringBuffer sqlBuffer = new StringBuffer();
// 替换 {{?...}} -> ?
Matcher paramMatcher = paramPattern.matcher(rawSql);
while (paramMatcher.find()) {
String paramName = paramMatcher.group(1).trim();
paramNames.add(paramName);
paramMatcher.appendReplacement(sqlBuffer, "?");
}
paramMatcher.appendTail(sqlBuffer);
String intermediateSql = sqlBuffer.toString();
// 替换 {{...}} -> 实际值(用于表名/列名等)
sqlBuffer = new StringBuffer(); // 清空 buffer 重新处理
Matcher directMatcher = directPattern.matcher(intermediateSql);
while (directMatcher.find()) {
String key = directMatcher.group(1).trim();
Object value = paramMap.get(key);
if (value == null) {
logger.error("未找到参数:" + key);
throw new BusinessException("sql解析失败请确保sql语法正确");
}
String safeValue = value.toString();
directMatcher.appendReplacement(sqlBuffer, Matcher.quoteReplacement(safeValue));
}
directMatcher.appendTail(sqlBuffer);
String finalSql = sqlBuffer.toString().trim();
// 清理末尾分号与中文引号
if (finalSql.endsWith(";") || finalSql.endsWith("")) {
finalSql = finalSql.substring(0, finalSql.length() - 1);
}
finalSql = finalSql.replace("", "\"").replace("", "\"");
logger.info("Final SQL: {}", finalSql);
logger.info("Param names: {}", paramNames);
Map<String, Object> result = new HashMap<>();
result.put("replacedSql", finalSql);
result.put("paramNames", paramNames);
return result;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}

View File

@@ -1,25 +0,0 @@
package tech.easyflow.ai.node;
import com.alibaba.fastjson.JSONObject;
import com.easyagents.flow.core.node.BaseNode;
import com.easyagents.flow.core.parser.BaseNodeParser;
/**
* Sql查询节点解析
*
* @author tao
* @date 2025-05-21
*/
public class SqlNodeParser extends BaseNodeParser {
@Override
public BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) {
String sql = data.getString("sql");
return new SqlNode(sql);
}
public String getNodeName() {
return "sql-node";
}
}

View File

@@ -8,6 +8,7 @@ import org.junit.Test;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckResult;
import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.node.SearchDatasetNodeParser;
import tech.easyflow.ai.node.WorkflowNodeParser;
import tech.easyflow.ai.service.WorkflowService;
@@ -69,6 +70,24 @@ public class WorkflowCheckServiceTest {
assertHasCode(result, "EDGE_TARGET_NOT_FOUND");
}
@Test
public void testSaveShouldBlockSearchDatasetWithoutSql() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
JSONObject searchData = data("查询数据");
JSONObject datasetRef = new JSONObject();
datasetRef.put("sourceId", "1001");
datasetRef.put("tableId", "2001");
searchData.put("datasetRef", datasetRef);
String content = workflowJson(
array(node("search-1", "search-dataset-node", null, searchData)),
new JSONArray()
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.SAVE, null);
Assert.assertFalse(result.isPassed());
assertHasCode(result, "SEARCH_DATASET_INVALID");
}
@Test
public void testPreExecuteShouldBlockMissingStartOrEnd() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
@@ -86,6 +105,26 @@ public class WorkflowCheckServiceTest {
assertHasCode(result, "END_NODE_MISSING");
}
@Test
public void testPreExecuteShouldPassForSourceOnlySearchDatasetNode() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
String content = workflowJson(
array(
node("s1", "startNode", null, data("开始")),
searchDatasetNode("q1", null, "1001"),
node("e1", "endNode", null, data("结束"))
),
array(
edge("e1", "s1", "q1"),
edge("e2", "q1", "e1")
)
);
WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE);
Assert.assertTrue(result.isPassed());
Assert.assertEquals(0, result.getIssueCount());
}
@Test
public void testPreExecuteShouldBlockRootEntryNotStart() throws Exception {
WorkflowCheckService service = newService(new HashMap<>());
@@ -203,8 +242,10 @@ public class WorkflowCheckServiceTest {
.withDefaultParsers(true)
.build();
parser.addNodeParser("workflow-node", new WorkflowNodeParser());
parser.addNodeParser("search-dataset-node", new SearchDatasetNodeParser());
setField(service, "chainParser", parser);
setField(service, "workflowService", mockWorkflowService(workflowStore));
setField(service, "workflowDatacenterContentService", new WorkflowDatacenterContentService());
return service;
}
@@ -294,6 +335,15 @@ public class WorkflowCheckServiceTest {
return node(id, "workflow-node", parentId, data);
}
private static JSONObject searchDatasetNode(String id, String parentId, String sourceId) {
JSONObject data = data("查询数据");
JSONObject datasetRef = new JSONObject();
datasetRef.put("sourceId", sourceId);
data.put("datasetRef", datasetRef);
data.put("querySql", "SELECT 1");
return node(id, "search-dataset-node", parentId, data);
}
private static JSONObject data(String title) {
JSONObject data = new JSONObject();
data.put("title", title);

View File

@@ -0,0 +1,73 @@
package tech.easyflow.ai.node;
import org.junit.Assert;
import org.junit.Test;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import java.lang.reflect.Method;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
public class SearchDatasetNodeTest {
@Test
public void testResolveQuerySqlShouldUseNodeQuerySqlAndResolveTemplate() throws Exception {
DatasetRef datasetRef = new DatasetRef();
datasetRef.setSourceId(BigInteger.valueOf(1001L));
SearchDatasetNode node = new SearchDatasetNode(datasetRef, """
SELECT id, name
FROM orders_{{biz}}
WHERE name LIKE '%{{keyword}}%'
ORDER BY created_at {{direction}}
""");
Map<String, Object> params = new HashMap<>();
params.put("biz", "prod");
params.put("keyword", "vip");
params.put("direction", "DESC");
String sql = invokeResolveQuerySql(node, params);
Assert.assertEquals("""
SELECT id, name
FROM orders_prod
WHERE name LIKE '%vip%'
ORDER BY created_at DESC
""".trim(), sql);
}
@Test
public void testResolveQuerySqlShouldUseNodeQuerySqlWhenParamsDoNotContainSql() throws Exception {
DatasetRef datasetRef = new DatasetRef();
datasetRef.setSourceId(BigInteger.valueOf(2002L));
SearchDatasetNode node = new SearchDatasetNode(datasetRef, "SELECT * FROM fallback_table");
Map<String, Object> params = new HashMap<>();
String sql = invokeResolveQuerySql(node, params);
Assert.assertEquals("SELECT * FROM fallback_table", sql);
}
@Test
public void testResolveQuerySqlShouldRejectBlankSql() throws Exception {
DatasetRef datasetRef = new DatasetRef();
datasetRef.setSourceId(BigInteger.valueOf(3003L));
SearchDatasetNode node = new SearchDatasetNode(datasetRef, " ");
try {
invokeResolveQuerySql(node, new HashMap<>());
Assert.fail("expected BusinessException");
} catch (Exception e) {
Throwable cause = e.getCause() == null ? e : e.getCause();
Assert.assertTrue(cause instanceof BusinessException);
Assert.assertEquals("查询数据节点未设置 SQL", cause.getMessage());
}
}
private String invokeResolveQuerySql(SearchDatasetNode node, Map<String, Object> params) throws Exception {
Method method = SearchDatasetNode.class.getDeclaredMethod("resolveQuerySql", Map.class);
method.setAccessible(true);
return (String) method.invoke(node, params);
}
}