feat: 重构数据中枢工作台与接入管理

- 新增统一的数据源、目录、纳管表与 Excel 处理后端能力

- 重建管理端数据中枢工作台并替换旧表管理页面

- 补充数据中枢迁移脚本、连接器底座与说明字段支持
This commit is contained in:
2026-04-02 18:55:31 +08:00
parent b6213d0933
commit 798effbd5b
117 changed files with 9739 additions and 1824 deletions

View File

@@ -0,0 +1,24 @@
package tech.easyflow.datacenter.execution.model;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class DatacenterConnectionTestResult {
private boolean success;
private String errorCode;
private String message;
private List<String> capabilities = new ArrayList<>();
private Map<String, Object> details;
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
public String getErrorCode() { return errorCode; }
public void setErrorCode(String errorCode) { this.errorCode = errorCode; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public List<String> getCapabilities() { return capabilities; }
public void setCapabilities(List<String> capabilities) { this.capabilities = capabilities; }
public Map<String, Object> getDetails() { return details; }
public void setDetails(Map<String, Object> details) { this.details = details; }
}

View File

@@ -0,0 +1,19 @@
package tech.easyflow.datacenter.execution.model;
import java.util.List;
public class DatacenterQueryFilter {
private String column;
private String operator;
private Object value;
private List<Object> values;
public String getColumn() { return column; }
public void setColumn(String column) { this.column = column; }
public String getOperator() { return operator; }
public void setOperator(String operator) { this.operator = operator; }
public Object getValue() { return value; }
public void setValue(Object value) { this.value = value; }
public List<Object> getValues() { return values; }
public void setValues(List<Object> values) { this.values = values; }
}

View File

@@ -0,0 +1,29 @@
package tech.easyflow.datacenter.execution.model;
import java.util.ArrayList;
import java.util.List;
public class DatacenterQueryRequest {
private DatasetRef datasetRef;
private Long pageNumber = 1L;
private Long pageSize = 10L;
private List<DatacenterQueryFilter> filters = new ArrayList<>();
private List<DatacenterQuerySort> sorts = new ArrayList<>();
private List<String> selectedColumns = new ArrayList<>();
private String where;
public DatasetRef getDatasetRef() { return datasetRef; }
public void setDatasetRef(DatasetRef datasetRef) { this.datasetRef = datasetRef; }
public Long getPageNumber() { return pageNumber; }
public void setPageNumber(Long pageNumber) { this.pageNumber = pageNumber; }
public Long getPageSize() { return pageSize; }
public void setPageSize(Long pageSize) { this.pageSize = pageSize; }
public List<DatacenterQueryFilter> getFilters() { return filters; }
public void setFilters(List<DatacenterQueryFilter> filters) { this.filters = filters; }
public List<DatacenterQuerySort> getSorts() { return sorts; }
public void setSorts(List<DatacenterQuerySort> sorts) { this.sorts = sorts; }
public List<String> getSelectedColumns() { return selectedColumns; }
public void setSelectedColumns(List<String> selectedColumns) { this.selectedColumns = selectedColumns; }
public String getWhere() { return where; }
public void setWhere(String where) { this.where = where; }
}

View File

@@ -0,0 +1,11 @@
package tech.easyflow.datacenter.execution.model;
public class DatacenterQuerySort {
private String column;
private String direction;
public String getColumn() { return column; }
public void setColumn(String column) { this.column = column; }
public String getDirection() { return direction; }
public void setDirection(String direction) { this.direction = direction; }
}

View File

@@ -0,0 +1,39 @@
package tech.easyflow.datacenter.execution.model;
import tech.easyflow.datacenter.entity.DatacenterTable;
import tech.easyflow.datacenter.entity.DatacenterTableField;
import tech.easyflow.datacenter.meta.entity.DatacenterCatalog;
import tech.easyflow.datacenter.meta.entity.DatacenterDatasetVersion;
import tech.easyflow.datacenter.meta.entity.DatacenterDerivedTable;
import tech.easyflow.datacenter.meta.entity.DatacenterSource;
import java.util.ArrayList;
import java.util.List;
public class DatacenterSchemaResponse {
private DatasetRef datasetRef;
private DatacenterSource source;
private DatacenterCatalog catalog;
private DatacenterTable table;
private List<DatacenterTableField> fields = new ArrayList<>();
private List<DatacenterDatasetVersion> versions = new ArrayList<>();
private List<DatacenterDerivedTable> upstreamLineage = new ArrayList<>();
private List<DatacenterDerivedTable> downstreamLineage = new ArrayList<>();
public DatasetRef getDatasetRef() { return datasetRef; }
public void setDatasetRef(DatasetRef datasetRef) { this.datasetRef = datasetRef; }
public DatacenterSource getSource() { return source; }
public void setSource(DatacenterSource source) { this.source = source; }
public DatacenterCatalog getCatalog() { return catalog; }
public void setCatalog(DatacenterCatalog catalog) { this.catalog = catalog; }
public DatacenterTable getTable() { return table; }
public void setTable(DatacenterTable table) { this.table = table; }
public List<DatacenterTableField> getFields() { return fields; }
public void setFields(List<DatacenterTableField> fields) { this.fields = fields; }
public List<DatacenterDatasetVersion> getVersions() { return versions; }
public void setVersions(List<DatacenterDatasetVersion> versions) { this.versions = versions; }
public List<DatacenterDerivedTable> getUpstreamLineage() { return upstreamLineage; }
public void setUpstreamLineage(List<DatacenterDerivedTable> upstreamLineage) { this.upstreamLineage = upstreamLineage; }
public List<DatacenterDerivedTable> getDownstreamLineage() { return downstreamLineage; }
public void setDownstreamLineage(List<DatacenterDerivedTable> downstreamLineage) { this.downstreamLineage = downstreamLineage; }
}

View File

@@ -0,0 +1,23 @@
package tech.easyflow.datacenter.execution.model;
public class DatacenterSqlQueryRequest {
private DatasetRef datasetRef;
private String sql;
public DatasetRef getDatasetRef() {
return datasetRef;
}
public void setDatasetRef(DatasetRef datasetRef) {
this.datasetRef = datasetRef;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}

View File

@@ -0,0 +1,25 @@
package tech.easyflow.datacenter.execution.model;
import java.math.BigInteger;
public class DatasetRef {
private BigInteger sourceId;
private BigInteger catalogId;
private String catalogName;
private BigInteger tableId;
private String tableName;
private BigInteger versionId;
public BigInteger getSourceId() { return sourceId; }
public void setSourceId(BigInteger sourceId) { this.sourceId = sourceId; }
public BigInteger getCatalogId() { return catalogId; }
public void setCatalogId(BigInteger catalogId) { this.catalogId = catalogId; }
public String getCatalogName() { return catalogName; }
public void setCatalogName(String catalogName) { this.catalogName = catalogName; }
public BigInteger getTableId() { return tableId; }
public void setTableId(BigInteger tableId) { this.tableId = tableId; }
public String getTableName() { return tableName; }
public void setTableName(String tableName) { this.tableName = tableName; }
public BigInteger getVersionId() { return versionId; }
public void setVersionId(BigInteger versionId) { this.versionId = versionId; }
}

View File

@@ -0,0 +1,18 @@
package tech.easyflow.datacenter.execution.service;
import com.mybatisflex.core.paginate.Page;
import com.mybatisflex.core.row.Row;
import tech.easyflow.datacenter.execution.model.DatacenterQueryRequest;
import tech.easyflow.datacenter.execution.model.DatacenterSchemaResponse;
import tech.easyflow.datacenter.execution.model.DatacenterSqlQueryRequest;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import java.util.List;
public interface DatacenterDatasetQueryService {
Page<Row> queryPage(DatacenterQueryRequest request);
List<Row> queryBySql(DatacenterSqlQueryRequest request);
DatacenterSchemaResponse getSchema(DatasetRef datasetRef);
}

View File

@@ -0,0 +1,13 @@
package tech.easyflow.datacenter.execution.service;
import com.alibaba.fastjson2.JSONObject;
import tech.easyflow.common.entity.LoginAccount;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import java.math.BigInteger;
public interface DatacenterDatasetWriteService {
void saveRow(DatasetRef datasetRef, JSONObject data, LoginAccount account);
void deleteRow(DatasetRef datasetRef, BigInteger id, LoginAccount account);
}

View File

@@ -0,0 +1,311 @@
package tech.easyflow.datacenter.execution.service.impl;
import com.mybatisflex.core.paginate.Page;
import com.mybatisflex.core.query.QueryWrapper;
import com.mybatisflex.core.row.Row;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.connector.DatacenterConnector;
import tech.easyflow.datacenter.connector.DatacenterConnectorRegistry;
import tech.easyflow.datacenter.entity.DatacenterTable;
import tech.easyflow.datacenter.entity.DatacenterTableField;
import tech.easyflow.datacenter.execution.model.DatacenterQueryRequest;
import tech.easyflow.datacenter.execution.model.DatacenterSchemaResponse;
import tech.easyflow.datacenter.execution.model.DatacenterSqlQueryRequest;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetQueryService;
import tech.easyflow.datacenter.mapper.DatacenterCatalogMapper;
import tech.easyflow.datacenter.mapper.DatacenterDatasetVersionMapper;
import tech.easyflow.datacenter.mapper.DatacenterDerivedTableMapper;
import tech.easyflow.datacenter.mapper.DatacenterTableMapper;
import tech.easyflow.datacenter.meta.entity.DatacenterCatalog;
import tech.easyflow.datacenter.meta.entity.DatacenterDatasetVersion;
import tech.easyflow.datacenter.meta.entity.DatacenterDerivedTable;
import tech.easyflow.datacenter.meta.entity.DatacenterSource;
import tech.easyflow.datacenter.meta.service.DatacenterDatasetRegistryService;
import tech.easyflow.datacenter.utils.SqlSupportUtils;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Service
public class DatacenterDatasetQueryServiceImpl implements DatacenterDatasetQueryService {
@Resource
private DatacenterDatasetRegistryService registryService;
@Resource
private DatacenterConnectorRegistry connectorRegistry;
@Resource
private DatacenterTableMapper tableMapper;
@Resource
private DatacenterCatalogMapper catalogMapper;
@Resource
private DatacenterDatasetVersionMapper datasetVersionMapper;
@Resource
private DatacenterDerivedTableMapper derivedTableMapper;
@Override
public Page<Row> queryPage(DatacenterQueryRequest request) {
if (request == null || request.getDatasetRef() == null) {
throw new BusinessException("datasetRef 不能为空");
}
normalizePage(request);
DatacenterTable table = resolveTable(request.getDatasetRef());
DatacenterSource source = registryService.getSourceRequired(table.getSourceId());
DatacenterCatalog catalog = registryService.getCatalogById(table.getCatalogId());
DatacenterTable queryTable = resolveQueryTable(table, request.getDatasetRef());
validateRequest(queryTable, request, source);
request.getDatasetRef().setSourceId(table.getSourceId());
request.getDatasetRef().setCatalogId(table.getCatalogId());
request.getDatasetRef().setTableId(table.getId());
request.getDatasetRef().setTableName(table.getTableName());
if (catalog != null) {
request.getDatasetRef().setCatalogName(catalog.getCatalogName());
}
DatacenterConnector connector = connectorRegistry.getConnector(source.getSourceType());
return connector.queryPage(source, queryTable, request);
}
@Override
public List<Row> queryBySql(DatacenterSqlQueryRequest request) {
if (request == null || request.getDatasetRef() == null) {
throw new BusinessException("datasetRef 不能为空");
}
String sql = trimToNull(request.getSql());
if (!StringUtils.hasText(sql)) {
throw new BusinessException("SQL 不能为空");
}
DatasetRef datasetRef = request.getDatasetRef();
if (datasetRef.getSourceId() == null) {
throw new BusinessException("缺少连接服务配置");
}
DatacenterSource source = registryService.getSourceRequired(datasetRef.getSourceId());
BigInteger catalogId = resolveRequestedCatalogId(datasetRef);
List<DatacenterTable> managedTables = registryService.listManagedTables(datasetRef.getSourceId(), catalogId);
if (CollectionUtils.isEmpty(managedTables)) {
throw new BusinessException("当前连接下没有已接入表");
}
SqlSupportUtils.ResolvedSql resolvedSql = SqlSupportUtils.resolve(
sql,
managedTables.stream().map(this::toManagedSqlTable).toList()
);
DatacenterConnector connector = connectorRegistry.getConnector(source.getSourceType());
return connector.queryBySql(source, resolvedSql.getExecutableSql());
}
@Override
public DatacenterSchemaResponse getSchema(DatasetRef datasetRef) {
DatacenterTable table = resolveTable(datasetRef);
DatacenterSource source = registryService.getSourceRequired(table.getSourceId());
DatacenterCatalog catalog = registryService.getCatalogById(table.getCatalogId());
DatacenterSchemaResponse response = new DatacenterSchemaResponse();
response.setDatasetRef(registryService.resolveDatasetRef(table.getId()));
response.setSource(source);
response.setCatalog(catalog);
response.setTable(table);
response.setFields(table.getFields());
response.setVersions(listVersions(table.getId()));
response.setUpstreamLineage(listUpstream(table.getId()));
response.setDownstreamLineage(listDownstream(table.getId()));
return response;
}
private DatacenterTable resolveTable(DatasetRef datasetRef) {
if (datasetRef.getTableId() != null) {
return registryService.getTableWithFields(datasetRef.getTableId());
}
if (datasetRef.getSourceId() == null || !StringUtils.hasText(datasetRef.getTableName())) {
throw new BusinessException("缺少数据集定位信息");
}
QueryWrapper wrapper = QueryWrapper.create();
wrapper.eq(DatacenterTable::getSourceId, datasetRef.getSourceId());
wrapper.eq(DatacenterTable::getTableName, datasetRef.getTableName().trim());
boolean hasCatalogCondition = false;
if (datasetRef.getCatalogId() != null) {
wrapper.eq(DatacenterTable::getCatalogId, datasetRef.getCatalogId());
hasCatalogCondition = true;
} else if (StringUtils.hasText(datasetRef.getCatalogName())) {
DatacenterCatalog catalog = resolveCatalog(datasetRef.getSourceId(), datasetRef.getCatalogName().trim());
wrapper.eq(DatacenterTable::getCatalogId, catalog.getId());
hasCatalogCondition = true;
}
List<DatacenterTable> tables = tableMapper.selectListByQuery(wrapper);
if (CollectionUtils.isEmpty(tables)) {
throw new BusinessException("数据集不存在: " + datasetRef.getTableName());
}
if (!hasCatalogCondition && tables.size() > 1) {
throw new BusinessException("数据集存在重名表,请指定库名: " + datasetRef.getTableName());
}
return registryService.getTableWithFields(tables.get(0).getId());
}
private DatacenterCatalog resolveCatalog(java.math.BigInteger sourceId, String catalogName) {
QueryWrapper wrapper = QueryWrapper.create();
wrapper.eq(DatacenterCatalog::getSourceId, sourceId);
wrapper.eq(DatacenterCatalog::getCatalogName, catalogName);
List<DatacenterCatalog> catalogs = catalogMapper.selectListByQuery(wrapper);
if (CollectionUtils.isEmpty(catalogs)) {
throw new BusinessException("库不存在: " + catalogName);
}
if (catalogs.size() > 1) {
throw new BusinessException("库存在重复配置,请检查: " + catalogName);
}
return catalogs.get(0);
}
private BigInteger resolveRequestedCatalogId(DatasetRef datasetRef) {
if (datasetRef == null) {
return null;
}
if (datasetRef.getCatalogId() != null) {
return datasetRef.getCatalogId();
}
if (StringUtils.hasText(datasetRef.getCatalogName())) {
DatacenterCatalog catalog = resolveCatalog(datasetRef.getSourceId(), datasetRef.getCatalogName().trim());
return catalog.getId();
}
return null;
}
private SqlSupportUtils.ManagedTable toManagedSqlTable(DatacenterTable table) {
DatacenterCatalog catalog = registryService.getCatalogById(table.getCatalogId());
return new SqlSupportUtils.ManagedTable(
catalog == null ? null : catalog.getCatalogName(),
table.getTableName(),
resolvePhysicalTableName(table)
);
}
private String resolvePhysicalTableName(DatacenterTable table) {
if (table == null) {
return null;
}
if ("EXTERNAL_TABLE".equals(table.getTableKind()) || "EXTERNAL_VIEW".equals(table.getTableKind())) {
return trimToNull(table.getActualTable()) == null ? table.getTableName() : table.getActualTable().trim();
}
String materializedTable = trimToNull(table.getMaterializedTable());
if (materializedTable != null) {
return materializedTable;
}
String actualTable = trimToNull(table.getActualTable());
return actualTable != null ? actualTable : table.getTableName();
}
private void normalizePage(DatacenterQueryRequest request) {
if (request.getPageNumber() == null || request.getPageNumber() < 1L) {
request.setPageNumber(1L);
}
if (request.getPageSize() == null || request.getPageSize() < 1L) {
throw new BusinessException("pageSize 必须大于 0");
}
}
private DatacenterTable resolveQueryTable(DatacenterTable table, DatasetRef datasetRef) {
if (datasetRef == null || datasetRef.getVersionId() == null) {
return table;
}
DatacenterDatasetVersion version = datasetVersionMapper.selectOneById(datasetRef.getVersionId());
if (version == null || !table.getId().equals(version.getTableId())) {
throw new BusinessException("数据集版本不存在: " + datasetRef.getVersionId());
}
DatacenterTable queryTable = new DatacenterTable();
queryTable.setId(table.getId());
queryTable.setSourceId(table.getSourceId());
queryTable.setCatalogId(table.getCatalogId());
queryTable.setTableName(table.getTableName());
queryTable.setTableDesc(table.getTableDesc());
queryTable.setActualTable(table.getActualTable());
queryTable.setMaterializedTable(version.getMaterializedTable());
queryTable.setAccessMode(table.getAccessMode());
queryTable.setTableKind(table.getTableKind());
queryTable.setVersioningEnabled(table.getVersioningEnabled());
queryTable.setCapabilitiesJson(table.getCapabilitiesJson());
queryTable.setFields(table.getFields());
return queryTable;
}
private void validateRequest(DatacenterTable table, DatacenterQueryRequest request, DatacenterSource source) {
Map<String, DatacenterTableField> fieldMap = new LinkedHashMap<>();
for (DatacenterTableField field : table.getFields()) {
fieldMap.put(field.getFieldName(), field);
}
if (!CollectionUtils.isEmpty(request.getSelectedColumns())) {
for (String column : request.getSelectedColumns()) {
DatacenterTableField field = fieldMap.get(column);
if (field == null || !isEnabled(field.getQueryable())) {
throw new BusinessException("字段不可查询: " + column);
}
}
} else {
request.setSelectedColumns(
table.getFields().stream()
.filter(field -> isEnabled(field.getQueryable()))
.map(DatacenterTableField::getFieldName)
.toList()
);
}
if (!CollectionUtils.isEmpty(request.getFilters())) {
request.getFilters().forEach(filter -> {
DatacenterTableField field = fieldMap.get(filter.getColumn());
if (field == null || !isEnabled(field.getQueryable())) {
throw new BusinessException("字段不可过滤: " + filter.getColumn());
}
});
}
if (!CollectionUtils.isEmpty(request.getSorts())) {
request.getSorts().forEach(sort -> {
DatacenterTableField field = fieldMap.get(sort.getColumn());
if (field == null || !isEnabled(field.getSortable())) {
throw new BusinessException("字段不可排序: " + sort.getColumn());
}
});
}
if (request.getWhere() != null && !request.getWhere().isBlank()) {
boolean allowLegacyWhere = "PROJECT_MYSQL".equals(source.getSourceType())
|| "MYSQL".equals(source.getSourceType())
|| "POSTGRESQL".equals(source.getSourceType());
if (!allowLegacyWhere) {
throw new BusinessException("当前数据源仅支持结构化 DSL 查询");
}
}
}
private boolean isEnabled(Integer value) {
return value == null || value == 1;
}
private String trimToNull(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
return value.trim();
}
private List<DatacenterDatasetVersion> listVersions(java.math.BigInteger tableId) {
QueryWrapper wrapper = QueryWrapper.create();
wrapper.eq(DatacenterDatasetVersion::getTableId, tableId);
wrapper.orderBy("version_no desc");
return datasetVersionMapper.selectListByQuery(wrapper);
}
private List<DatacenterDerivedTable> listUpstream(java.math.BigInteger tableId) {
QueryWrapper wrapper = QueryWrapper.create();
wrapper.eq(DatacenterDerivedTable::getDerivedTableId, tableId);
wrapper.orderBy("created desc");
return derivedTableMapper.selectListByQuery(wrapper);
}
private List<DatacenterDerivedTable> listDownstream(java.math.BigInteger tableId) {
QueryWrapper wrapper = QueryWrapper.create();
wrapper.eq(DatacenterDerivedTable::getSourceTableId, tableId);
wrapper.orderBy("created desc");
return derivedTableMapper.selectListByQuery(wrapper);
}
}

View File

@@ -0,0 +1,48 @@
package tech.easyflow.datacenter.execution.service.impl;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.stereotype.Service;
import tech.easyflow.common.entity.LoginAccount;
import tech.easyflow.common.web.exceptions.BusinessException;
import tech.easyflow.datacenter.connector.DatacenterConnector;
import tech.easyflow.datacenter.connector.DatacenterConnectorRegistry;
import tech.easyflow.datacenter.entity.DatacenterTable;
import tech.easyflow.datacenter.execution.model.DatasetRef;
import tech.easyflow.datacenter.meta.entity.DatacenterSource;
import tech.easyflow.datacenter.meta.service.DatacenterDatasetRegistryService;
import tech.easyflow.datacenter.execution.service.DatacenterDatasetWriteService;
import javax.annotation.Resource;
import java.math.BigInteger;
@Service
public class DatacenterDatasetWriteServiceImpl implements DatacenterDatasetWriteService {
@Resource
private DatacenterDatasetRegistryService registryService;
@Resource
private DatacenterConnectorRegistry connectorRegistry;
@Override
public void saveRow(DatasetRef datasetRef, JSONObject data, LoginAccount account) {
DatacenterTable table = resolveTable(datasetRef);
DatacenterSource source = registryService.getSourceRequired(table.getSourceId());
DatacenterConnector connector = connectorRegistry.getConnector(source.getSourceType());
connector.saveRow(source, table, data, account);
}
@Override
public void deleteRow(DatasetRef datasetRef, BigInteger id, LoginAccount account) {
DatacenterTable table = resolveTable(datasetRef);
DatacenterSource source = registryService.getSourceRequired(table.getSourceId());
DatacenterConnector connector = connectorRegistry.getConnector(source.getSourceType());
connector.deleteRow(source, table, id, account);
}
private DatacenterTable resolveTable(DatasetRef datasetRef) {
if (datasetRef == null || datasetRef.getTableId() == null) {
throw new BusinessException("缺少 tableId");
}
return registryService.getTableWithFields(datasetRef.getTableId());
}
}