From 1ecc28e4982f6bd482f34a45e0e6c32c5f67d4da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=AD=90=E9=BB=98?= <925456043@qq.com> Date: Thu, 2 Apr 2026 18:56:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B7=A5=E4=BD=9C=E6=B5=81=E9=80=82?= =?UTF-8?q?=E9=85=8D=E6=95=B0=E6=8D=AE=E4=B8=AD=E6=9E=A2=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增查询数据与写入数据节点并移除旧数据中心节点入口 - 将查询数据节点切换为连接服务加 SQL 的执行模型 - 同步更新工作流校验、提示词上下文与设计器交互 --- .../controller/ai/WorkFlowNodeController.java | 5 +- .../controller/ai/WorkflowController.java | 5 +- .../controller/PublicWorkflowController.java | 5 +- .../controller/ai/UcWorkflowController.java | 5 +- .../ChainDefinitionRepositoryImpl.java | 5 +- .../service/TinyFlowConfigService.java | 11 +- .../service/WorkflowCheckService.java | 132 +++- .../WorkflowDatacenterContentService.java | 349 +++++++++++ .../easyflow/ai/node/SaveDatasetNode.java | 75 +++ .../ai/node/SaveDatasetNodeParser.java | 41 ++ .../ai/node/SaveToDatacenterNode.java | 75 --- .../ai/node/SaveToDatacenterNodeParser.java | 23 - .../ai/node/SearchDatacenterNode.java | 143 ----- .../ai/node/SearchDatacenterNodeParser.java | 25 - .../easyflow/ai/node/SearchDatasetNode.java | 120 ++++ .../ai/node/SearchDatasetNodeParser.java | 33 + .../java/tech/easyflow/ai/node/SqlNode.java | 149 ----- .../tech/easyflow/ai/node/SqlNodeParser.java | 25 - .../service/WorkflowCheckServiceTest.java | 50 ++ .../ai/node/SearchDatasetNodeTest.java | 73 +++ .../src/locales/langs/en-US/aiWorkflow.json | 14 + .../src/locales/langs/zh-CN/aiWorkflow.json | 14 + .../app/src/router/routes/modules/workflow.ts | 2 +- .../src/views/ai/workflow/WorkflowDesign.vue | 43 +- .../customNode/datasetNodeRenderer.ts | 573 ++++++++++++++++++ .../ai/workflow/customNode/datasetOptions.ts | 184 ++++++ .../src/views/ai/workflow/customNode/index.ts | 14 +- .../views/ai/workflow/customNode/nodeNames.ts | 5 +- .../ai/workflow/customNode/saveDataset.ts | 43 ++ .../workflow/customNode/saveToDatacenter.ts | 58 -- .../workflow/customNode/searchDatacenter.ts | 69 --- .../ai/workflow/customNode/searchDataset.ts | 44 ++ .../views/ai/workflow/customNode/sqlNode.ts | 38 -- .../src/components/base/select.svelte | 71 ++- .../src/components/core/OutputDefList.svelte | 15 +- .../components/core/RefParameterItem.svelte | 3 +- .../components/core/RefParameterList.svelte | 16 +- .../src/components/nodes/CustomNode.svelte | 9 +- .../src/components/nodes/LLMNode.svelte | 94 ++- .../packages/tinyflow-ui/src/styles/base.less | 7 +- 40 files changed, 1973 insertions(+), 692 deletions(-) create mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/easyagentsflow/service/WorkflowDatacenterContentService.java create mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNode.java create mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNodeParser.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNode.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNodeParser.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNode.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNodeParser.java create mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNode.java create mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNodeParser.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNode.java delete mode 100644 easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNodeParser.java create mode 100644 easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/node/SearchDatasetNodeTest.java create mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetNodeRenderer.ts create mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetOptions.ts create mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveDataset.ts delete mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveToDatacenter.ts delete mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDatacenter.ts create mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDataset.ts delete mode 100644 easyflow-ui-admin/app/src/views/ai/workflow/customNode/sqlNode.ts diff --git a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkFlowNodeController.java b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkFlowNodeController.java index af25cf7..f61c21a 100644 --- a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkFlowNodeController.java +++ b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkFlowNodeController.java @@ -13,6 +13,7 @@ import com.easyagents.flow.core.parser.ChainParser; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import tech.easyflow.ai.easyagentsflow.service.WorkflowDatacenterContentService; import tech.easyflow.ai.entity.Workflow; import tech.easyflow.ai.service.WorkflowService; import tech.easyflow.common.domain.Result; @@ -29,6 +30,8 @@ public class WorkFlowNodeController { private WorkflowService workflowService; @Resource private ChainParser chainParser; + @Resource + private WorkflowDatacenterContentService workflowDatacenterContentService; @GetMapping("/getChainParams") public Result getChainParams(String currentId, String workflowId) { @@ -43,7 +46,7 @@ public class WorkFlowNodeController { nodeData.put("workflowId", workflow.getId()); nodeData.put("workflowName", workflow.getTitle()); - ChainDefinition definition = chainParser.parse(workflow.getContent()); + ChainDefinition definition = chainParser.parse(workflowDatacenterContentService.prepareContent(workflow.getContent())); List nodes = definition.getNodes(); JSONArray inputs = new JSONArray(); JSONArray outputs = new JSONArray(); diff --git a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkflowController.java b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkflowController.java index 9970e71..7550d1e 100644 --- a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkflowController.java +++ b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/WorkflowController.java @@ -21,6 +21,7 @@ import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage; import tech.easyflow.ai.easyagentsflow.service.CodeEngineCapabilityService; import tech.easyflow.ai.easyagentsflow.service.TinyFlowService; import tech.easyflow.ai.easyagentsflow.service.WorkflowCheckService; +import tech.easyflow.ai.easyagentsflow.service.WorkflowDatacenterContentService; import tech.easyflow.ai.entity.Workflow; import tech.easyflow.ai.service.BotWorkflowService; import tech.easyflow.ai.service.ModelService; @@ -76,6 +77,8 @@ public class WorkflowController extends BaseCurdController issues, Set issueKeys) { + for (NodeView node : parsed.nodes) { + if (node == null) { + continue; + } + if (workflowDatacenterContentService.isSearchDatasetNode(node.type)) { + checkSearchDatasetNode(node, issues, issueKeys); + continue; + } + if (workflowDatacenterContentService.isSaveDatasetNode(node.type)) { + checkSaveDatasetNode(node, issues, issueKeys); + continue; + } + if (workflowDatacenterContentService.isLlmNode(node.type)) { + checkLlmQueryContext(node, parsed, issues, issueKeys); + } + } + } + + private void checkSearchDatasetNode(NodeView node, + List issues, + Set issueKeys) { + try { + workflowDatacenterContentService.requireSearchDatasetRef(node.data); + } catch (BusinessException e) { + addIssue(issues, issueKeys, "SEARCH_DATASET_INVALID", e.getMessage(), node.id, null, node.name); + } catch (Exception e) { + addIssue(issues, issueKeys, "SEARCH_DATASET_INVALID", + "查询数据节点校验失败: " + shortError(e), node.id, null, node.name); + } + } + + private void checkSaveDatasetNode(NodeView node, + List issues, + Set issueKeys) { + try { + workflowDatacenterContentService.requireSaveDatasetRef(node.data); + } catch (BusinessException e) { + addIssue(issues, issueKeys, "SAVE_DATASET_INVALID", e.getMessage(), node.id, null, node.name); + } catch (Exception e) { + addIssue(issues, issueKeys, "SAVE_DATASET_INVALID", + "写入数据节点校验失败: " + shortError(e), node.id, null, node.name); + } + } + + private void checkLlmQueryContext(NodeView node, + ParsedWorkflow parsed, + List issues, + Set issueKeys) { + if (node.data == null) { + return; + } + Object rawNodeIds = node.data.get("queryContextNodeIds"); + if (rawNodeIds == null) { + return; + } + if (!(rawNodeIds instanceof JSONArray nodeIds)) { + addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_INVALID", + "查询上下文配置无效,请重新选择查询数据节点", node.id, null, node.name); + return; + } + for (int i = 0; i < nodeIds.size(); i++) { + String refNodeId = trimToNull(nodeIds.getString(i)); + if (!StringUtils.hasText(refNodeId)) { + addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_EMPTY", + "查询上下文配置无效,请重新选择查询数据节点", node.id, null, node.name); + continue; + } + NodeView refNode = parsed.nodeMap.get(refNodeId); + if (refNode == null) { + addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_NOT_FOUND", + "查询上下文节点不存在: " + refNodeId, node.id, null, node.name); + continue; + } + if (!workflowDatacenterContentService.isSearchDatasetNode(refNode.type)) { + addIssue(issues, issueKeys, "LLM_QUERY_CONTEXT_TYPE_INVALID", + "查询上下文只能选择查询数据节点", node.id, null, node.name); + } + } + } + private void runStrictChecks(String content, ParsedWorkflow parsed, BigInteger currentWorkflowId, List issues, Set issueKeys) { if (parsed.nodes.isEmpty()) { @@ -190,7 +289,8 @@ public class WorkflowCheckService { } try { - Object definition = chainParser.parse(content); + String preparedContent = workflowDatacenterContentService.prepareContent(content); + Object definition = chainParser.parse(preparedContent); if (definition == null) { addIssue(issues, issueKeys, "PARSE_NULL", "预执行校验失败:节点配置错误,请检查", null, null, null); } @@ -606,31 +706,47 @@ public class WorkflowCheckService { result.setStage(stage); result.setIssues(issues); result.setIssueCount(issues.size()); - result.setPassed(issues.isEmpty()); + result.setPassed(issues.stream().noneMatch(issue -> LEVEL_ERROR.equalsIgnoreCase(issue.getLevel()))); return result; } private void throwIfFailed(WorkflowCheckResult result) { - if (result == null || result.isPassed()) { + if (result == null) { return; } - String summary = result.getIssues().stream() + List errorIssues = result.getIssues().stream() + .filter(issue -> LEVEL_ERROR.equalsIgnoreCase(issue.getLevel())) + .collect(Collectors.toList()); + if (errorIssues.isEmpty()) { + return; + } + String summary = errorIssues.stream() .limit(5) .map(WorkflowCheckIssue::getMessage) .collect(Collectors.joining(";")); - if (result.getIssueCount() > 5) { + if (errorIssues.size() > 5) { summary = summary + ";等"; } - throw new BusinessException("工作流校验未通过(" + result.getStage() + "),共 " + result.getIssueCount() + " 项:" + summary); + throw new BusinessException("工作流校验未通过(" + result.getStage() + "),共 " + errorIssues.size() + " 项:" + summary); } private void addIssue(List issues, Set issueKeys, String code, String message, String nodeId, String edgeId, String nodeName) { + addIssue(issues, issueKeys, code, LEVEL_ERROR, message, nodeId, edgeId, nodeName); + } + + private void addWarning(List issues, Set issueKeys, String code, + String message, String nodeId, String edgeId, String nodeName) { + addIssue(issues, issueKeys, code, LEVEL_WARNING, message, nodeId, edgeId, nodeName); + } + + private void addIssue(List issues, Set issueKeys, String code, String level, + String message, String nodeId, String edgeId, String nodeName) { String key = code + "|" + safe(nodeId) + "|" + safe(edgeId) + "|" + message; if (!issueKeys.add(key)) { return; } - issues.add(new WorkflowCheckIssue(code, LEVEL_ERROR, message, nodeId, edgeId, nodeName)); + issues.add(new WorkflowCheckIssue(code, level, message, nodeId, edgeId, nodeName)); } private String extractNodeName(JSONObject nodeJson, JSONObject data, String fallback) { diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/easyagentsflow/service/WorkflowDatacenterContentService.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/easyagentsflow/service/WorkflowDatacenterContentService.java new file mode 100644 index 0000000..221b9bd --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/easyagentsflow/service/WorkflowDatacenterContentService.java @@ -0,0 +1,349 @@ +package tech.easyflow.ai.easyagentsflow.service; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import tech.easyflow.common.constant.enums.EnumFieldType; +import tech.easyflow.common.web.exceptions.BusinessException; +import tech.easyflow.datacenter.entity.DatacenterTable; +import tech.easyflow.datacenter.entity.DatacenterTableField; +import tech.easyflow.datacenter.execution.model.DatacenterSchemaResponse; +import tech.easyflow.datacenter.execution.model.DatasetRef; +import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService; +import tech.easyflow.datacenter.meta.entity.DatacenterCatalog; +import tech.easyflow.datacenter.meta.entity.DatacenterSource; +import tech.easyflow.datacenter.meta.service.DatacenterDatasetRegistryService; + +import javax.annotation.Resource; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Service +public class WorkflowDatacenterContentService { + + public static final String SEARCH_NODE_TYPE = "search-dataset-node"; + public static final String SAVE_NODE_TYPE = "save-dataset-node"; + public static final String LLM_NODE_TYPE = "llmNode"; + public static final String QUERY_DATA_CONTEXT = "queryDataContext"; + public static final String SEARCH_SOURCE_MISSING_MESSAGE = "查询数据节点未选择连接服务"; + public static final String SEARCH_SQL_MISSING_MESSAGE = "查询数据节点未设置 SQL"; + public static final String SAVE_EXPIRED_MESSAGE = "写入数据节点配置已过期,请重新选择已接入表"; + public static final String INVALID_QUERY_CONTEXT_MESSAGE = "查询上下文配置无效,请重新选择查询数据节点"; + private static final String QUERY_CONTEXT_PROMPT = """ + 你是为工作流中的查询数据节点生成只读 SQL 的生成器,你的职责是返回可直接执行的 SQL,并且你只能输出 SQL。 + + 必须严格遵守以下规则: + 1. 只能从下面给出的连接摘要中选择最合适的表。 + 2. 只能使用摘要中给出的字段名,不要虚构表名或字段名。 + 3. 只输出 SQL,不要输出解释、注释、Markdown、JSON 或多余文本。 + 4. 只能生成只读 SELECT SQL,允许 WITH、JOIN、子查询、聚合、分组、排序。 + 5. 不要生成 INSERT、UPDATE、DELETE、DDL、多语句、存储过程调用。 + 6. 优先使用逻辑表名和逻辑字段名,不要输出物理表名、JDBC、驱动信息。 + 7. 如果存在重名表,请使用 catalog.table 形式消除歧义。 + + 以下是可用的连接摘要: + """; + + @Resource + private DatacenterDatasetQueryService queryService; + @Resource + private DatacenterDatasetRegistryService registryService; + + public boolean isSearchDatasetNode(String nodeType) { + return SEARCH_NODE_TYPE.equals(nodeType); + } + + public boolean isSaveDatasetNode(String nodeType) { + return SAVE_NODE_TYPE.equals(nodeType); + } + + public boolean isLlmNode(String nodeType) { + return LLM_NODE_TYPE.equals(nodeType); + } + + public String prepareContent(String content) { + if (!StringUtils.hasText(content)) { + throw new BusinessException("工作流内容不能为空"); + } + Object parsed; + try { + parsed = JSON.parse(content); + } catch (Exception e) { + throw new BusinessException("工作流内容不是合法JSON: " + shortError(e)); + } + if (!(parsed instanceof JSONObject root)) { + throw new BusinessException("工作流内容必须是JSON对象"); + } + JSONObject copy = JSON.parseObject(root.toJSONString()); + prepareRoot(copy); + return copy.toJSONString(); + } + + public JSONObject prepareRoot(JSONObject root) { + if (root == null) { + throw new BusinessException("工作流内容不能为空"); + } + JSONArray nodes = root.getJSONArray("nodes"); + if (nodes == null || nodes.isEmpty()) { + return root; + } + Map nodeMap = buildNodeMap(nodes); + for (int i = 0; i < nodes.size(); i++) { + JSONObject node = nodes.getJSONObject(i); + if (node == null) { + continue; + } + String nodeType = node.getString("type"); + JSONObject data = node.getJSONObject("data"); + if (isSearchDatasetNode(nodeType)) { + requireSearchDatasetRef(data); + } else if (isSaveDatasetNode(nodeType)) { + requireSaveDatasetRef(data); + } + } + for (int i = 0; i < nodes.size(); i++) { + JSONObject node = nodes.getJSONObject(i); + if (node == null || !isLlmNode(node.getString("type"))) { + continue; + } + injectQueryDataContext(node.getJSONObject("data"), nodeMap); + } + return root; + } + + public boolean hasSaveLegacyFields(JSONObject data) { + return hasAnyField(data, "tableId", "sourceId", "catalogId", "versionId"); + } + + public DatasetRef readDatasetRef(JSONObject data) { + return data == null ? null : data.getObject("datasetRef", DatasetRef.class); + } + + public DatasetRef requireSearchDatasetRef(JSONObject data) { + DatasetRef datasetRef = readDatasetRef(data); + if (datasetRef == null || datasetRef.getSourceId() == null) { + throw new BusinessException(SEARCH_SOURCE_MISSING_MESSAGE); + } + String querySql = data == null ? null : trimToNull(data.getString("querySql")); + if (!StringUtils.hasText(querySql)) { + throw new BusinessException(SEARCH_SQL_MISSING_MESSAGE); + } + return datasetRef; + } + + public DatasetRef requireSaveDatasetRef(JSONObject data) { + if (hasSaveLegacyFields(data)) { + throw new BusinessException(SAVE_EXPIRED_MESSAGE); + } + DatasetRef datasetRef = readDatasetRef(data); + if (datasetRef == null || datasetRef.getTableId() == null) { + throw new BusinessException(SAVE_EXPIRED_MESSAGE); + } + return datasetRef; + } + + public DatacenterSchemaResponse loadSchema(DatasetRef datasetRef, Map schemaCache) { + if (datasetRef == null || datasetRef.getTableId() == null) { + throw new BusinessException("缺少已接入表配置"); + } + BigInteger tableId = datasetRef.getTableId(); + if (schemaCache != null && schemaCache.containsKey(tableId)) { + return schemaCache.get(tableId); + } + DatacenterSchemaResponse schema = queryService.getSchema(copyDatasetRef(datasetRef)); + if (schemaCache != null) { + schemaCache.put(tableId, schema); + } + return schema; + } + + public JSONObject buildSourceSummary(DatasetRef datasetRef) { + if (datasetRef == null || datasetRef.getSourceId() == null) { + throw new BusinessException(SEARCH_SOURCE_MISSING_MESSAGE); + } + DatacenterSource source = registryService.getSourceRequired(datasetRef.getSourceId()); + List managedTables = registryService.listManagedTables(datasetRef.getSourceId(), datasetRef.getCatalogId()); + managedTables.sort(Comparator.comparing(table -> table.getTableName() == null ? "" : table.getTableName())); + JSONObject sourceSummary = new JSONObject(); + sourceSummary.put("sourceName", source.getSourceName()); + sourceSummary.put("sourceType", source.getSourceType()); + JSONArray tables = new JSONArray(); + for (DatacenterTable table : managedTables) { + DatacenterTable fullTable = registryService.getTableWithFields(table.getId()); + DatacenterCatalog catalog = registryService.getCatalogById(fullTable.getCatalogId()); + if (StringUtils.hasText(datasetRef.getCatalogName()) + && (catalog == null || !datasetRef.getCatalogName().equals(catalog.getCatalogName()))) { + continue; + } + JSONObject tableSummary = new JSONObject(); + tableSummary.put("catalogName", catalog == null ? null : catalog.getCatalogName()); + tableSummary.put("tableName", fullTable.getTableName()); + tableSummary.put("tableDesc", fullTable.getTableDesc()); + JSONArray fields = new JSONArray(); + if (fullTable.getFields() != null) { + for (DatacenterTableField field : fullTable.getFields()) { + JSONObject fieldSummary = new JSONObject(); + fieldSummary.put("fieldName", field.getFieldName()); + fieldSummary.put("fieldDesc", field.getFieldDesc()); + fieldSummary.put("fieldType", resolveFieldType(field)); + fields.add(fieldSummary); + } + } + tableSummary.put("fields", fields); + tables.add(tableSummary); + } + sourceSummary.put("tables", tables); + return sourceSummary; + } + + private void injectQueryDataContext(JSONObject data, Map nodeMap) { + if (data == null) { + return; + } + JSONArray nodeIds = data.getJSONArray("queryContextNodeIds"); + if (nodeIds == null || nodeIds.isEmpty()) { + removeQueryDataContextParameter(data); + return; + } + Map sourceSummaries = new LinkedHashMap<>(); + Set visitedNodeIds = new LinkedHashSet<>(); + for (int i = 0; i < nodeIds.size(); i++) { + String nodeId = trimToNull(nodeIds.getString(i)); + if (!StringUtils.hasText(nodeId) || !visitedNodeIds.add(nodeId)) { + continue; + } + JSONObject targetNode = nodeMap.get(nodeId); + if (targetNode == null || !isSearchDatasetNode(targetNode.getString("type"))) { + throw new BusinessException(INVALID_QUERY_CONTEXT_MESSAGE); + } + DatasetRef datasetRef = requireSearchDatasetRef(targetNode.getJSONObject("data")); + sourceSummaries.putIfAbsent(datasetRef.getSourceId(), buildSourceSummary(datasetRef)); + } + String contextValue = QUERY_CONTEXT_PROMPT + "\n" + JSON.toJSONString(new ArrayList<>(sourceSummaries.values())); + upsertQueryDataContextParameter(data, contextValue); + } + + private String resolveFieldType(DatacenterTableField field) { + if (field == null) { + return null; + } + if (StringUtils.hasText(field.getJdbcType())) { + return field.getJdbcType(); + } + try { + EnumFieldType enumFieldType = EnumFieldType.getByCode(field.getFieldType()); + if (enumFieldType != null) { + return enumFieldType.getText(); + } + } catch (Exception ignored) { + } + return field.getFieldType() == null ? null : String.valueOf(field.getFieldType()); + } + + private void upsertQueryDataContextParameter(JSONObject data, String contextValue) { + JSONArray parameters = data.getJSONArray("parameters"); + if (parameters == null) { + parameters = new JSONArray(); + data.put("parameters", parameters); + } + for (int i = parameters.size() - 1; i >= 0; i--) { + JSONObject parameter = parameters.getJSONObject(i); + if (parameter != null && QUERY_DATA_CONTEXT.equals(parameter.getString("name"))) { + parameters.remove(i); + } + } + JSONObject parameter = new JSONObject(); + parameter.put("id", QUERY_DATA_CONTEXT); + parameter.put("name", QUERY_DATA_CONTEXT); + parameter.put("title", "查询上下文"); + parameter.put("description", "数据查询规则与连接表摘要"); + parameter.put("dataType", "String"); + parameter.put("refType", "fixed"); + parameter.put("value", contextValue); + parameter.put("required", false); + parameter.put("nameDisabled", true); + parameter.put("dataTypeDisabled", true); + parameter.put("deleteDisabled", true); + parameters.add(parameter); + } + + private void removeQueryDataContextParameter(JSONObject data) { + if (data == null) { + return; + } + JSONArray parameters = data.getJSONArray("parameters"); + if (parameters == null || parameters.isEmpty()) { + return; + } + for (int i = parameters.size() - 1; i >= 0; i--) { + JSONObject parameter = parameters.getJSONObject(i); + if (parameter != null && QUERY_DATA_CONTEXT.equals(parameter.getString("name"))) { + parameters.remove(i); + } + } + } + + private Map buildNodeMap(JSONArray nodes) { + Map nodeMap = new LinkedHashMap<>(); + for (int i = 0; i < nodes.size(); i++) { + JSONObject node = nodes.getJSONObject(i); + if (node == null) { + continue; + } + String nodeId = trimToNull(node.getString("id")); + if (StringUtils.hasText(nodeId)) { + nodeMap.put(nodeId, node); + } + } + return nodeMap; + } + + private DatasetRef copyDatasetRef(DatasetRef datasetRef) { + DatasetRef copy = new DatasetRef(); + copy.setSourceId(datasetRef.getSourceId()); + copy.setCatalogId(datasetRef.getCatalogId()); + copy.setCatalogName(datasetRef.getCatalogName()); + copy.setTableId(datasetRef.getTableId()); + copy.setTableName(datasetRef.getTableName()); + copy.setVersionId(datasetRef.getVersionId()); + return copy; + } + + private boolean hasAnyField(JSONObject data, String... fieldNames) { + if (data == null || fieldNames == null) { + return false; + } + for (String fieldName : fieldNames) { + if (data.containsKey(fieldName)) { + return true; + } + } + return false; + } + + private String trimToNull(String value) { + if (!StringUtils.hasText(value)) { + return null; + } + return value.trim(); + } + + private String shortError(Throwable throwable) { + if (throwable == null) { + return "unknown"; + } + String message = throwable.getMessage(); + if (!StringUtils.hasText(message)) { + return throwable.getClass().getSimpleName(); + } + return message.length() > 120 ? message.substring(0, 120) + "..." : message; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNode.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNode.java new file mode 100644 index 0000000..39ec518 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNode.java @@ -0,0 +1,75 @@ +package tech.easyflow.ai.node; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.easyagents.flow.core.chain.Chain; +import com.easyagents.flow.core.node.BaseNode; +import com.mybatisflex.core.tenant.TenantManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.easyflow.ai.utils.WorkFlowUtil; +import tech.easyflow.common.entity.LoginAccount; +import tech.easyflow.common.util.SpringContextUtil; +import tech.easyflow.datacenter.execution.model.DatasetRef; +import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService; +import tech.easyflow.datacenter.execution.service.DatacenterDatasetWriteService; + +import java.util.HashMap; +import java.util.Map; + +public class SaveDatasetNode extends BaseNode { + + private static final Logger log = LoggerFactory.getLogger(SaveDatasetNode.class); + + private DatasetRef datasetRef; + + public SaveDatasetNode() { + } + + public SaveDatasetNode(DatasetRef datasetRef) { + this.datasetRef = datasetRef; + } + + @Override + public Map execute(Chain chain) { + Map state = chain.getState().resolveParameters(this); + JSONObject payload = new JSONObject(state); + JSONArray saveList = payload.getJSONArray("saveList"); + if (saveList == null || saveList.isEmpty()) { + throw new RuntimeException("saveList 不能为空"); + } + LoginAccount account = WorkFlowUtil.getOperator(chain); + DatacenterDatasetWriteService writeService = SpringContextUtil.getBean(DatacenterDatasetWriteService.class); + DatacenterDatasetQueryService queryService = SpringContextUtil.getBean(DatacenterDatasetQueryService.class); + int successRows = 0; + try { + TenantManager.ignoreTenantCondition(); + for (Object item : saveList) { + JSONObject row = item instanceof JSONObject json ? json : JSONObject.from(item); + writeService.saveRow(datasetRef, row, account); + successRows++; + } + var schema = queryService.getSchema(datasetRef); + Map result = new HashMap<>(); + result.put("successRows", successRows); + result.put("source", schema.getSource()); + result.put("catalog", schema.getCatalog()); + result.put("table", schema.getTable()); + result.put("version", datasetRef.getVersionId()); + return result; + } catch (Exception ex) { + log.error("工作流保存数据到统一数据集失败,datasetRef={}", datasetRef, ex); + throw ex; + } finally { + TenantManager.restoreTenantCondition(); + } + } + + public DatasetRef getDatasetRef() { + return datasetRef; + } + + public void setDatasetRef(DatasetRef datasetRef) { + this.datasetRef = datasetRef; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNodeParser.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNodeParser.java new file mode 100644 index 0000000..24b631a --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveDatasetNodeParser.java @@ -0,0 +1,41 @@ +package tech.easyflow.ai.node; + +import com.alibaba.fastjson.JSONObject; +import com.easyagents.flow.core.node.BaseNode; +import com.easyagents.flow.core.parser.BaseNodeParser; +import tech.easyflow.common.web.exceptions.BusinessException; +import tech.easyflow.datacenter.execution.model.DatasetRef; + +public class SaveDatasetNodeParser extends BaseNodeParser { + + private static final String EXPIRED_MESSAGE = "写入数据节点配置已过期,请重新选择已接入表"; + + @Override + protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) { + if (data == null) { + throw new BusinessException(EXPIRED_MESSAGE); + } + if (hasLegacyFields(data)) { + throw new BusinessException(EXPIRED_MESSAGE); + } + DatasetRef datasetRef = data.getObject("datasetRef", DatasetRef.class); + if (datasetRef == null || datasetRef.getTableId() == null) { + throw new BusinessException(EXPIRED_MESSAGE); + } + return new SaveDatasetNode(datasetRef); + } + + public String getNodeName() { + return "save-dataset-node"; + } + + private boolean hasLegacyFields(JSONObject data) { + if (data == null) { + return false; + } + return data.containsKey("tableId") + || data.containsKey("sourceId") + || data.containsKey("catalogId") + || data.containsKey("versionId"); + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNode.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNode.java deleted file mode 100644 index 897f742..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNode.java +++ /dev/null @@ -1,75 +0,0 @@ -package tech.easyflow.ai.node; - -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.mybatisflex.core.tenant.TenantManager; -import com.easyagents.flow.core.chain.Chain; -import com.easyagents.flow.core.node.BaseNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.easyflow.ai.utils.WorkFlowUtil; -import tech.easyflow.common.entity.LoginAccount; -import tech.easyflow.common.util.SpringContextUtil; -import tech.easyflow.datacenter.service.DatacenterTableService; - -import java.math.BigInteger; -import java.util.HashMap; -import java.util.Map; - -public class SaveToDatacenterNode extends BaseNode { - - private static final Logger log = LoggerFactory.getLogger(SaveToDatacenterNode.class); - - private BigInteger tableId; - - public SaveToDatacenterNode() { - } - - public SaveToDatacenterNode(BigInteger tableId) { - this.tableId = tableId; - } - - @Override - public Map execute(Chain chain) { - - Map map = chain.getState().resolveParameters(this); - JSONObject json = new JSONObject(map); - - Map res = new HashMap<>(); - - // 默认为未知来源 - LoginAccount account = WorkFlowUtil.getOperator(chain); - - DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class); - - JSONArray saveList = json.getJSONArray("saveList"); - - int successRows = 0; - for (Object object : saveList) { - JSONObject obj = new JSONObject((com.alibaba.fastjson.JSONObject) object); - obj.put("table_id", tableId); - try { - TenantManager.ignoreTenantCondition(); - service.saveValue(tableId, obj, account); - } catch (Exception e) { - log.error("工作流保存数据到数据中枢失败,表ID:{},具体值:{}", tableId, obj, e); - throw e; - } finally { - TenantManager.restoreTenantCondition(); - } - successRows++; - } - - res.put("successRows", successRows); - return res; - } - - public BigInteger getTableId() { - return tableId; - } - - public void setTableId(BigInteger tableId) { - this.tableId = tableId; - } - -} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNodeParser.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNodeParser.java deleted file mode 100644 index ad6407d..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SaveToDatacenterNodeParser.java +++ /dev/null @@ -1,23 +0,0 @@ -package tech.easyflow.ai.node; - -import com.alibaba.fastjson.JSONObject; -import com.easyagents.flow.core.node.BaseNode; -import com.easyagents.flow.core.parser.BaseNodeParser; - -import java.math.BigInteger; - -public class SaveToDatacenterNodeParser extends BaseNodeParser { - - @Override - protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) { - BigInteger tableId = data.getBigInteger("tableId"); - if (tableId == null) { - throw new RuntimeException("请选择数据表"); - } - return new SaveToDatacenterNode(tableId); - } - - public String getNodeName() { - return "save-to-datacenter-node"; - } -} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNode.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNode.java deleted file mode 100644 index 840683a..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNode.java +++ /dev/null @@ -1,143 +0,0 @@ -package tech.easyflow.ai.node; - -import com.easyagents.core.util.StringUtil; -import com.mybatisflex.core.paginate.Page; -import com.mybatisflex.core.row.Row; -import com.mybatisflex.core.tenant.TenantManager; -import com.easyagents.flow.core.chain.Chain; -import com.easyagents.flow.core.chain.Parameter; -import com.easyagents.flow.core.node.BaseNode; -import net.sf.jsqlparser.expression.Expression; -import net.sf.jsqlparser.parser.CCJSqlParserUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.easyflow.common.entity.DatacenterQuery; -import tech.easyflow.common.util.SpringContextUtil; -import tech.easyflow.datacenter.entity.DatacenterTableField; -import tech.easyflow.datacenter.service.DatacenterTableService; -import tech.easyflow.datacenter.utils.WhereConditionSecurityChecker; - -import java.math.BigInteger; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class SearchDatacenterNode extends BaseNode { - - private static final Logger log = LoggerFactory.getLogger(SearchDatacenterNode.class); - private BigInteger tableId; - private String where; - private Long limit; - - public SearchDatacenterNode() { - } - - public SearchDatacenterNode(BigInteger tableId, String where, Long limit) { - this.tableId = tableId; - this.where = where; - this.limit = limit; - } - - @Override - public Map execute(Chain chain) { - - Map map = chain.getState().resolveParameters(this); - Map res = new HashMap<>(); - long limitNum = 10; - if (limit != null) { - limitNum = Long.parseLong(limit.toString()); - } - - DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class); - - DatacenterQuery condition = new DatacenterQuery(); - condition.setTableId(tableId); - condition.setPageNumber(1L); - condition.setPageSize(limitNum); - // 组合查询条件 - if (where != null) { - setCondition(where, condition, map); - } - try { - TenantManager.ignoreTenantCondition(); - Page pageData = service.getPageData(condition); - - String key = "rows"; - List outputDefs = getOutputDefs(); - if (outputDefs != null && !outputDefs.isEmpty()) { - String defName = outputDefs.get(0).getName(); - if (StringUtil.hasText(defName)) key = defName; - } - res.put(key, pageData.getRecords()); - } finally { - TenantManager.restoreTenantCondition(); - } - return res; - } - - public BigInteger getTableId() { - return tableId; - } - - public void setTableId(BigInteger tableId) { - this.tableId = tableId; - } - - public String getWhere() { - return where; - } - - public void setWhere(String where) { - this.where = where; - } - - public Long getLimit() { - return limit; - } - - public void setLimit(Long limit) { - this.limit = limit; - } - - private void setCondition(String where, DatacenterQuery condition, Map params) { - // 条件封装 - Pattern pattern = Pattern.compile("\\{\\{(.+?)\\}\\}"); - Matcher matcher = pattern.matcher(where); - - StringBuffer result = new StringBuffer(); - while (matcher.find()) { - String key = matcher.group(1); - Object value = params.get(key); - if (value == null) { - throw new RuntimeException("参数" + key + "不存在"); - } - String replacement = value.toString(); - matcher.appendReplacement(result, "'" + replacement + "'"); - } - matcher.appendTail(result); - - try { - Expression expression = CCJSqlParserUtil.parseCondExpression(result.toString()); - if (expression != null) { - WhereConditionSecurityChecker checker = new WhereConditionSecurityChecker(); - DatacenterTableService service = SpringContextUtil.getBean(DatacenterTableService.class); - List fields = service.getFields(tableId); - Set columns = fields.stream().map(DatacenterTableField::getFieldName).collect(Collectors.toSet()); - columns.add("id"); - columns.add("created"); - columns.add("modified"); - columns.add("created_by"); - columns.add("modified_by"); - checker.checkConditionSafety(expression, columns); - condition.setWhere(expression.toString()); - } - } catch (Exception e) { - log.error("WHERE SQL解析错误:", e); - throw new RuntimeException(e); - } - } -} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNodeParser.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNodeParser.java deleted file mode 100644 index fb2fa70..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatacenterNodeParser.java +++ /dev/null @@ -1,25 +0,0 @@ -package tech.easyflow.ai.node; - -import com.alibaba.fastjson.JSONObject; -import com.easyagents.flow.core.node.BaseNode; -import com.easyagents.flow.core.parser.BaseNodeParser; - -import java.math.BigInteger; - -public class SearchDatacenterNodeParser extends BaseNodeParser { - - @Override - protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) { - BigInteger tableId = data.getBigInteger("tableId"); - String where = data.getString("where"); - Long limit = data.getLong("limit"); - if (tableId == null) { - throw new RuntimeException("请选择数据表"); - } - return new SearchDatacenterNode(tableId,where,limit); - } - - public String getNodeName() { - return "search-datacenter-node"; - } -} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNode.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNode.java new file mode 100644 index 0000000..96747de --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNode.java @@ -0,0 +1,120 @@ +package tech.easyflow.ai.node; + +import com.easyagents.core.util.StringUtil; +import com.easyagents.flow.core.chain.Chain; +import com.easyagents.flow.core.chain.Parameter; +import com.easyagents.flow.core.node.BaseNode; +import com.mybatisflex.core.row.Row; +import com.mybatisflex.core.tenant.TenantManager; +import tech.easyflow.common.util.SpringContextUtil; +import tech.easyflow.common.web.exceptions.BusinessException; +import tech.easyflow.datacenter.execution.model.DatacenterSqlQueryRequest; +import tech.easyflow.datacenter.execution.model.DatasetRef; +import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SearchDatasetNode extends BaseNode { + + private static final Pattern PARAM_PATTERN = Pattern.compile("\\{\\{(.+?)\\}\\}"); + + private DatasetRef datasetRef; + private String querySql; + + public SearchDatasetNode() { + } + + public SearchDatasetNode(DatasetRef datasetRef) { + this.datasetRef = datasetRef; + } + + public SearchDatasetNode(DatasetRef datasetRef, String querySql) { + this.datasetRef = datasetRef; + this.querySql = querySql; + } + + @Override + public Map execute(Chain chain) { + Map params = chain.getState().resolveParameters(this); + DatacenterDatasetQueryService queryService = SpringContextUtil.getBean(DatacenterDatasetQueryService.class); + DatacenterSqlQueryRequest request = buildRuntimeRequest(params); + Map result = new HashMap<>(); + try { + TenantManager.ignoreTenantCondition(); + List rows = queryService.queryBySql(request); + result.put(resolveOutputKey("data"), rows); + return result; + } finally { + TenantManager.restoreTenantCondition(); + } + } + + private DatacenterSqlQueryRequest buildRuntimeRequest(Map params) { + DatacenterSqlQueryRequest request = new DatacenterSqlQueryRequest(); + request.setDatasetRef(copyDatasetRef()); + request.setSql(resolveQuerySql(params)); + return request; + } + + private String resolveQuerySql(Map params) { + String sql = resolveTemplateString(querySql, params); + if (!StringUtil.hasText(sql)) { + throw new BusinessException("查询数据节点未设置 SQL"); + } + return sql.trim(); + } + + private DatasetRef copyDatasetRef() { + DatasetRef copy = new DatasetRef(); + copy.setSourceId(datasetRef == null ? null : datasetRef.getSourceId()); + copy.setCatalogId(datasetRef == null ? null : datasetRef.getCatalogId()); + copy.setCatalogName(datasetRef == null ? null : datasetRef.getCatalogName()); + copy.setTableId(null); + copy.setTableName(null); + copy.setVersionId(null); + return copy; + } + + private String resolveTemplateString(String text, Map params) { + if (!StringUtil.hasText(text) || params == null || params.isEmpty()) { + return text; + } + Matcher matcher = PARAM_PATTERN.matcher(text); + StringBuffer buffer = new StringBuffer(); + while (matcher.find()) { + Object replacement = params.get(matcher.group(1)); + matcher.appendReplacement(buffer, replacement == null ? "" : Matcher.quoteReplacement(String.valueOf(replacement))); + } + matcher.appendTail(buffer); + return buffer.toString(); + } + + private String resolveOutputKey(String defaultName) { + List outputDefs = getOutputDefs(); + if (outputDefs == null || outputDefs.isEmpty()) { + return defaultName; + } + String name = outputDefs.get(0).getName(); + return StringUtil.hasText(name) ? name : defaultName; + } + + public DatasetRef getDatasetRef() { + return datasetRef; + } + + public void setDatasetRef(DatasetRef datasetRef) { + this.datasetRef = datasetRef; + } + + public String getQuerySql() { + return querySql; + } + + public void setQuerySql(String querySql) { + this.querySql = querySql; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNodeParser.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNodeParser.java new file mode 100644 index 0000000..879353d --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SearchDatasetNodeParser.java @@ -0,0 +1,33 @@ +package tech.easyflow.ai.node; + +import com.alibaba.fastjson.JSONObject; +import com.easyagents.flow.core.node.BaseNode; +import com.easyagents.flow.core.parser.BaseNodeParser; +import tech.easyflow.common.web.exceptions.BusinessException; +import tech.easyflow.datacenter.execution.model.DatasetRef; + +public class SearchDatasetNodeParser extends BaseNodeParser { + + private static final String SOURCE_MISSING_MESSAGE = "查询数据节点未选择连接服务"; + private static final String SQL_MISSING_MESSAGE = "查询数据节点未设置 SQL"; + + @Override + protected BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) { + if (data == null) { + throw new BusinessException(SOURCE_MISSING_MESSAGE); + } + DatasetRef datasetRef = data.getObject("datasetRef", DatasetRef.class); + if (datasetRef == null || datasetRef.getSourceId() == null) { + throw new BusinessException(SOURCE_MISSING_MESSAGE); + } + String querySql = data.getString("querySql"); + if (querySql == null || querySql.isBlank()) { + throw new BusinessException(SQL_MISSING_MESSAGE); + } + return new SearchDatasetNode(datasetRef, querySql); + } + + public String getNodeName() { + return "search-dataset-node"; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNode.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNode.java deleted file mode 100644 index d9e2474..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNode.java +++ /dev/null @@ -1,149 +0,0 @@ -package tech.easyflow.ai.node; - -import com.mybatisflex.core.row.Db; -import com.mybatisflex.core.row.Row; -import com.easyagents.flow.core.chain.Chain; -import com.easyagents.flow.core.node.BaseNode; -import net.sf.jsqlparser.JSQLParserException; -import net.sf.jsqlparser.parser.CCJSqlParserUtil; -import net.sf.jsqlparser.statement.Statement; -import net.sf.jsqlparser.statement.select.Select; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; -import tech.easyflow.common.web.exceptions.BusinessException; - -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - - -/** - * SQL查询节点 - * - * @author tao - * @date 2025-05-21 - */ -public class SqlNode extends BaseNode { - - private String sql; - - private static final Logger logger = LoggerFactory.getLogger(SqlNode.class); - - public SqlNode() { - } - - public SqlNode(String sql) { - this.sql = sql; - } - - @Override - public Map execute(Chain chain) { - - Map map = chain.getState().resolveParameters(this); - Map res = new HashMap<>(); - - - Map formatSqlMap = formatSql(sql, map); - String formatSql = (String) formatSqlMap.get("replacedSql"); - - Statement statement = null; - try { - statement = CCJSqlParserUtil.parse(formatSql); - - } catch (JSQLParserException e) { - logger.error("sql 解析报错:", e); - throw new BusinessException("SQL解析失败,请确认SQL语法无误"); - } - - if (!(statement instanceof Select)) { - logger.error("sql 解析报错:statement instanceof Select 结果为false"); - throw new BusinessException("仅支持查询语句!"); - } - - List paramNames = (List) formatSqlMap.get("paramNames"); - - List paramValues = new ArrayList<>(); - paramNames.forEach(paramName -> { - Object o = map.get(paramName); - paramValues.add(o); - }); - - List rows = Db.selectListBySql(formatSql, paramValues.toArray()); - - if (rows == null || rows.isEmpty()) { - return Collections.emptyMap(); - } - - res.put("queryData", rows); - return res; - } - - private Map formatSql(String rawSql, Map paramMap) { - - if (!StringUtils.hasLength(rawSql)) { - logger.error("sql解析报错:sql为空"); - throw new BusinessException("sql 不能为空!"); - } - - // 匹配 {{?...}} 表示可用占位符的参数 - Pattern paramPattern = Pattern.compile("\\{\\{\\?([^}]+)}}"); - - // 匹配 {{...}} 表示直接替换的参数(非占位符) - Pattern directPattern = Pattern.compile("\\{\\{([^}?][^}]*)}}"); - - List paramNames = new ArrayList<>(); - StringBuffer sqlBuffer = new StringBuffer(); - - // 替换 {{?...}} -> ? - Matcher paramMatcher = paramPattern.matcher(rawSql); - while (paramMatcher.find()) { - String paramName = paramMatcher.group(1).trim(); - paramNames.add(paramName); - paramMatcher.appendReplacement(sqlBuffer, "?"); - } - paramMatcher.appendTail(sqlBuffer); - String intermediateSql = sqlBuffer.toString(); - - // 替换 {{...}} -> 实际值(用于表名/列名等) - sqlBuffer = new StringBuffer(); // 清空 buffer 重新处理 - Matcher directMatcher = directPattern.matcher(intermediateSql); - while (directMatcher.find()) { - String key = directMatcher.group(1).trim(); - Object value = paramMap.get(key); - if (value == null) { - logger.error("未找到参数:" + key); - throw new BusinessException("sql解析失败,请确保sql语法正确!"); - } - - String safeValue = value.toString(); - - directMatcher.appendReplacement(sqlBuffer, Matcher.quoteReplacement(safeValue)); - } - directMatcher.appendTail(sqlBuffer); - - String finalSql = sqlBuffer.toString().trim(); - - // 清理末尾分号与中文引号 - if (finalSql.endsWith(";") || finalSql.endsWith(";")) { - finalSql = finalSql.substring(0, finalSql.length() - 1); - } - finalSql = finalSql.replace("“", "\"").replace("”", "\""); - - logger.info("Final SQL: {}", finalSql); - logger.info("Param names: {}", paramNames); - - Map result = new HashMap<>(); - result.put("replacedSql", finalSql); - result.put("paramNames", paramNames); - return result; - } - - public String getSql() { - return sql; - } - - public void setSql(String sql) { - this.sql = sql; - } -} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNodeParser.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNodeParser.java deleted file mode 100644 index 9b2dfcf..0000000 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/node/SqlNodeParser.java +++ /dev/null @@ -1,25 +0,0 @@ -package tech.easyflow.ai.node; - -import com.alibaba.fastjson.JSONObject; -import com.easyagents.flow.core.node.BaseNode; -import com.easyagents.flow.core.parser.BaseNodeParser; - -/** - * Sql查询节点解析 - * - * @author tao - * @date 2025-05-21 - */ -public class SqlNodeParser extends BaseNodeParser { - - - @Override - public BaseNode doParse(JSONObject root, JSONObject data, JSONObject tinyflow) { - String sql = data.getString("sql"); - return new SqlNode(sql); - } - - public String getNodeName() { - return "sql-node"; - } -} diff --git a/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/easyagentsflow/service/WorkflowCheckServiceTest.java b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/easyagentsflow/service/WorkflowCheckServiceTest.java index e824b25..b1f6882 100644 --- a/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/easyagentsflow/service/WorkflowCheckServiceTest.java +++ b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/easyagentsflow/service/WorkflowCheckServiceTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckResult; import tech.easyflow.ai.easyagentsflow.entity.WorkflowCheckStage; import tech.easyflow.ai.entity.Workflow; +import tech.easyflow.ai.node.SearchDatasetNodeParser; import tech.easyflow.ai.node.WorkflowNodeParser; import tech.easyflow.ai.service.WorkflowService; @@ -69,6 +70,24 @@ public class WorkflowCheckServiceTest { assertHasCode(result, "EDGE_TARGET_NOT_FOUND"); } + @Test + public void testSaveShouldBlockSearchDatasetWithoutSql() throws Exception { + WorkflowCheckService service = newService(new HashMap<>()); + JSONObject searchData = data("查询数据"); + JSONObject datasetRef = new JSONObject(); + datasetRef.put("sourceId", "1001"); + datasetRef.put("tableId", "2001"); + searchData.put("datasetRef", datasetRef); + String content = workflowJson( + array(node("search-1", "search-dataset-node", null, searchData)), + new JSONArray() + ); + + WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.SAVE, null); + Assert.assertFalse(result.isPassed()); + assertHasCode(result, "SEARCH_DATASET_INVALID"); + } + @Test public void testPreExecuteShouldBlockMissingStartOrEnd() throws Exception { WorkflowCheckService service = newService(new HashMap<>()); @@ -86,6 +105,26 @@ public class WorkflowCheckServiceTest { assertHasCode(result, "END_NODE_MISSING"); } + @Test + public void testPreExecuteShouldPassForSourceOnlySearchDatasetNode() throws Exception { + WorkflowCheckService service = newService(new HashMap<>()); + String content = workflowJson( + array( + node("s1", "startNode", null, data("开始")), + searchDatasetNode("q1", null, "1001"), + node("e1", "endNode", null, data("结束")) + ), + array( + edge("e1", "s1", "q1"), + edge("e2", "q1", "e1") + ) + ); + + WorkflowCheckResult result = service.checkContent(content, WorkflowCheckStage.PRE_EXECUTE, BigInteger.ONE); + Assert.assertTrue(result.isPassed()); + Assert.assertEquals(0, result.getIssueCount()); + } + @Test public void testPreExecuteShouldBlockRootEntryNotStart() throws Exception { WorkflowCheckService service = newService(new HashMap<>()); @@ -203,8 +242,10 @@ public class WorkflowCheckServiceTest { .withDefaultParsers(true) .build(); parser.addNodeParser("workflow-node", new WorkflowNodeParser()); + parser.addNodeParser("search-dataset-node", new SearchDatasetNodeParser()); setField(service, "chainParser", parser); setField(service, "workflowService", mockWorkflowService(workflowStore)); + setField(service, "workflowDatacenterContentService", new WorkflowDatacenterContentService()); return service; } @@ -294,6 +335,15 @@ public class WorkflowCheckServiceTest { return node(id, "workflow-node", parentId, data); } + private static JSONObject searchDatasetNode(String id, String parentId, String sourceId) { + JSONObject data = data("查询数据"); + JSONObject datasetRef = new JSONObject(); + datasetRef.put("sourceId", sourceId); + data.put("datasetRef", datasetRef); + data.put("querySql", "SELECT 1"); + return node(id, "search-dataset-node", parentId, data); + } + private static JSONObject data(String title) { JSONObject data = new JSONObject(); data.put("title", title); diff --git a/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/node/SearchDatasetNodeTest.java b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/node/SearchDatasetNodeTest.java new file mode 100644 index 0000000..7460f67 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/node/SearchDatasetNodeTest.java @@ -0,0 +1,73 @@ +package tech.easyflow.ai.node; + +import org.junit.Assert; +import org.junit.Test; +import tech.easyflow.common.web.exceptions.BusinessException; +import tech.easyflow.datacenter.execution.model.DatasetRef; + +import java.lang.reflect.Method; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; + +public class SearchDatasetNodeTest { + + @Test + public void testResolveQuerySqlShouldUseNodeQuerySqlAndResolveTemplate() throws Exception { + DatasetRef datasetRef = new DatasetRef(); + datasetRef.setSourceId(BigInteger.valueOf(1001L)); + SearchDatasetNode node = new SearchDatasetNode(datasetRef, """ + SELECT id, name + FROM orders_{{biz}} + WHERE name LIKE '%{{keyword}}%' + ORDER BY created_at {{direction}} + """); + + Map params = new HashMap<>(); + params.put("biz", "prod"); + params.put("keyword", "vip"); + params.put("direction", "DESC"); + + String sql = invokeResolveQuerySql(node, params); + Assert.assertEquals(""" + SELECT id, name + FROM orders_prod + WHERE name LIKE '%vip%' + ORDER BY created_at DESC + """.trim(), sql); + } + + @Test + public void testResolveQuerySqlShouldUseNodeQuerySqlWhenParamsDoNotContainSql() throws Exception { + DatasetRef datasetRef = new DatasetRef(); + datasetRef.setSourceId(BigInteger.valueOf(2002L)); + SearchDatasetNode node = new SearchDatasetNode(datasetRef, "SELECT * FROM fallback_table"); + + Map params = new HashMap<>(); + + String sql = invokeResolveQuerySql(node, params); + Assert.assertEquals("SELECT * FROM fallback_table", sql); + } + + @Test + public void testResolveQuerySqlShouldRejectBlankSql() throws Exception { + DatasetRef datasetRef = new DatasetRef(); + datasetRef.setSourceId(BigInteger.valueOf(3003L)); + SearchDatasetNode node = new SearchDatasetNode(datasetRef, " "); + + try { + invokeResolveQuerySql(node, new HashMap<>()); + Assert.fail("expected BusinessException"); + } catch (Exception e) { + Throwable cause = e.getCause() == null ? e : e.getCause(); + Assert.assertTrue(cause instanceof BusinessException); + Assert.assertEquals("查询数据节点未设置 SQL", cause.getMessage()); + } + } + + private String invokeResolveQuerySql(SearchDatasetNode node, Map params) throws Exception { + Method method = SearchDatasetNode.class.getDeclaredMethod("resolveQuerySql", Map.class); + method.setAccessible(true); + return (String) method.invoke(node, params); + } +} diff --git a/easyflow-ui-admin/app/src/locales/langs/en-US/aiWorkflow.json b/easyflow-ui-admin/app/src/locales/langs/en-US/aiWorkflow.json index d1d859b..198cb4f 100644 --- a/easyflow-ui-admin/app/src/locales/langs/en-US/aiWorkflow.json +++ b/easyflow-ui-admin/app/src/locales/langs/en-US/aiWorkflow.json @@ -43,11 +43,19 @@ "fileDownloadURL": "FileDownloadURL", "pluginSelect": "PluginSelect", "saveData": "SaveData", + "saveDataset": "Write Data", "dataToBeSaved": "DataToBeSaved", "successInsertedRecords": "SuccessInsertedRecords", "dataTable": "DataTable", + "dataset": "Dataset", + "datasetDsl": "Query Conditions", + "datasetDslPlaceholder": "Structured query conditions", "queryData": "QueryData", + "queryDataset": "Query Data", + "querySpec": "SQL", + "querySql": "SQL", "queryResult": "QueryResult", + "querySummary": "QuerySummary", "filterConditions": "FilterConditions", "limit": "Limit", "sqlQuery": "SQL Query", @@ -79,9 +87,15 @@ "fileDownloadURL": "Generated file URL", "plugin": "Select a predefined plugin", "saveData": "Save data to data hub", + "saveDataset": "Write data into a managed table", "dataToBeSaved": "List of data to be saved", "dataTable": "Please select a data table", + "dataset": "Please select a managed table", "queryData": "Query data from the data hub", + "queryDataset": "Run a read-only SQL query through the selected connection service", + "querySpec": "Enter SQL. Parameters can be referenced", + "querySql": "Enter SQL. Parameters can be referenced", + "datasetDsl": "Enter read-only SQL. Write statements, multiple statements, and unmanaged tables are not allowed", "sqlQuery": "Query the database via SQL", "enterSQL": "Please enter the SQL statement", "queryResultJson": "Query result (JSON object)", diff --git a/easyflow-ui-admin/app/src/locales/langs/zh-CN/aiWorkflow.json b/easyflow-ui-admin/app/src/locales/langs/zh-CN/aiWorkflow.json index c29f0ea..cc787df 100644 --- a/easyflow-ui-admin/app/src/locales/langs/zh-CN/aiWorkflow.json +++ b/easyflow-ui-admin/app/src/locales/langs/zh-CN/aiWorkflow.json @@ -43,11 +43,19 @@ "fileDownloadURL": "文件下载地址", "pluginSelect": "插件选择", "saveData": "保存数据", + "saveDataset": "写入数据", "dataToBeSaved": "待保存的数据", "successInsertedRecords": "成功插入条数", "dataTable": "数据表", + "dataset": "数据集", + "datasetDsl": "查询条件", + "datasetDslPlaceholder": "结构化查询条件", "queryData": "查询数据", + "queryDataset": "查询数据", + "querySpec": "SQL", + "querySql": "SQL", "queryResult": "查询结果", + "querySummary": "查询摘要", "filterConditions": "过滤条件", "limit": "限制条数", "sqlQuery": "SQL 查询", @@ -79,9 +87,15 @@ "fileDownloadURL": "生成后的文件地址", "plugin": "选择定义好的插件", "saveData": "保存数据到数据中枢", + "saveDataset": "将数据写入已接入表", "dataToBeSaved": "待保存的数据列表", "dataTable": "请选择数据表", + "dataset": "请选择已接入表", "queryData": "查询数据中枢的数据", + "queryDataset": "按连接服务执行只读 SQL 查询", + "querySpec": "请输入 SQL,可引用输入参数", + "querySql": "请输入 SQL,可引用输入参数", + "datasetDsl": "请输入只读 SQL,不支持写入语句、多语句和未接入表访问", "sqlQuery": "通过 SQL 查询数据库", "enterSQL": "请输入SQL语句", "queryResultJson": "查询结果(json对象)", diff --git a/easyflow-ui-admin/app/src/router/routes/modules/workflow.ts b/easyflow-ui-admin/app/src/router/routes/modules/workflow.ts index 34df9ec..8bce4f3 100644 --- a/easyflow-ui-admin/app/src/router/routes/modules/workflow.ts +++ b/easyflow-ui-admin/app/src/router/routes/modules/workflow.ts @@ -6,7 +6,7 @@ const routes: RouteRecordRaw[] = [ { meta: { icon: 'ant-design:apartment-outlined', - title: $t('datacenterTable.title'), + title: '工作流设计', hideInMenu: true, activePath: '/ai/workflow', }, diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/WorkflowDesign.vue b/easyflow-ui-admin/app/src/views/ai/workflow/WorkflowDesign.vue index 6c38ba1..a45656e 100644 --- a/easyflow-ui-admin/app/src/views/ai/workflow/WorkflowDesign.vue +++ b/easyflow-ui-admin/app/src/views/ai/workflow/WorkflowDesign.vue @@ -20,7 +20,6 @@ import WorkflowForm from '#/views/ai/workflow/components/WorkflowForm.vue'; import WorkflowSteps from '#/views/ai/workflow/components/WorkflowSteps.vue'; import {getCustomNode} from './customNode/index'; -import nodeNames from './customNode/nodeNames'; import '@tinyflow-ai/vue/dist/index.css'; @@ -59,17 +58,35 @@ const codeEngineList = ref([ available: true, }, ]); + +function escapeHtmlAttr(value?: string) { + return String(value || '') + .replaceAll('&', '&') + .replaceAll('"', '"') + .replaceAll('<', '<') + .replaceAll('>', '>'); +} + +function buildModelIconMarkup(icon?: string, providerType?: string) { + const normalized = String(icon || '').trim(); + if (normalized) { + if (normalized.startsWith('`; + } + if (!providerType) { + return undefined; + } + return getIconByValue(providerType) || undefined; +} + const provider = computed(() => ({ llm: () => llmList.value.map((item: any) => { - let iconStr = undefined; - if (item.modelProvider?.icon) { - iconStr = ``; - } else if (item.modelProvider?.providerType) { - const svgStr = getIconByValue(item.modelProvider.providerType); - if (svgStr) { - iconStr = svgStr; - } - } + const iconStr = buildModelIconMarkup( + item.modelProvider?.icon, + item.modelProvider?.providerType, + ); // Extract brand and model name directly from the title if it contains '/' let displayTitle = item.title || ''; @@ -330,15 +347,15 @@ async function runCheck(stage: WorkflowCheckStage, silentPass: boolean = false) stage, }); checkResult.value = res.data; + const issues = Array.isArray(res.data?.issues) ? res.data.issues : []; + checkIssuesVisible.value = issues.length > 0; if (!res.data?.passed) { - checkIssuesVisible.value = true; ElMessage.error($t('aiWorkflow.checkFailed')); return false; } - checkIssuesVisible.value = false; focusedIssueKey.value = ''; issueFocusActive.value = false; - if (!silentPass) { + if (!silentPass && issues.length === 0) { ElMessage.success($t('aiWorkflow.checkPassed')); } return true; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetNodeRenderer.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetNodeRenderer.ts new file mode 100644 index 0000000..550f40a --- /dev/null +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetNodeRenderer.ts @@ -0,0 +1,573 @@ +import type { + DatasetRefPayload, + ManagedDatasetOption, + ManagedDatasetSourceOption, +} from './datasetOptions'; +import huaweiIcon from '#/assets/datacenter/huawei-icon.svg'; +import mysqlIcon from '#/assets/datacenter/mysql-icon.svg'; +import postgresqlIcon from '#/assets/datacenter/postgresql-icon.svg'; +import { + groupManagedDatasetOptions, + loadManagedDatasetOptions, +} from './datasetOptions'; + +const SOURCE_LOGO_MAP: Record = { + EXCEL: 'excel', + EXCEL_MATERIALIZED: 'excel', + GAUSSDB_NATIVE: 'gaussdb', + GBASE_8A: 'gbase', + GBASE_8S: 'gbase', + MYSQL: 'mysql', + ORACLE: 'oracle', + POSTGRESQL: 'postgresql', + PROJECT_MYSQL: 'mysql', +}; + +type NodeLike = { + id: string; + data?: Record; +}; + +type UpdateNodeData = ( + nodeId: string, + data: Record | ((node: Record) => Record), +) => void; + +type FlowInstance = { + updateNodeData: UpdateNodeData; +}; + +type RenderContext = FlowInstance | undefined; + +type RendererState = { + pickerOpen: boolean; + loadingOptions: boolean; + optionsLoaded: boolean; + options: ManagedDatasetOption[]; + sources: ManagedDatasetSourceOption[]; + tableSearchText: string; + updateNodeData?: UpdateNodeData; +}; + +function getState(parent: HTMLElement): RendererState { + const holder = parent as HTMLElement & { __datasetState?: RendererState }; + if (!holder.__datasetState) { + holder.__datasetState = { + pickerOpen: false, + loadingOptions: false, + optionsLoaded: false, + options: [], + sources: [], + tableSearchText: '', + }; + } + return holder.__datasetState; +} + +function getUpdateNodeData(parent: HTMLElement, flowInstance?: RenderContext) { + const state = getState(parent); + if (flowInstance?.updateNodeData) { + state.updateNodeData = flowInstance.updateNodeData.bind(flowInstance); + } + return state.updateNodeData; +} + +function getDatasetKey(datasetRef?: DatasetRefPayload | null) { + return datasetRef?.tableId == null ? '' : String(datasetRef.tableId); +} + +function getSourceKey(datasetRef?: DatasetRefPayload | null) { + return datasetRef?.sourceId == null ? '' : String(datasetRef.sourceId); +} + +function createSourceOnlyDatasetRef(source: ManagedDatasetSourceOption): DatasetRefPayload { + return { + sourceId: source.sourceId, + catalogId: null, + catalogName: '', + tableId: null, + tableName: '', + versionId: null, + }; +} + +function escapeHtml(value?: string | number | null) { + return String(value ?? '') + .replaceAll('&', '&') + .replaceAll('<', '<') + .replaceAll('>', '>') + .replaceAll('"', '"') + .replaceAll("'", '''); +} + +function getSourceLogoType(sourceType?: string) { + return SOURCE_LOGO_MAP[sourceType || ''] || 'default'; +} + +function buildSourceLogo(sourceType?: string) { + const logoType = getSourceLogoType(sourceType); + if (logoType === 'mysql') { + return ``; + } + if (logoType === 'postgresql') { + return ``; + } + if (logoType === 'gaussdb') { + return ``; + } + if (logoType === 'oracle') { + return ` + + `; + } + if (logoType === 'gbase') { + return ` + + `; + } + if (logoType === 'excel') { + return ` + + `; + } + return ` + + `; +} + +function buildStyles() { + return ` + + `; +} + +function filterTableOptions(options: ManagedDatasetOption[], searchText: string) { + const keyword = searchText.trim().toLowerCase(); + if (!keyword) { + return options; + } + return options.filter((option) => option.keywords.includes(keyword)); +} + +function ensureOptionsLoaded( + state: RendererState, + parent: HTMLElement, + node: NodeLike, + flowInstance: RenderContext, + rerender: (parent: HTMLElement, node: NodeLike, flowInstance?: RenderContext) => void, +) { + if (state.loadingOptions || state.optionsLoaded) { + return; + } + state.loadingOptions = true; + loadManagedDatasetOptions() + .then((options) => { + state.options = options; + state.sources = groupManagedDatasetOptions(options); + state.optionsLoaded = true; + }) + .finally(() => { + state.loadingOptions = false; + rerender(parent, node, flowInstance); + }); +} + +function buildPickerListItem( + title: string, + meta: string, + active: boolean, + action: string, + extraAttr: string, + iconHtml: string = '', +) { + return ` + + `; +} + +function buildSourceList(options: ManagedDatasetSourceOption[], activeKey: string, emptyText: string) { + if (!options.length) { + return `
${emptyText}
`; + } + return options + .map((option) => + buildPickerListItem( + option.sourceName, + `${option.tables.length} 张表`, + activeKey === String(option.sourceId), + 'select-source', + `data-source-id="${escapeHtml(option.sourceId)}"`, + buildSourceLogo(option.sourceType), + ), + ) + .join(''); +} + +function buildTableList(options: ManagedDatasetOption[], activeKey: string, emptyText: string) { + if (!options.length) { + return `
${emptyText}
`; + } + return options + .map((option) => + buildPickerListItem( + option.tableName, + `${option.sourceName} / ${option.catalogName}`, + activeKey === String(option.datasetRef.tableId), + 'select-dataset', + `data-table-id="${escapeHtml(option.datasetRef.tableId)}"`, + buildSourceLogo(option.sourceType), + ), + ) + .join(''); +} + +function buildSearchSummary(currentSource?: ManagedDatasetSourceOption) { + if (!currentSource) { + return '请选择连接服务'; + } + return ` + ${buildSourceLogo(currentSource.sourceType)} + + ${escapeHtml(currentSource.sourceName)} + + `; +} + +function bindInteractiveElements(parent: HTMLElement) { + parent.querySelectorAll('button, input, select, textarea').forEach((element) => { + element.onpointerdown = (event) => event.stopPropagation(); + element.onmousedown = (event) => event.stopPropagation(); + }); +} + +export function rerenderSearchNode(parent: HTMLElement, node: NodeLike, flowInstance?: RenderContext) { + const state = getState(parent); + const updateNodeData = getUpdateNodeData(parent, flowInstance); + ensureOptionsLoaded(state, parent, node, flowInstance, rerenderSearchNode); + + const datasetRef = (node.data?.datasetRef || null) as DatasetRefPayload | null; + const sourceKey = getSourceKey(datasetRef); + const currentSource = state.sources.find((item) => String(item.sourceId) === sourceKey); + + parent.innerHTML = ` + ${buildStyles()} +
+
连接服务
+
+ + ${state.pickerOpen ? ` +
+
+ ${state.loadingOptions ? '
正在加载连接服务...
' : buildSourceList(state.sources, sourceKey, '暂无可用连接服务')} +
+
+ ` : ''} +
+
+ `; + + bindInteractiveElements(parent); + + parent.querySelector('[data-action="toggle-picker"]')?.addEventListener('click', (event) => { + event.preventDefault(); + event.stopPropagation(); + state.pickerOpen = !state.pickerOpen; + rerenderSearchNode(parent, node, flowInstance); + }); + + parent.querySelectorAll('[data-action="select-source"]').forEach((element) => { + element.addEventListener('click', (event) => { + event.preventDefault(); + event.stopPropagation(); + const sourceId = element.dataset.sourceId; + const source = state.sources.find((item) => String(item.sourceId) === String(sourceId)); + if (!source) { + return; + } + state.pickerOpen = false; + const nextDatasetRef = createSourceOnlyDatasetRef(source); + node.data = { + ...(node.data || {}), + datasetRef: nextDatasetRef, + sourceName: source.sourceName, + sourceType: source.sourceType, + }; + updateNodeData?.(node.id, { + datasetRef: nextDatasetRef, + sourceName: source.sourceName, + sourceType: source.sourceType, + }); + rerenderSearchNode(parent, node, flowInstance); + }); + }); +} + +export function rerenderSaveNode(parent: HTMLElement, node: NodeLike, flowInstance?: RenderContext) { + const state = getState(parent); + const updateNodeData = getUpdateNodeData(parent, flowInstance); + ensureOptionsLoaded(state, parent, node, flowInstance, rerenderSaveNode); + + const datasetRef = (node.data?.datasetRef || null) as DatasetRefPayload | null; + const activeKey = getDatasetKey(datasetRef); + const currentOption = state.options.find((item) => String(item.datasetRef.tableId) === activeKey); + const filtered = filterTableOptions(state.options, state.tableSearchText); + + parent.innerHTML = ` + ${buildStyles()} +
+
已接入表
+ +
+
+ ${state.loadingOptions ? '
正在加载已接入表...
' : buildTableList(filtered, activeKey, '没有匹配的已接入表')} +
+
+
+ `; + + bindInteractiveElements(parent); + + parent.querySelector('[data-role="table-search"]')?.addEventListener('input', (event) => { + event.stopPropagation(); + state.tableSearchText = (event.currentTarget as HTMLInputElement).value || ''; + rerenderSaveNode(parent, node, flowInstance); + }); + + parent.querySelectorAll('[data-action="select-dataset"]').forEach((element) => { + element.addEventListener('click', (event) => { + event.preventDefault(); + event.stopPropagation(); + const tableId = element.dataset.tableId; + const option = state.options.find((item) => String(item.datasetRef.tableId) === String(tableId)); + if (!option) { + return; + } + node.data = { + ...(node.data || {}), + datasetRef: option.datasetRef, + sourceName: option.sourceName, + sourceType: option.sourceType, + }; + updateNodeData?.(node.id, { + datasetRef: option.datasetRef, + sourceName: option.sourceName, + sourceType: option.sourceType, + }); + rerenderSaveNode(parent, node, flowInstance); + }); + }); +} diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetOptions.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetOptions.ts new file mode 100644 index 0000000..c768e94 --- /dev/null +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/datasetOptions.ts @@ -0,0 +1,184 @@ +import { api } from '#/api/request'; + +export interface DatasetRefPayload { + sourceId: number | string | null; + catalogId?: number | string | null; + catalogName?: string; + tableId: number | string | null; + tableName: string; + versionId?: number | string | null; +} + +export interface ManagedDatasetFieldOption { + fieldName: string; + fieldDesc?: string; + fieldType?: string; +} + +export interface ManagedDatasetSchema { + tableName?: string; + tableDesc?: string; + fields: ManagedDatasetFieldOption[]; +} + +export interface ManagedDatasetOption { + label: string; + value: number | string; + keywords: string; + sourceName: string; + sourceType?: string; + catalogName: string; + tableName: string; + datasetRef: DatasetRefPayload; +} + +export interface ManagedDatasetSourceOption { + sourceId: number | string; + sourceName: string; + sourceType?: string; + label: string; + keywords: string; + tables: ManagedDatasetOption[]; +} + +const SOURCE_MISSING_MESSAGE = '连接不存在'; +const SOURCE_UNAVAILABLE_MESSAGE = '当前连接不可用,请检查连接配置后重试'; + +function shouldSkipSourceError(error: any) { + const responseData = error?.response?.data ?? {}; + const message = String(responseData?.message ?? error?.message ?? ''); + + return ( + message.includes(SOURCE_MISSING_MESSAGE) || + message.includes(SOURCE_UNAVAILABLE_MESSAGE) + ); +} + +function dedupeManagedDatasetOptions(options: ManagedDatasetOption[]) { + const uniqueOptions = new Map(); + for (const option of options || []) { + const key = option.datasetRef?.tableId != null + ? String(option.datasetRef.tableId) + : [ + option.datasetRef?.sourceId ?? '', + option.datasetRef?.catalogId ?? '', + option.tableName ?? '', + ].join(':'); + if (!uniqueOptions.has(key)) { + uniqueOptions.set(key, option); + } + } + return Array.from(uniqueOptions.values()); +} + +export async function loadManagedDatasetOptions(): Promise { + const sourceRes = await api.get('/api/v1/datacenterSource/page', { + params: { + pageNumber: 1, + pageSize: 200, + }, + }); + const sources = sourceRes.data?.records || []; + const options: ManagedDatasetOption[] = []; + for (const source of sources) { + try { + const catalogRes = await api.get('/api/v1/datacenterSource/catalogs', { + params: { + sourceId: source.id, + }, + }); + const catalogs = catalogRes.data || []; + for (const catalog of catalogs) { + const tableRes = await api.get('/api/v1/datacenterDataset/managedTables', { + params: { + sourceId: source.id, + catalogId: catalog.id, + }, + }); + const tables = tableRes.data || []; + for (const table of tables) { + const label = `${source.sourceName} / ${catalog.catalogName} / ${table.tableName}`; + options.push({ + label, + value: table.id, + keywords: `${source.sourceName} ${catalog.catalogName} ${table.tableName}`.toLowerCase(), + sourceName: source.sourceName, + sourceType: source.sourceType, + catalogName: catalog.catalogName, + tableName: table.tableName, + datasetRef: { + sourceId: source.id, + catalogId: catalog.id, + catalogName: catalog.catalogName, + tableId: table.id, + tableName: table.tableName, + }, + }); + } + } + } catch (error) { + if (shouldSkipSourceError(error)) { + continue; + } + throw error; + } + } + return dedupeManagedDatasetOptions(options); +} + +export function groupManagedDatasetOptions( + options: ManagedDatasetOption[], +): ManagedDatasetSourceOption[] { + const grouped = new Map(); + for (const option of dedupeManagedDatasetOptions(options || [])) { + const sourceId = option.datasetRef?.sourceId; + if (sourceId == null) { + continue; + } + if (!grouped.has(sourceId)) { + grouped.set(sourceId, { + sourceId, + sourceName: option.sourceName, + sourceType: option.sourceType, + label: option.sourceName, + keywords: option.sourceName.toLowerCase(), + tables: [], + }); + } + grouped.get(sourceId)!.tables.push(option); + } + return Array.from(grouped.values()).map((item) => ({ + ...item, + keywords: `${item.sourceName} ${item.tables + .map((table) => `${table.catalogName} ${table.tableName}`) + .join(' ')}`.toLowerCase(), + tables: item.tables.sort((a, b) => a.label.localeCompare(b.label)), + })); +} + +export async function loadManagedDatasetSchema( + datasetRef?: DatasetRefPayload | null, +): Promise { + if (!datasetRef?.tableId) { + return { + tableName: datasetRef?.tableName, + fields: [], + }; + } + const res = await api.get('/api/v1/datacenterDataset/schema', { + params: datasetRef, + }); + const data = res.data || {}; + const fields = Array.isArray(data.fields) + ? data.fields.map((field: any) => ({ + fieldName: field.fieldName, + fieldDesc: field.fieldDesc, + fieldType: field.jdbcType || field.fieldType, + })) + : []; + return { + tableName: data.table?.tableName || datasetRef.tableName, + tableDesc: data.table?.tableDesc, + fields, + }; +} diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/index.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/index.ts index 6356070..10b4d39 100644 --- a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/index.ts +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/index.ts @@ -3,9 +3,8 @@ import downloadNode from './downloadNode'; import makeFileNode from './makeFileNode'; import nodeNames from './nodeNames'; import { PluginNode } from './pluginNode'; -import { SaveToDatacenterNode } from './saveToDatacenter'; -import { SearchDatacenterNode } from './searchDatacenter'; -import sqlNode from './sqlNode'; +import { SaveDatasetNode } from './saveDataset'; +import { SearchDatasetNode } from './searchDataset'; import { WorkflowNode } from './workflowNode'; export interface CustomNodeOptions { @@ -14,16 +13,15 @@ export interface CustomNodeOptions { export const getCustomNode = async (options: CustomNodeOptions) => { const pluginNode = PluginNode({ onChosen: options.handleChosen }); const workflowNode = WorkflowNode({ onChosen: options.handleChosen }); - const searchDatacenterNode = await SearchDatacenterNode(); - const saveToDatacenterNode = await SaveToDatacenterNode(); + const searchDatasetNode = await SearchDatasetNode(); + const saveDatasetNode = await SaveDatasetNode(); return { ...docNode, ...makeFileNode, ...downloadNode, - ...sqlNode, [nodeNames.pluginNode]: pluginNode, [nodeNames.workflowNode]: workflowNode, - [nodeNames.searchDatacenterNode]: searchDatacenterNode, - [nodeNames.saveToDatacenterNode]: saveToDatacenterNode, + [nodeNames.searchDatasetNode]: searchDatasetNode, + [nodeNames.saveDatasetNode]: saveDatasetNode, }; }; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/nodeNames.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/nodeNames.ts index d727f0c..c2a97a5 100644 --- a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/nodeNames.ts +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/nodeNames.ts @@ -2,9 +2,8 @@ export default { documentNode: 'document-node', makeFileNode: 'make-file', downloadNode: 'download-node', - sqlNode: 'sql-node', pluginNode: 'plugin-node', workflowNode: 'workflow-node', - searchDatacenterNode: 'search-datacenter-node', - saveToDatacenterNode: 'save-to-datacenter-node', + searchDatasetNode: 'search-dataset-node', + saveDatasetNode: 'save-dataset-node', }; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveDataset.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveDataset.ts new file mode 100644 index 0000000..f7bb1f8 --- /dev/null +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveDataset.ts @@ -0,0 +1,43 @@ +import { $t } from '#/locales'; + +import { rerenderSaveNode } from './datasetNodeRenderer'; + +export const SaveDatasetNode = async () => { + return { + title: $t('aiWorkflow.saveDataset'), + group: 'base', + description: $t('aiWorkflow.descriptions.saveDataset'), + icon: '', + sortNo: 812, + parametersAddEnable: false, + outputDefsAddEnable: false, + parameters: [ + { + name: 'saveList', + title: $t('aiWorkflow.dataToBeSaved'), + dataType: 'Array', + dataTypeDisabled: true, + required: true, + parametersAddEnable: false, + description: $t('aiWorkflow.descriptions.dataToBeSaved'), + deleteDisabled: true, + nameDisabled: true, + }, + ], + outputDefs: [ + { + name: 'successRows', + title: $t('aiWorkflow.successInsertedRecords'), + dataType: 'Number', + dataTypeDisabled: true, + required: true, + parametersAddEnable: false, + description: $t('aiWorkflow.successInsertedRecords'), + deleteDisabled: true, + nameDisabled: true, + }, + ], + render: rerenderSaveNode, + onUpdate: rerenderSaveNode, + }; +}; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveToDatacenter.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveToDatacenter.ts deleted file mode 100644 index 4205e92..0000000 --- a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/saveToDatacenter.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { getOptions } from '@easyflow/utils'; - -import { api } from '#/api/request'; -import { $t } from '#/locales'; - -export const SaveToDatacenterNode = async () => { - const res = await api.get('/api/v1/datacenterTable/list'); - - return { - title: $t('aiWorkflow.saveData'), - group: 'base', - description: $t('aiWorkflow.descriptions.saveData'), - icon: '', - sortNo: 812, - parametersAddEnable: false, - outputDefsAddEnable: false, - parameters: [ - { - name: 'saveList', - title: $t('aiWorkflow.dataToBeSaved'), - dataType: 'Array', - dataTypeDisabled: true, - required: true, - parametersAddEnable: false, - description: $t('aiWorkflow.descriptions.dataToBeSaved'), - deleteDisabled: true, - nameDisabled: true, - }, - ], - outputDefs: [ - { - name: 'successRows', - title: $t('aiWorkflow.successInsertedRecords'), - dataType: 'Number', - dataTypeDisabled: true, - required: true, - parametersAddEnable: false, - description: $t('aiWorkflow.successInsertedRecords'), - deleteDisabled: true, - nameDisabled: true, - }, - ], - forms: [ - { - type: 'heading', - label: $t('aiWorkflow.dataTable'), - }, - { - type: 'select', - label: '', - description: $t('aiWorkflow.descriptions.dataTable'), - name: 'tableId', - defaultValue: '', - options: getOptions('tableName', 'id', res.data), - }, - ], - }; -}; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDatacenter.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDatacenter.ts deleted file mode 100644 index 13cb551..0000000 --- a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDatacenter.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { getOptions } from '@easyflow/utils'; - -import { api } from '#/api/request'; -import { $t } from '#/locales'; - -export const SearchDatacenterNode = async () => { - const res = await api.get('/api/v1/datacenterTable/list'); - - return { - title: $t('aiWorkflow.queryData'), - group: 'base', - description: $t('aiWorkflow.descriptions.queryData'), - icon: '', - sortNo: 813, - parametersAddEnable: true, - outputDefsAddEnable: false, - parameters: [], - outputDefs: [ - { - name: 'rows', - title: $t('aiWorkflow.queryResult'), - dataType: 'Array', - dataTypeDisabled: true, - required: true, - parametersAddEnable: false, - description: $t('aiWorkflow.queryResult'), - deleteDisabled: true, - nameDisabled: false, - }, - ], - forms: [ - { - type: 'heading', - label: $t('aiWorkflow.dataTable'), - }, - { - type: 'select', - label: '', - description: $t('aiWorkflow.descriptions.dataTable'), - name: 'tableId', - defaultValue: '', - options: getOptions('tableName', 'id', res.data), - }, - { - type: 'heading', - label: $t('aiWorkflow.filterConditions'), - }, - { - type: 'textarea', - templateSupport: true, - label: "如:name='张三' and age=21 or field = {{流程变量}}", - description: '', - name: 'where', - defaultValue: '', - }, - { - type: 'heading', - label: $t('aiWorkflow.limit'), - }, - { - type: 'input', - label: '', - description: '', - name: 'limit', - defaultValue: '10', - }, - ], - }; -}; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDataset.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDataset.ts new file mode 100644 index 0000000..aabc85d --- /dev/null +++ b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/searchDataset.ts @@ -0,0 +1,44 @@ +import { $t } from '#/locales'; + +import { rerenderSearchNode } from './datasetNodeRenderer'; + +export const SearchDatasetNode = async () => { + return { + title: $t('aiWorkflow.queryDataset'), + group: 'base', + description: $t('aiWorkflow.descriptions.queryDataset'), + icon: '', + sortNo: 813, + parametersAddEnable: true, + outputDefsAddEnable: false, + renderFirst: true, + parameters: [], + forms: [ + { + name: 'querySql', + type: 'textarea', + templateSupport: true, + label: 'SQL', + placeholder: $t('aiWorkflow.descriptions.enterSQL'), + attrs: { + rows: 6, + }, + }, + ], + outputDefs: [ + { + name: 'data', + title: 'data', + dataType: 'Array', + dataTypeDisabled: true, + required: true, + parametersAddEnable: false, + description: 'data', + deleteDisabled: true, + nameDisabled: true, + }, + ], + render: rerenderSearchNode, + onUpdate: rerenderSearchNode, + }; +}; diff --git a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/sqlNode.ts b/easyflow-ui-admin/app/src/views/ai/workflow/customNode/sqlNode.ts deleted file mode 100644 index cb3e406..0000000 --- a/easyflow-ui-admin/app/src/views/ai/workflow/customNode/sqlNode.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { $t } from '#/locales'; - -import nodeNames from './nodeNames'; - -export default { - [nodeNames.sqlNode]: { - title: $t('aiWorkflow.sqlQuery'), - group: 'base', - description: $t('aiWorkflow.descriptions.sqlQuery'), - icon: '', - sortNo: 803, - parametersAddEnable: true, - outputDefsAddEnable: true, - parameters: [], - forms: [ - { - name: 'sql', - type: 'textarea', - templateSupport: true, - label: 'SQL', - placeholder: $t('aiWorkflow.descriptions.enterSQL'), - }, - ], - outputDefs: [ - { - name: 'queryData', - title: $t('aiWorkflow.queryResult'), - dataType: 'Array', - dataTypeDisabled: true, - required: true, - parametersAddEnable: false, - description: $t('aiWorkflow.descriptions.queryResultJson'), - deleteDisabled: true, - nameDisabled: true, - }, - ], - }, -}; diff --git a/easyflow-ui-admin/packages/tinyflow-ui/src/components/base/select.svelte b/easyflow-ui-admin/packages/tinyflow-ui/src/components/base/select.svelte index 42f42b2..9df041d 100644 --- a/easyflow-ui-admin/packages/tinyflow-ui/src/components/base/select.svelte +++ b/easyflow-ui-admin/packages/tinyflow-ui/src/components/base/select.svelte @@ -77,6 +77,10 @@ } } + function isMarkupIcon(icon?: string) { + return typeof icon === 'string' && icon.trim().startsWith('<'); + } + {#snippet renderDefaultItems(items: SelectItem[], depth = 0)} @@ -97,7 +101,13 @@ {#if group.selectable === false}
{#if group.icon} - {@html group.icon} + + {#if isMarkupIcon(group.icon)} + {@html group.icon} + {:else} + + {/if} + {/if} {group.label}
@@ -105,7 +115,11 @@