diff --git a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/DocumentChunkController.java b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/DocumentChunkController.java index 176a4bc..92142b7 100644 --- a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/DocumentChunkController.java +++ b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/DocumentChunkController.java @@ -9,6 +9,7 @@ import tech.easyflow.ai.entity.Model; import tech.easyflow.ai.service.DocumentChunkService; import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.common.annotation.UsePermission; import tech.easyflow.common.domain.Result; import tech.easyflow.common.web.controller.BaseCurdController; @@ -93,22 +94,26 @@ public class DocumentChunkController extends BaseCurdController metadata = new HashMap<>(); + metadata.put("keywords", documentChunk.getMetadataKeyWords()); + metadata.put("questions", documentChunk.getMetadataQuestions()); + document.setMetadataMap(metadata); + StoreResult result = documentStore.update(document, options); // 更新已有记录 + return Result.ok(result); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - Document document = Document.of(documentChunk.getContent()); - document.setId(documentChunk.getId()); - Map metadata = new HashMap<>(); - metadata.put("keywords", documentChunk.getMetadataKeyWords()); - metadata.put("questions", documentChunk.getMetadataQuestions()); - document.setMetadataMap(metadata); - StoreResult result = documentStore.update(document, options); // 更新已有记录 - return Result.ok(result); } return Result.ok(false); } @@ -135,19 +140,23 @@ public class DocumentChunkController extends BaseCurdController deleteList = new ArrayList<>(); - deleteList.add(chunkId); - documentStore.delete(deleteList, options); - documentChunkService.removeChunk(knowledge, chunkId); + try { + // 设置向量模型 + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + return Result.fail(4, "知识库没有配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + List deleteList = new ArrayList<>(); + deleteList.add(chunkId); + documentStore.delete(deleteList, options); + documentChunkService.removeChunk(knowledge, chunkId); - return super.remove(chunkId); + return super.remove(chunkId); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); + } } } diff --git a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/ShareKnowledgeController.java b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/ShareKnowledgeController.java index c447be4..9718b6e 100644 --- a/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/ShareKnowledgeController.java +++ b/easyflow-api/easyflow-api-admin/src/main/java/tech/easyflow/admin/controller/ai/ShareKnowledgeController.java @@ -42,6 +42,7 @@ import tech.easyflow.ai.service.KnowledgeEmbeddingService; import tech.easyflow.ai.service.KnowledgeShareAuditService; import tech.easyflow.ai.service.KnowledgeShareService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.ai.vo.FaqImportResultVo; import tech.easyflow.ai.vo.KnowledgeShareAuthContext; import tech.easyflow.ai.vo.KnowledgeShareViewDetail; @@ -520,19 +521,23 @@ public class ShareKnowledgeController { if (documentStore == null) { return Result.fail(2, "知识库没有配置向量库"); } - Model model = modelService.getModelInstance(context.getKnowledge().getVectorEmbedModelId()); - if (model == null) { - return Result.fail(3, "知识库没有配置向量模型"); + try { + Model model = modelService.getModelInstance(context.getKnowledge().getVectorEmbedModelId()); + if (model == null) { + return Result.fail(3, "知识库没有配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); + StoreOptions options = StoreOptions.ofCollectionName(context.getKnowledge().getVectorStoreCollection()); + com.easyagents.core.document.Document doc = com.easyagents.core.document.Document.of(documentChunk.getContent()); + doc.setId(documentChunk.getId()); + StoreResult result = documentStore.update(doc, options); + audit(context, "更新分享文档 Chunk", "KNOWLEDGE_SHARE_URL_WRITE", true, + auditDetail("knowledgeId", context.getKnowledge().getId(), "chunkId", documentChunk.getId())); + return Result.ok(result); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); - StoreOptions options = StoreOptions.ofCollectionName(context.getKnowledge().getVectorStoreCollection()); - com.easyagents.core.document.Document doc = com.easyagents.core.document.Document.of(documentChunk.getContent()); - doc.setId(documentChunk.getId()); - StoreResult result = documentStore.update(doc, options); - audit(context, "更新分享文档 Chunk", "KNOWLEDGE_SHARE_URL_WRITE", true, - auditDetail("knowledgeId", context.getKnowledge().getId(), "chunkId", documentChunk.getId())); - return Result.ok(result); } return Result.ok(false); } @@ -559,17 +564,21 @@ public class ShareKnowledgeController { if (documentStore == null) { return Result.fail(2, "知识库没有配置向量库"); } - Model model = modelService.getModelInstance(context.getKnowledge().getVectorEmbedModelId()); - if (model == null) { - return Result.fail(3, "知识库没有配置向量模型"); + try { + Model model = modelService.getModelInstance(context.getKnowledge().getVectorEmbedModelId()); + if (model == null) { + return Result.fail(3, "知识库没有配置向量模型"); + } + documentStore.setEmbeddingModel(model.toEmbeddingModel()); + StoreOptions options = StoreOptions.ofCollectionName(context.getKnowledge().getVectorStoreCollection()); + documentStore.delete(Collections.singletonList(chunkId), options); + documentChunkService.removeById(chunkId); + audit(context, "删除分享文档 Chunk", "KNOWLEDGE_SHARE_URL_WRITE", true, + auditDetail("knowledgeId", context.getKnowledge().getId(), "chunkId", chunkId)); + return Result.ok(true); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - documentStore.setEmbeddingModel(model.toEmbeddingModel()); - StoreOptions options = StoreOptions.ofCollectionName(context.getKnowledge().getVectorStoreCollection()); - documentStore.delete(Collections.singletonList(chunkId), options); - documentChunkService.removeById(chunkId); - audit(context, "删除分享文档 Chunk", "KNOWLEDGE_SHARE_URL_WRITE", true, - auditDetail("knowledgeId", context.getKnowledge().getId(), "chunkId", chunkId)); - return Result.ok(true); } /** diff --git a/easyflow-api/easyflow-api-public/src/main/java/tech/easyflow/publicapi/controller/PublicKnowledgeShareController.java b/easyflow-api/easyflow-api-public/src/main/java/tech/easyflow/publicapi/controller/PublicKnowledgeShareController.java index 4505ae7..2ebd87b 100644 --- a/easyflow-api/easyflow-api-public/src/main/java/tech/easyflow/publicapi/controller/PublicKnowledgeShareController.java +++ b/easyflow-api/easyflow-api-public/src/main/java/tech/easyflow/publicapi/controller/PublicKnowledgeShareController.java @@ -36,6 +36,7 @@ import tech.easyflow.ai.service.KnowledgeShareAuditService; import tech.easyflow.ai.service.KnowledgeSharePermissionService; import tech.easyflow.ai.service.ModelService; import tech.easyflow.ai.service.impl.KnowledgeSharePermissionServiceImpl; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.ai.vo.FaqImportResultVo; import tech.easyflow.common.domain.Result; import tech.easyflow.common.filestorage.FileStorageService; @@ -342,18 +343,22 @@ public class PublicKnowledgeShareController { if (documentStore == null) { return Result.fail(2, "知识库没有配置向量库"); } - Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); - if (model == null) { - return Result.fail(3, "知识库没有配置向量模型"); + try { + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + return Result.fail(3, "知识库没有配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + com.easyagents.core.document.Document doc = com.easyagents.core.document.Document.of(documentChunk.getContent()); + doc.setId(current.getId()); + StoreResult result = documentStore.update(doc, options); + audit(apiKey, "API更新文档 Chunk", "KNOWLEDGE_API_SHARE_WRITE", request.getRequestURI(), Map.of("knowledgeId", knowledgeId, "chunkId", documentChunk.getId())); + return Result.ok(result); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - com.easyagents.core.document.Document doc = com.easyagents.core.document.Document.of(documentChunk.getContent()); - doc.setId(current.getId()); - StoreResult result = documentStore.update(doc, options); - audit(apiKey, "API更新文档 Chunk", "KNOWLEDGE_API_SHARE_WRITE", request.getRequestURI(), Map.of("knowledgeId", knowledgeId, "chunkId", documentChunk.getId())); - return Result.ok(result); } return Result.ok(false); } @@ -376,16 +381,20 @@ public class PublicKnowledgeShareController { if (documentStore == null) { return Result.fail(2, "知识库没有配置向量库"); } - Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); - if (model == null) { - return Result.fail(3, "知识库没有配置向量模型"); + try { + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + return Result.fail(3, "知识库没有配置向量模型"); + } + documentStore.setEmbeddingModel(model.toEmbeddingModel()); + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + documentStore.delete(Collections.singletonList(chunkId), options); + documentChunkService.removeById(chunkId); + audit(apiKey, "API删除文档 Chunk", "KNOWLEDGE_API_SHARE_WRITE", request.getRequestURI(), Map.of("knowledgeId", knowledgeId, "chunkId", chunkId)); + return Result.ok(true); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - documentStore.setEmbeddingModel(model.toEmbeddingModel()); - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - documentStore.delete(Collections.singletonList(chunkId), options); - documentChunkService.removeById(chunkId); - audit(apiKey, "API删除文档 Chunk", "KNOWLEDGE_API_SHARE_WRITE", request.getRequestURI(), Map.of("knowledgeId", knowledgeId, "chunkId", chunkId)); - return Result.ok(true); } /** diff --git a/easyflow-commons/easyflow-common-analytical-db/pom.xml b/easyflow-commons/easyflow-common-analytical-db/pom.xml index 88e0325..7773fb1 100644 --- a/easyflow-commons/easyflow-common-analytical-db/pom.xml +++ b/easyflow-commons/easyflow-common-analytical-db/pom.xml @@ -25,6 +25,11 @@ org.springframework.boot spring-boot-autoconfigure + + org.springframework.boot + spring-boot-actuator + ${spring-boot.version} + com.clickhouse clickhouse-jdbc diff --git a/easyflow-commons/easyflow-common-analytical-db/src/main/java/tech/easyflow/common/analyticaldb/support/AnalyticalDBHealthIndicator.java b/easyflow-commons/easyflow-common-analytical-db/src/main/java/tech/easyflow/common/analyticaldb/support/AnalyticalDBHealthIndicator.java new file mode 100644 index 0000000..b5ad9cd --- /dev/null +++ b/easyflow-commons/easyflow-common-analytical-db/src/main/java/tech/easyflow/common/analyticaldb/support/AnalyticalDBHealthIndicator.java @@ -0,0 +1,41 @@ +package tech.easyflow.common.analyticaldb.support; + +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.stereotype.Component; + +/** + * 分析数据库健康检查。 + */ +@Component("analyticalDbHealthIndicator") +public class AnalyticalDBHealthIndicator implements HealthIndicator { + + private final AnalyticalDBHealthSupport healthSupport; + + /** + * 创建分析数据库健康检查器。 + * + * @param healthSupport 分析数据库健康检查支持 + */ + public AnalyticalDBHealthIndicator(AnalyticalDBHealthSupport healthSupport) { + this.healthSupport = healthSupport; + } + + /** + * 检查分析数据库是否可用。 + * + * @return 健康状态 + */ + @Override + public Health health() { + if (!healthSupport.enabled()) { + return Health.up().withDetail("enabled", false).build(); + } + try { + healthSupport.selfCheck(); + return Health.up().withDetail("enabled", true).build(); + } catch (Exception e) { + return Health.down(e).withDetail("enabled", true).build(); + } + } +} diff --git a/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/config/AudioThreadPoolProperties.java b/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/config/AudioThreadPoolProperties.java new file mode 100644 index 0000000..2a44770 --- /dev/null +++ b/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/config/AudioThreadPoolProperties.java @@ -0,0 +1,30 @@ +package tech.easyflow.common.audio.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 音频模块线程池配置。 + */ +@ConfigurationProperties(prefix = "easyflow.thread-pool.scheduler") +public class AudioThreadPoolProperties { + + private int poolSize = 4; + + /** + * 获取调度线程池大小。 + * + * @return 调度线程池大小 + */ + public int getPoolSize() { + return poolSize; + } + + /** + * 设置调度线程池大小。 + * + * @param poolSize 调度线程池大小 + */ + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } +} diff --git a/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/socket/SchedulingConfig.java b/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/socket/SchedulingConfig.java index b03bd00..8aa55bb 100644 --- a/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/socket/SchedulingConfig.java +++ b/easyflow-commons/easyflow-common-audio/src/main/java/tech/easyflow/common/audio/socket/SchedulingConfig.java @@ -1,19 +1,38 @@ package tech.easyflow.common.audio.socket; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import tech.easyflow.common.audio.config.AudioThreadPoolProperties; @Configuration @EnableScheduling +@EnableConfigurationProperties(AudioThreadPoolProperties.class) public class SchedulingConfig { + private final AudioThreadPoolProperties properties; + + /** + * 创建音频调度配置。 + * + * @param properties 音频调度线程池配置 + */ + public SchedulingConfig(AudioThreadPoolProperties properties) { + this.properties = properties; + } + + /** + * 创建调度线程池。 + * + * @return 调度线程池 + */ @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setPoolSize(10); + scheduler.setPoolSize(properties.getPoolSize()); scheduler.setThreadNamePrefix("scheduled-task-"); scheduler.setDaemon(true); scheduler.initialize(); diff --git a/easyflow-commons/easyflow-common-mq/pom.xml b/easyflow-commons/easyflow-common-mq/pom.xml index 710be55..c6f0de6 100644 --- a/easyflow-commons/easyflow-common-mq/pom.xml +++ b/easyflow-commons/easyflow-common-mq/pom.xml @@ -22,5 +22,10 @@ jackson-databind ${jackson.version} + + org.apache.commons + commons-pool2 + 2.11.1 + diff --git a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQConfiguration.java b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQConfiguration.java index 92ab304..921af5d 100644 --- a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQConfiguration.java +++ b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQConfiguration.java @@ -9,7 +9,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.StringRedisTemplate; import tech.easyflow.common.mq.core.MQConsumerContainer; import tech.easyflow.common.mq.core.MQConsumerHandler; @@ -24,6 +26,10 @@ import tech.easyflow.common.mq.redis.RedisMQProducer; import tech.easyflow.common.mq.redis.RedisStreamKeySupport; import tech.easyflow.common.mq.support.MQHealthSupport; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import io.lettuce.core.api.StatefulConnection; + import java.util.List; @Configuration @@ -43,11 +49,27 @@ public class MQConfiguration { if (redisProperties.getPassword() != null) { configuration.setPassword(RedisPassword.of(redisProperties.getPassword())); } - LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration); + LettuceClientConfiguration clientConfiguration = createClientConfiguration(redisProperties, mqProperties); + LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, clientConfiguration); connectionFactory.afterPropertiesSet(); return new MQRedisResources(connectionFactory, new StringRedisTemplate(connectionFactory)); } + private LettuceClientConfiguration createClientConfiguration(RedisProperties redisProperties, + MQProperties mqProperties) { + MQProperties.Redis.Pool pool = mqProperties.getRedis().getPool(); + GenericObjectPoolConfig> poolConfig = new GenericObjectPoolConfig<>(); + poolConfig.setMaxTotal(pool.getMaxActive()); + poolConfig.setMaxIdle(pool.getMaxIdle()); + poolConfig.setMinIdle(pool.getMinIdle()); + LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = + LettucePoolingClientConfiguration.builder().poolConfig(poolConfig); + if (redisProperties.getTimeout() != null) { + builder.commandTimeout(redisProperties.getTimeout()); + } + return builder.build(); + } + @Bean(name = "mqRedisConnectionFactory", autowireCandidate = false, defaultCandidate = false) @ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true) public LettuceConnectionFactory mqRedisConnectionFactory(MQRedisResources mqRedisResources) { diff --git a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQProperties.java b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQProperties.java index 3bebbf8..33cf0c7 100644 --- a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQProperties.java +++ b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/config/MQProperties.java @@ -40,6 +40,8 @@ public class MQProperties { private Duration consumerBlockTimeout = Duration.ofMillis(2000); private Duration pendingClaimIdle = Duration.ofMillis(60000); private int maxRetry = 16; + private ConsumerExecutor consumerExecutor = new ConsumerExecutor(); + private Pool pool = new Pool(); public int getDatabase() { return database; @@ -96,5 +98,98 @@ public class MQProperties { public void setMaxRetry(int maxRetry) { this.maxRetry = maxRetry; } + + public ConsumerExecutor getConsumerExecutor() { + return consumerExecutor; + } + + public void setConsumerExecutor(ConsumerExecutor consumerExecutor) { + this.consumerExecutor = consumerExecutor; + } + + public Pool getPool() { + return pool; + } + + public void setPool(Pool pool) { + this.pool = pool; + } + + /** + * Redis MQ 消费线程池配置。 + */ + public static class ConsumerExecutor { + + private int coreSize = 4; + private int maxSize = 12; + private int queueCapacity = 64; + private int keepAliveSeconds = 60; + + public int getCoreSize() { + return coreSize; + } + + public void setCoreSize(int coreSize) { + this.coreSize = coreSize; + } + + public int getMaxSize() { + return maxSize; + } + + public void setMaxSize(int maxSize) { + this.maxSize = maxSize; + } + + public int getQueueCapacity() { + return queueCapacity; + } + + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + public int getKeepAliveSeconds() { + return keepAliveSeconds; + } + + public void setKeepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + } + } + + /** + * Redis MQ 连接池配置。 + */ + public static class Pool { + + private int maxActive = 12; + private int maxIdle = 8; + private int minIdle = 1; + + public int getMaxActive() { + return maxActive; + } + + public void setMaxActive(int maxActive) { + this.maxActive = maxActive; + } + + public int getMaxIdle() { + return maxIdle; + } + + public void setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + } + + public int getMinIdle() { + return minIdle; + } + + public void setMinIdle(int minIdle) { + this.minIdle = minIdle; + } + } } } diff --git a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/redis/RedisMQConsumerContainer.java b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/redis/RedisMQConsumerContainer.java index 2e28ed2..d5beb8b 100644 --- a/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/redis/RedisMQConsumerContainer.java +++ b/easyflow-commons/easyflow-common-mq/src/main/java/tech/easyflow/common/mq/redis/RedisMQConsumerContainer.java @@ -30,9 +30,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifecycle { @@ -45,7 +47,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec private final MQDeadLetterService deadLetterService; private final RedisStreamKeySupport keySupport; private final List handlers; - private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final ExecutorService executorService; private volatile boolean running; @@ -63,6 +65,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec this.deadLetterService = deadLetterService; this.keySupport = keySupport; this.handlers = handlers; + this.executorService = createExecutor(properties, handlers); } @Override @@ -77,7 +80,12 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec int currentShard = shard; LOG.info("启动 MQ 消费线程: topic={}, group={}, shard={}, handler={}", subscription.getTopic(), subscription.getConsumerGroup(), currentShard, handler.getClass().getSimpleName()); - executorService.submit(() -> consumeLoop(handler, subscription, currentShard)); + try { + executorService.submit(() -> consumeLoop(handler, subscription, currentShard)); + } catch (RuntimeException e) { + running = false; + throw e; + } } } } @@ -108,6 +116,42 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec stop(); } + private ExecutorService createExecutor(MQProperties properties, List handlers) { + MQProperties.Redis.ConsumerExecutor config = properties.getRedis().getConsumerExecutor(); + int consumerTaskCount = handlers.stream() + .map(MQConsumerHandler::subscription) + .filter(Objects::nonNull) + .mapToInt(subscription -> Math.max(subscription.getShardCount(), 1)) + .sum(); + if (config.getCoreSize() > config.getMaxSize()) { + throw new IllegalStateException("Redis MQ 消费线程池配置错误:core-size 不能大于 max-size"); + } + if (consumerTaskCount > config.getMaxSize()) { + throw new IllegalStateException("Redis MQ 消费线程池配置错误:max-size=" + + config.getMaxSize() + " 小于消费循环数 " + consumerTaskCount + + ",请调大 easyflow.mq.redis.consumer-executor.max-size"); + } + int coreSize = Math.max(config.getCoreSize(), consumerTaskCount); + int maxSize = config.getMaxSize(); + AtomicInteger threadIndex = new AtomicInteger(1); + ThreadPoolExecutor executor = new ThreadPoolExecutor( + coreSize, + maxSize, + config.getKeepAliveSeconds(), + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(config.getQueueCapacity()), + task -> { + Thread thread = new Thread(task); + thread.setName("redis-mq-consumer-" + threadIndex.getAndIncrement()); + thread.setDaemon(false); + return thread; + }, + new ThreadPoolExecutor.AbortPolicy() + ); + executor.allowCoreThreadTimeOut(true); + return executor; + } + private void consumeLoop(MQConsumerHandler handler, MQSubscription subscription, int shard) { String streamKey = keySupport.streamKey(subscription.getTopic(), shard); String consumerName = subscription.getConsumerGroup() + "-" + shard; diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentModuleConfig.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentModuleConfig.java index a913a3e..0e5a3dc 100644 --- a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentModuleConfig.java +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentModuleConfig.java @@ -3,12 +3,14 @@ package tech.easyflow.agent.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; /** * Agent 模块自动配置。 */ @AutoConfiguration @MapperScan("tech.easyflow.agent.mapper") +@ComponentScan("tech.easyflow.agent") @EnableConfigurationProperties(AgentRuntimeProperties.class) public class AgentModuleConfig { } diff --git a/easyflow-modules/easyflow-module-ai/pom.xml b/easyflow-modules/easyflow-module-ai/pom.xml index daa868a..d8e63f0 100644 --- a/easyflow-modules/easyflow-module-ai/pom.xml +++ b/easyflow-modules/easyflow-module-ai/pom.xml @@ -115,6 +115,11 @@ tech.easyflow easyflow-common-mq + + org.springframework.boot + spring-boot-actuator + ${spring-boot.version} + com.easyagents diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/AiModuleConfig.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/AiModuleConfig.java index 2644e7f..fb38aa9 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/AiModuleConfig.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/AiModuleConfig.java @@ -2,8 +2,16 @@ package tech.easyflow.ai.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; +import tech.easyflow.ai.documentimport.task.DocumentImportParseMonitorProperties; @MapperScan("tech.easyflow.ai.mapper") +@ComponentScan("tech.easyflow.ai") +@EnableConfigurationProperties({ + DocumentImportParseMonitorProperties.class, + RagHealthProperties.class +}) @AutoConfiguration public class AiModuleConfig { diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/CachedHealthIndicatorSupport.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/CachedHealthIndicatorSupport.java new file mode 100644 index 0000000..fcdcf6c --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/CachedHealthIndicatorSupport.java @@ -0,0 +1,84 @@ +package tech.easyflow.ai.config; + +import org.springframework.boot.actuate.health.Health; + +import java.time.Clock; +import java.time.Duration; + +/** + * 健康检查短缓存支持。 + */ +public abstract class CachedHealthIndicatorSupport { + + private final RagHealthProperties properties; + private final Clock clock; + private volatile CacheEntry cacheEntry; + + /** + * 创建健康检查缓存支持。 + * + * @param properties RAG 健康检查配置 + */ + protected CachedHealthIndicatorSupport(RagHealthProperties properties) { + this(properties, Clock.systemUTC()); + } + + /** + * 创建健康检查缓存支持。 + * + * @param properties RAG 健康检查配置 + * @param clock 时钟 + */ + protected CachedHealthIndicatorSupport(RagHealthProperties properties, Clock clock) { + this.properties = properties; + this.clock = clock; + } + + /** + * 执行带短缓存的健康检查。 + * + * @return 健康状态 + */ + protected Health cachedHealth() { + long now = clock.millis(); + CacheEntry current = cacheEntry; + if (current != null && current.expireAtMillis > now) { + return current.health; + } + synchronized (this) { + current = cacheEntry; + if (current != null && current.expireAtMillis > now) { + return current.health; + } + Health health = doHealthCheck(); + cacheEntry = new CacheEntry(health, now + cacheTtlMillis()); + return health; + } + } + + /** + * 执行实际健康检查。 + * + * @return 健康状态 + */ + protected abstract Health doHealthCheck(); + + private long cacheTtlMillis() { + Duration cacheTtl = properties.getCacheTtl(); + if (cacheTtl == null || cacheTtl.isZero() || cacheTtl.isNegative()) { + return 0L; + } + return cacheTtl.toMillis(); + } + + private static class CacheEntry { + + private final Health health; + private final long expireAtMillis; + + private CacheEntry(Health health, long expireAtMillis) { + this.health = health; + this.expireAtMillis = expireAtMillis; + } + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java new file mode 100644 index 0000000..cc699a4 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java @@ -0,0 +1,124 @@ +package tech.easyflow.ai.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * EasyFlow 业务线程池配置。 + */ +@ConfigurationProperties(prefix = "easyflow.thread-pool") +public class EasyFlowThreadPoolProperties { + + private Pool sse = new Pool(4, 16, 2000, 30, true); + private Pool documentImport = new Pool(2, 4, 200, 60, true); + + /** + * 获取 SSE 线程池配置。 + * + * @return SSE 线程池配置 + */ + public Pool getSse() { + return sse; + } + + /** + * 设置 SSE 线程池配置。 + * + * @param sse SSE 线程池配置 + */ + public void setSse(Pool sse) { + this.sse = sse; + } + + /** + * 获取文档导入线程池配置。 + * + * @return 文档导入线程池配置 + */ + public Pool getDocumentImport() { + return documentImport; + } + + /** + * 设置文档导入线程池配置。 + * + * @param documentImport 文档导入线程池配置 + */ + public void setDocumentImport(Pool documentImport) { + this.documentImport = documentImport; + } + + /** + * 线程池配置项。 + */ + public static class Pool { + + private int coreSize; + private int maxSize; + private int queueCapacity; + private int keepAliveSeconds; + private boolean allowCoreThreadTimeout; + + /** + * 创建默认线程池配置。 + */ + public Pool() { + } + + /** + * 创建线程池配置。 + * + * @param coreSize 核心线程数 + * @param maxSize 最大线程数 + * @param queueCapacity 队列容量 + * @param keepAliveSeconds 空闲线程存活时间 + * @param allowCoreThreadTimeout 是否允许核心线程超时 + */ + public Pool(int coreSize, int maxSize, int queueCapacity, int keepAliveSeconds, boolean allowCoreThreadTimeout) { + this.coreSize = coreSize; + this.maxSize = maxSize; + this.queueCapacity = queueCapacity; + this.keepAliveSeconds = keepAliveSeconds; + this.allowCoreThreadTimeout = allowCoreThreadTimeout; + } + + public int getCoreSize() { + return coreSize; + } + + public void setCoreSize(int coreSize) { + this.coreSize = coreSize; + } + + public int getMaxSize() { + return maxSize; + } + + public void setMaxSize(int maxSize) { + this.maxSize = maxSize; + } + + public int getQueueCapacity() { + return queueCapacity; + } + + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + public int getKeepAliveSeconds() { + return keepAliveSeconds; + } + + public void setKeepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + } + + public boolean isAllowCoreThreadTimeout() { + return allowCoreThreadTimeout; + } + + public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { + this.allowCoreThreadTimeout = allowCoreThreadTimeout; + } + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthIndicator.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthIndicator.java new file mode 100644 index 0000000..2bbd356 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthIndicator.java @@ -0,0 +1,138 @@ +package tech.easyflow.ai.config; + +import com.easyagents.engine.es.ElasticSearcher; +import com.easyagents.search.engine.service.DocumentSearcher; +import com.easyagents.store.milvus.MilvusVectorStore; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.stereotype.Component; +import tech.easyflow.ai.rag.KeywordEngineType; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; +import tech.easyflow.common.util.SpringContextUtil; +import tech.easyflow.common.util.StringUtil; + +import java.io.File; + +/** + * RAG 依赖中间件健康检查。 + */ +public class RagHealthIndicator { + + /** + * Milvus 健康检查。 + */ + @Component("ragMilvusHealthIndicator") + public static class RagMilvusHealthIndicator extends CachedHealthIndicatorSupport implements HealthIndicator { + + private final AiMilvusConfig aiMilvusConfig; + + /** + * 创建 Milvus 健康检查器。 + * + * @param aiMilvusConfig Milvus 配置 + * @param healthProperties RAG 健康检查配置 + */ + public RagMilvusHealthIndicator(AiMilvusConfig aiMilvusConfig, RagHealthProperties healthProperties) { + super(healthProperties); + this.aiMilvusConfig = aiMilvusConfig; + } + + /** + * 检查 Milvus 是否可连接。 + * + * @return 健康状态 + */ + @Override + public Health health() { + return cachedHealth(); + } + + @Override + protected Health doHealthCheck() { + MilvusVectorStore vectorStore = null; + try { + vectorStore = new MilvusVectorStore( + aiMilvusConfig.copyForCollection("__rag_health_probe__") + ); + if (vectorStore.checkAvailable()) { + return Health.up().withDetail("uri", aiMilvusConfig.getUri()).build(); + } + return Health.down().withDetail("uri", aiMilvusConfig.getUri()).build(); + } catch (Exception e) { + return Health.down(e).withDetail("uri", aiMilvusConfig.getUri()).build(); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(vectorStore); + } + } + } + + /** + * 关键词检索健康检查。 + */ + @Component("ragKeywordSearchHealthIndicator") + public static class RagKeywordSearchHealthIndicator extends CachedHealthIndicatorSupport implements HealthIndicator { + + private final SearcherFactory searcherFactory; + private final AiLuceneConfig aiLuceneConfig; + + /** + * 创建关键词检索健康检查器。 + * + * @param searcherFactory 检索器工厂 + * @param aiLuceneConfig Lucene 配置 + * @param healthProperties RAG 健康检查配置 + */ + public RagKeywordSearchHealthIndicator(SearcherFactory searcherFactory, + AiLuceneConfig aiLuceneConfig, + RagHealthProperties healthProperties) { + super(healthProperties); + this.searcherFactory = searcherFactory; + this.aiLuceneConfig = aiLuceneConfig; + } + + /** + * 检查当前关键词检索引擎是否可用。 + * + * @return 健康状态 + */ + @Override + public Health health() { + return cachedHealth(); + } + + @Override + protected Health doHealthCheck() { + KeywordEngineType engineType = KeywordEngineType.from(SpringContextUtil.getProperty("rag.engine", "ES")); + if (engineType == KeywordEngineType.LUCENE) { + return checkLuceneDirectory(engineType); + } + DocumentSearcher searcher = searcherFactory.getSearcher(); + if (searcher instanceof ElasticSearcher elasticSearcher && elasticSearcher.checkAvailable()) { + return Health.up().withDetail("engine", engineType.name()).build(); + } + return Health.down().withDetail("engine", engineType.name()).build(); + } + + private Health checkLuceneDirectory(KeywordEngineType engineType) { + String indexDirPath = aiLuceneConfig.getIndexDirPath(); + if (StringUtil.noText(indexDirPath)) { + return Health.down() + .withDetail("engine", engineType.name()) + .withDetail("reason", "Lucene 索引目录未配置") + .build(); + } + File indexDir = new File(indexDirPath); + if (indexDir.exists() && indexDir.isDirectory() && indexDir.canRead() && indexDir.canWrite()) { + return Health.up() + .withDetail("engine", engineType.name()) + .withDetail("indexDir", indexDirPath) + .build(); + } + return Health.down() + .withDetail("engine", engineType.name()) + .withDetail("indexDir", indexDirPath) + .withDetail("reason", "Lucene 索引目录不可读写") + .build(); + } + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthProperties.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthProperties.java new file mode 100644 index 0000000..4a392df --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagHealthProperties.java @@ -0,0 +1,32 @@ +package tech.easyflow.ai.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.time.Duration; + +/** + * RAG 健康检查配置。 + */ +@ConfigurationProperties(prefix = "easyflow.ai.rag.health") +public class RagHealthProperties { + + private Duration cacheTtl = Duration.ofSeconds(5); + + /** + * 获取健康检查结果缓存时间。 + * + * @return 缓存时间 + */ + public Duration getCacheTtl() { + return cacheTtl; + } + + /** + * 设置健康检查结果缓存时间。 + * + * @param cacheTtl 缓存时间 + */ + public void setCacheTtl(Duration cacheTtl) { + this.cacheTtl = cacheTtl == null || cacheTtl.isNegative() ? Duration.ofSeconds(5) : cacheTtl; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagInfrastructureValidator.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagInfrastructureValidator.java index d149d95..c914cc2 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagInfrastructureValidator.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/RagInfrastructureValidator.java @@ -1,8 +1,5 @@ package tech.easyflow.ai.config; -import com.easyagents.engine.es.ElasticSearcher; -import com.easyagents.search.engine.service.DocumentSearcher; -import com.easyagents.store.milvus.MilvusVectorStore; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.stereotype.Component; import tech.easyflow.ai.rag.KeywordEngineType; @@ -16,9 +13,6 @@ import java.io.File; @Component public class RagInfrastructureValidator implements SmartInitializingSingleton { - private static final int STARTUP_CHECK_RETRY_TIMES = 10; - private static final long STARTUP_CHECK_RETRY_INTERVAL_MS = 1000L; - @Resource private AiMilvusConfig aiMilvusConfig; @@ -26,31 +20,21 @@ public class RagInfrastructureValidator implements SmartInitializingSingleton { private AiLuceneConfig aiLuceneConfig; @Resource - private SearcherFactory searcherFactory; + private AiEsConfig aiEsConfig; + /** + * 校验 RAG 基础配置。 + */ @Override public void afterSingletonsInstantiated() { - validateMilvus(); + validateMilvusConfig(); validateKeywordSearcher(); } - private void validateMilvus() { - Exception lastException = null; - for (int i = 0; i < STARTUP_CHECK_RETRY_TIMES; i++) { - try { - MilvusVectorStore vectorStore = new MilvusVectorStore(aiMilvusConfig.copyForCollection("__rag_boot_probe__")); - if (vectorStore.checkAvailable()) { - return; - } - } catch (Exception e) { - lastException = e; - } - sleepBeforeRetry(); + private void validateMilvusConfig() { + if (StringUtil.noText(aiMilvusConfig.getUri())) { + throw new BusinessException("Milvus uri 未配置,请检查 rag.milvus.uri"); } - if (lastException != null) { - throw new BusinessException("Milvus 服务不可用,项目启动失败,请检查 rag.milvus 配置与服务状态: " + lastException.getMessage()); - } - throw new BusinessException("Milvus 服务不可用,项目启动失败,请检查 rag.milvus 配置与服务状态"); } private void validateKeywordSearcher() { @@ -61,21 +45,12 @@ public class RagInfrastructureValidator implements SmartInitializingSingleton { validateLuceneDirectory(); return; } - - DocumentSearcher searcher = searcherFactory.getSearcher(); - if (!(searcher instanceof ElasticSearcher) || !checkElasticAvailable((ElasticSearcher) searcher)) { - throw new BusinessException("ES 服务不可用,项目启动失败,请检查 rag.engine 与 rag.searcher.elastic 配置"); + if (StringUtil.noText(aiEsConfig.getHost())) { + throw new BusinessException("ES 地址未配置,请检查 rag.searcher.elastic.host"); } - } - - private boolean checkElasticAvailable(ElasticSearcher elasticSearcher) { - for (int i = 0; i < STARTUP_CHECK_RETRY_TIMES; i++) { - if (elasticSearcher.checkAvailable()) { - return true; - } - sleepBeforeRetry(); + if (StringUtil.noText(aiEsConfig.getIndexName())) { + throw new BusinessException("ES 索引未配置,请检查 rag.searcher.elastic.indexName"); } - return false; } private void validateLuceneDirectory() { @@ -92,12 +67,4 @@ public class RagInfrastructureValidator implements SmartInitializingSingleton { } } - private void sleepBeforeRetry() { - try { - Thread.sleep(STARTUP_CHECK_RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BusinessException("中间件启动校验被中断"); - } - } } diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java index 5e12bc2..daed5af 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java @@ -2,15 +2,28 @@ package tech.easyflow.ai.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import tech.easyflow.common.web.exceptions.BusinessException; @Configuration +@EnableConfigurationProperties(EasyFlowThreadPoolProperties.class) public class ThreadPoolConfig { private static final Logger log = LoggerFactory.getLogger(ThreadPoolConfig.class); + private final EasyFlowThreadPoolProperties properties; + + /** + * 创建线程池配置。 + * + * @param properties 线程池配置属性 + */ + public ThreadPoolConfig(EasyFlowThreadPoolProperties properties) { + this.properties = properties; + } + /** * 创建 SSE 消息发送线程池。 * @@ -19,11 +32,12 @@ public class ThreadPoolConfig { @Bean(name = "sseThreadPool") public ThreadPoolTaskExecutor sseThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - int cpuCoreNum = Runtime.getRuntime().availableProcessors(); // 获取CPU核心数(4核返回4) - executor.setCorePoolSize(cpuCoreNum * 2); // 核心线程数 - executor.setMaxPoolSize(cpuCoreNum * 10); // 最大线程数(峰值时扩容,避免线程过多导致上下文切换) - executor.setQueueCapacity(8000); // 任务队列容量 - executor.setKeepAliveSeconds(30); // 空闲线程存活时间:30秒(非核心线程空闲后销毁,节省资源) + EasyFlowThreadPoolProperties.Pool pool = properties.getSse(); + executor.setCorePoolSize(pool.getCoreSize()); + executor.setMaxPoolSize(pool.getMaxSize()); + executor.setQueueCapacity(pool.getQueueCapacity()); + executor.setKeepAliveSeconds(pool.getKeepAliveSeconds()); + executor.setAllowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); executor.setThreadNamePrefix("sse-sender-"); // 拒绝策略 @@ -47,11 +61,12 @@ public class ThreadPoolConfig { @Bean(name = "documentImportTaskExecutor") public ThreadPoolTaskExecutor documentImportTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - int cpuCoreNum = Runtime.getRuntime().availableProcessors(); - executor.setCorePoolSize(Math.max(2, cpuCoreNum)); - executor.setMaxPoolSize(Math.max(4, cpuCoreNum * 2)); - executor.setQueueCapacity(200); - executor.setKeepAliveSeconds(60); + EasyFlowThreadPoolProperties.Pool pool = properties.getDocumentImport(); + executor.setCorePoolSize(pool.getCoreSize()); + executor.setMaxPoolSize(pool.getMaxSize()); + executor.setQueueCapacity(pool.getQueueCapacity()); + executor.setKeepAliveSeconds(pool.getKeepAliveSeconds()); + executor.setAllowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); executor.setThreadNamePrefix("document-import-"); executor.setRejectedExecutionHandler((runnable, executorService) -> { log.error("文档导入线程池过载!核心线程数:{},最大线程数:{},队列任务数:{}", diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitor.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitor.java index 956443a..ab062cb 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitor.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitor.java @@ -24,8 +24,8 @@ public class DocumentImportParseMonitor { * 定时收敛运行中的桥接解析任务状态。 */ @Scheduled( - fixedDelayString = "${easyflow.ai.document-import.parse-monitor.fixed-delay:3000}", - initialDelayString = "${easyflow.ai.document-import.parse-monitor.initial-delay:5000}" + fixedDelayString = "${easyflow.ai.document-import.parse-monitor.fixed-delay:10000}", + initialDelayString = "${easyflow.ai.document-import.parse-monitor.initial-delay:10000}" ) public void reconcileRunningParseTasks() { appService.monitorRunningParseTasks(); diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitorProperties.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitorProperties.java new file mode 100644 index 0000000..f61bf49 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportParseMonitorProperties.java @@ -0,0 +1,30 @@ +package tech.easyflow.ai.documentimport.task; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 文档解析任务监控配置。 + */ +@ConfigurationProperties(prefix = "easyflow.ai.document-import.parse-monitor") +public class DocumentImportParseMonitorProperties { + + private int batchSize = 10; + + /** + * 获取单次监控批量。 + * + * @return 单次监控批量 + */ + public int getBatchSize() { + return batchSize; + } + + /** + * 设置单次监控批量。 + * + * @param batchSize 单次监控批量 + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize <= 0 ? 10 : batchSize; + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportTaskStatusStreamService.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportTaskStatusStreamService.java index dec2542..a54b242 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportTaskStatusStreamService.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/DocumentImportTaskStatusStreamService.java @@ -5,13 +5,17 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.web.context.request.async.AsyncRequestNotUsableException; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import tech.easyflow.ai.documentimport.DocumentImportKeys; import tech.easyflow.ai.entity.Document; import tech.easyflow.ai.mapper.DocumentMapper; import tech.easyflow.common.web.exceptions.BusinessException; import javax.annotation.Resource; +import java.io.IOException; import java.math.BigInteger; import java.time.Duration; import java.util.LinkedHashMap; @@ -28,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; @Service public class DocumentImportTaskStatusStreamService { + private static final Logger LOG = LoggerFactory.getLogger(DocumentImportTaskStatusStreamService.class); private static final long SSE_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); private final Map> knowledgeEmitters = new ConcurrentHashMap>(); @@ -134,6 +139,9 @@ public class DocumentImportTaskStatusStreamService { private void sendAsync(String topicKey, SseEmitter emitter, String eventName, Map payload) { sseThreadPool.execute(() -> { + if (!isEmitterRegistered(topicKey, emitter)) { + return; + } try { emitter.send( SseEmitter.event() @@ -142,14 +150,29 @@ public class DocumentImportTaskStatusStreamService { ); } catch (Exception e) { removeEmitter(topicKey, emitter); - try { - emitter.completeWithError(e); - } catch (Exception ignored) { + if (isClientDisconnected(e)) { + LOG.debug("文档导入状态流客户端已断开: topicKey={}, eventName={}, message={}", + topicKey, eventName, e.getMessage()); + return; } + LOG.warn("文档导入状态流推送失败: topicKey={}, eventName={}", topicKey, eventName, e); + completeQuietly(emitter); } }); } + /** + * 判断指定 SSE 连接是否仍注册在主题下,避免已清理连接继续被异步任务写入。 + * + * @param topicKey 主题键 + * @param emitter SSE 连接 + * @return 是否仍处于注册状态 + */ + private boolean isEmitterRegistered(String topicKey, SseEmitter emitter) { + Set emitters = knowledgeEmitters.get(topicKey); + return emitters != null && emitters.contains(emitter); + } + private void removeEmitter(String topicKey, SseEmitter emitter) { Set emitters = knowledgeEmitters.get(topicKey); if (emitters == null) { @@ -161,6 +184,46 @@ public class DocumentImportTaskStatusStreamService { } } + /** + * 判断异常是否由客户端断开 SSE 连接导致。 + * + * @param throwable 异常 + * @return 是否为客户端断连 + */ + private boolean isClientDisconnected(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof AsyncRequestNotUsableException || current instanceof IOException) { + return true; + } + String message = current.getMessage(); + if (message != null) { + String lowerMessage = message.toLowerCase(); + if (lowerMessage.contains("broken pipe") + || lowerMessage.contains("connection reset") + || lowerMessage.contains("response not usable") + || lowerMessage.contains("client abort")) { + return true; + } + } + current = current.getCause(); + } + return false; + } + + /** + * 安静关闭 SSE 连接。 + * + * @param emitter SSE 连接 + */ + private void completeQuietly(SseEmitter emitter) { + try { + emitter.complete(); + } catch (Exception e) { + LOG.debug("关闭文档导入状态流失败: message={}", e.getMessage()); + } + } + private String toTopicKey(BigInteger knowledgeId) { return String.valueOf(knowledgeId); } diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/KnowledgeDocumentImportTaskAppService.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/KnowledgeDocumentImportTaskAppService.java index 478305a..e963a43 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/KnowledgeDocumentImportTaskAppService.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/documentimport/task/KnowledgeDocumentImportTaskAppService.java @@ -56,6 +56,7 @@ import tech.easyflow.ai.service.DocumentChunkService; import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.DocumentImportTaskService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.common.domain.Result; import tech.easyflow.common.filestorage.FileStorageService; import tech.easyflow.common.util.FileUtil; @@ -92,7 +93,6 @@ import java.util.regex.Pattern; public class KnowledgeDocumentImportTaskAppService { private static final Logger LOG = LoggerFactory.getLogger(KnowledgeDocumentImportTaskAppService.class); - private static final int PARSE_MONITOR_BATCH_SIZE = 20; private static final int INDEX_BATCH_SIZE = 20; private static final String SOURCE_RANGES_KEY = "sourceRanges"; private static final String KNOWLEDGE_PARSE_IMAGE_CATEGORY = "knowledge-parse"; @@ -122,6 +122,9 @@ public class KnowledgeDocumentImportTaskAppService { @Resource private DocumentImportTaskService documentImportTaskService; + @Resource + private DocumentImportParseMonitorProperties parseMonitorProperties; + @Resource private DocumentImportPreviewService documentImportPreviewService; @@ -403,7 +406,7 @@ public class KnowledgeDocumentImportTaskAppService { .eq(DocumentImportTask::getPhase, DocumentImportTaskPhase.PARSE.name()) .eq(DocumentImportTask::getStatus, DocumentImportTaskStatus.RUNNING.name()) .orderBy(DocumentImportTask::getModified, true) - .limit(PARSE_MONITOR_BATCH_SIZE); + .limit(parseMonitorProperties.getBatchSize()); List runningTasks = documentImportTaskService.list(queryWrapper); if (runningTasks == null || runningTasks.isEmpty()) { return; @@ -516,6 +519,8 @@ public class KnowledgeDocumentImportTaskAppService { rollbackStoredChunks(taskId, document.getId(), storeContext, storedChunks); } markIndexFailed(task, document, truncateError(e.getMessage())); + } finally { + closeStoreContext(storeContext); } } @@ -2123,26 +2128,31 @@ public class KnowledgeDocumentImportTaskAppService { if (documentStore == null) { throw new BusinessException("向量数据库配置错误"); } - Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); - if (model == null) { - throw new BusinessException("该知识库未配置向量模型"); - } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); + try { + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + throw new BusinessException("该知识库未配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - EmbeddingOptions embeddingOptions = new EmbeddingOptions(); - embeddingOptions.setModel(model.getModelName()); - embeddingOptions.setDimensions(knowledge.getDimensionOfVectorModel()); - options.setEmbeddingOptions(embeddingOptions); - options.setIndexName(options.getCollectionName()); - return new StoreExecutionContext( - knowledge, - embeddingModel, - documentStore, - options, - searcherFactory.getSearcher() - ); + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + EmbeddingOptions embeddingOptions = new EmbeddingOptions(); + embeddingOptions.setModel(model.getModelName()); + embeddingOptions.setDimensions(knowledge.getDimensionOfVectorModel()); + options.setEmbeddingOptions(embeddingOptions); + options.setIndexName(options.getCollectionName()); + return new StoreExecutionContext( + knowledge, + embeddingModel, + documentStore, + options, + searcherFactory.getSearcher() + ); + } catch (RuntimeException e) { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); + throw e; + } } private void storeDocumentChunks(StoreExecutionContext storeContext, List documentChunks) { @@ -2221,6 +2231,13 @@ public class KnowledgeDocumentImportTaskAppService { } } + private void closeStoreContext(StoreExecutionContext storeContext) { + if (storeContext == null) { + return; + } + DocumentStoreLifecycleSupport.closeQuietly(storeContext.documentStore); + } + private void clearPersistedChunks(BigInteger documentId) { if (documentId == null) { return; diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentCollectionServiceImpl.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentCollectionServiceImpl.java index f0694d9..a9a32f6 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentCollectionServiceImpl.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentCollectionServiceImpl.java @@ -31,6 +31,7 @@ import tech.easyflow.ai.mapper.FaqItemMapper; import tech.easyflow.ai.rag.KnowledgeRetrievalRequest; import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.ai.utils.CustomBeanUtils; import tech.easyflow.ai.utils.RegexUtils; import tech.easyflow.common.util.StringUtil; @@ -283,34 +284,38 @@ public class DocumentCollectionServiceImpl extends ServiceImpl documents = documentStore.search(wrapper, options); - List result = documents == null ? Collections.emptyList() : documents; - LOG.info( - "Knowledge vector search completed, knowledgeId={}, collectionName={}, query={}, limit={}, minSimilarity={}, hitCount={}, hits={}", - documentCollection.getId(), - documentCollection.getVectorStoreCollection(), - keyword, - docRecallMaxNum, - minSimilarity, - result.size(), - summarizeDocuments(result) - ); - return result; + StoreOptions options = StoreOptions.ofCollectionName(documentCollection.getVectorStoreCollection()); + options.setIndexName(documentCollection.getVectorStoreCollection()); + List documents = documentStore.search(wrapper, options); + List result = documents == null ? Collections.emptyList() : documents; + LOG.info( + "Knowledge vector search completed, knowledgeId={}, collectionName={}, query={}, limit={}, minSimilarity={}, hitCount={}, hits={}", + documentCollection.getId(), + documentCollection.getVectorStoreCollection(), + keyword, + docRecallMaxNum, + minSimilarity, + result.size(), + summarizeDocuments(result) + ); + return result; + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); + } } private List searchKeywordDocuments(DocumentCollection documentCollection, String keyword, int docRecallMaxNum) { diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentServiceImpl.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentServiceImpl.java index a759f7e..14ea261 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentServiceImpl.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/DocumentServiceImpl.java @@ -43,6 +43,7 @@ import tech.easyflow.ai.service.DocumentChunkService; import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.DocumentService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.common.ai.rag.ExcelDocumentSplitter; import tech.easyflow.common.domain.Result; import tech.easyflow.common.filestorage.FileStorageService; @@ -154,34 +155,38 @@ public class DocumentServiceImpl extends ServiceImpl i return false; } - Model model = modelService.getById(knowledge.getVectorEmbedModelId()); - if (model == null) { - return false; + try { + Model model = modelService.getById(knowledge.getVectorEmbedModelId()); + if (model == null) { + return false; + } + // 设置向量模型 + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + EmbeddingOptions embeddingOptions = new EmbeddingOptions(); + embeddingOptions.setModel(model.getModelName()); + options.setEmbeddingOptions(embeddingOptions); + options.setCollectionName(knowledge.getVectorStoreCollection()); + // 查询文本分割表tb_document_chunk中对应的有哪些数据,找出来删除 + QueryWrapper queryWrapper = QueryWrapper.create() + .select(DOCUMENT_CHUNK.ID).eq(DocumentChunk::getDocumentId, id); + List chunkIds = documentChunkMapper.selectListByQueryAs(queryWrapper, BigInteger.class); + documentStore.delete(chunkIds, options); + // 删除搜索引擎中的数据 + DocumentSearcher searcher = searcherFactory.getSearcher(); + if (searcher != null) { + chunkIds.forEach(searcher::deleteDocument); + } + int ck = documentChunkMapper.deleteByQuery(QueryWrapper.create().eq(DocumentChunk::getDocumentId, id)); + if (ck < 0) { + return false; + } + // 再删除指定路径下的文件 + Document document = documentMapper.selectOneByQuery(queryWrapperDocument); + storageService.delete(document.getDocumentPath()); + return true; + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - // 设置向量模型 - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - EmbeddingOptions embeddingOptions = new EmbeddingOptions(); - embeddingOptions.setModel(model.getModelName()); - options.setEmbeddingOptions(embeddingOptions); - options.setCollectionName(knowledge.getVectorStoreCollection()); - // 查询文本分割表tb_document_chunk中对应的有哪些数据,找出来删除 - QueryWrapper queryWrapper = QueryWrapper.create() - .select(DOCUMENT_CHUNK.ID).eq(DocumentChunk::getDocumentId, id); - List chunkIds = documentChunkMapper.selectListByQueryAs(queryWrapper, BigInteger.class); - documentStore.delete(chunkIds, options); - // 删除搜索引擎中的数据 - DocumentSearcher searcher = searcherFactory.getSearcher(); - if (searcher != null) { - chunkIds.forEach(searcher::deleteDocument); - } - int ck = documentChunkMapper.deleteByQuery(QueryWrapper.create().eq(DocumentChunk::getDocumentId, id)); - if (ck < 0) { - return false; - } - // 再删除指定路径下的文件 - Document document = documentMapper.selectOneByQuery(queryWrapperDocument); - storageService.delete(document.getDocumentPath()); - return true; } @@ -286,8 +291,8 @@ public class DocumentServiceImpl extends ServiceImpl i } StoreExecutionContext storeContext = prepareStoreContext(document); - storeDocumentChunks(storeContext, validChunks); try { + storeDocumentChunks(storeContext, validChunks); persistDocumentWithChunks(document, validChunks); updateKnowledgeAfterStore(storeContext); return Result.ok(); @@ -296,14 +301,20 @@ public class DocumentServiceImpl extends ServiceImpl i rollbackStoredChunks(storeContext, validChunks); Log.error("保存文档失败: documentId={}, title={}", document.getId(), document.getTitle(), e); throw new BusinessException("保存失败:" + e.getMessage()); + } finally { + closeStoreContext(storeContext); } } protected Boolean storeDocument(Document entity, List documentChunks) { StoreExecutionContext storeContext = prepareStoreContext(entity); - storeDocumentChunks(storeContext, documentChunks); - updateKnowledgeAfterStore(storeContext); - return true; + try { + storeDocumentChunks(storeContext, documentChunks); + updateKnowledgeAfterStore(storeContext); + return true; + } finally { + closeStoreContext(storeContext); + } } @Override @@ -430,14 +441,16 @@ public class DocumentServiceImpl extends ServiceImpl i } StoreExecutionContext storeContext = prepareStoreContext(document); - storeDocumentChunks(storeContext, session.getDocumentChunks()); try { + storeDocumentChunks(storeContext, session.getDocumentChunks()); persistDocumentWithChunks(document, session.getDocumentChunks()); updateKnowledgeAfterStore(storeContext); } catch (Exception e) { cleanupPersistedDocument(document); rollbackStoredChunks(storeContext, session.getDocumentChunks()); throw new BusinessException("提交导入失败:" + e.getMessage()); + } finally { + closeStoreContext(storeContext); } } @@ -751,24 +764,28 @@ public class DocumentServiceImpl extends ServiceImpl i if (documentStore == null) { throw new BusinessException("向量数据库配置错误"); } + try { + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + throw new BusinessException("该知识库未配置大模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); - Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); - if (model == null) { - throw new BusinessException("该知识库未配置大模型"); + StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + EmbeddingOptions embeddingOptions = new EmbeddingOptions(); + embeddingOptions.setModel(model.getModelName()); + embeddingOptions.setDimensions(knowledge.getDimensionOfVectorModel()); + options.setEmbeddingOptions(embeddingOptions); + options.setIndexName(options.getCollectionName()); + + DocumentSearcher searcher = null; + searcher = searcherFactory.getSearcher(); + return new StoreExecutionContext(knowledge, model, embeddingModel, documentStore, options, searcher); + } catch (RuntimeException e) { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); + throw e; } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); - - StoreOptions options = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - EmbeddingOptions embeddingOptions = new EmbeddingOptions(); - embeddingOptions.setModel(model.getModelName()); - embeddingOptions.setDimensions(knowledge.getDimensionOfVectorModel()); - options.setEmbeddingOptions(embeddingOptions); - options.setIndexName(options.getCollectionName()); - - DocumentSearcher searcher = null; - searcher = searcherFactory.getSearcher(); - return new StoreExecutionContext(knowledge, model, embeddingModel, documentStore, options, searcher); } private void storeDocumentChunks(StoreExecutionContext storeContext, List documentChunks) { @@ -841,6 +858,13 @@ public class DocumentServiceImpl extends ServiceImpl i } } + private void closeStoreContext(StoreExecutionContext storeContext) { + if (storeContext == null) { + return; + } + DocumentStoreLifecycleSupport.closeQuietly(storeContext.documentStore); + } + private void persistDocumentWithChunks(Document document, List chunks) { this.getMapper().insert(document); AtomicInteger sort = new AtomicInteger(1); diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/FaqItemServiceImpl.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/FaqItemServiceImpl.java index fe73546..339f0be 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/FaqItemServiceImpl.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/FaqItemServiceImpl.java @@ -40,6 +40,7 @@ import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.FaqCategoryService; import tech.easyflow.ai.service.FaqItemService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.ai.vo.FaqImportErrorRowVo; import tech.easyflow.ai.vo.FaqImportResultVo; import tech.easyflow.common.util.StringUtil; @@ -348,29 +349,37 @@ public class FaqItemServiceImpl extends ServiceImpl impl private void storeToVector(DocumentCollection collection, FaqItem entity, boolean isUpdate) { PreparedStore preparedStore = prepareStore(collection); - com.easyagents.core.document.Document doc = toSearchDocument(entity); - StoreResult result = isUpdate - ? preparedStore.documentStore.update(doc, preparedStore.storeOptions) - : preparedStore.documentStore.store(Collections.singletonList(doc), preparedStore.storeOptions); - if (result == null || !result.isSuccess()) { - throw new BusinessException("FAQ向量化失败"); - } - - DocumentSearcher searcher = searcherFactory.getSearcher(); - if (searcher != null) { - if (isUpdate) { - searcher.deleteDocument(entity.getId()); + try { + com.easyagents.core.document.Document doc = toSearchDocument(entity); + StoreResult result = isUpdate + ? preparedStore.documentStore.update(doc, preparedStore.storeOptions) + : preparedStore.documentStore.store(Collections.singletonList(doc), preparedStore.storeOptions); + if (result == null || !result.isSuccess()) { + throw new BusinessException("FAQ向量化失败"); } - searcher.addDocument(doc); + + DocumentSearcher searcher = searcherFactory.getSearcher(); + if (searcher != null) { + if (isUpdate) { + searcher.deleteDocument(entity.getId()); + } + searcher.addDocument(doc); + } + markCollectionEmbedded(collection, preparedStore.embeddingModel); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(preparedStore.documentStore); } - markCollectionEmbedded(collection, preparedStore.embeddingModel); } private void removeFromVector(DocumentCollection collection, FaqItem entity) { PreparedStore preparedStore = prepareStore(collection); - boolean deleteSuccess = deleteFromVectorStore(preparedStore.documentStore, preparedStore.storeOptions, entity.getId()); - if (!deleteSuccess) { - throw new BusinessException("FAQ向量删除失败"); + try { + boolean deleteSuccess = deleteFromVectorStore(preparedStore.documentStore, preparedStore.storeOptions, entity.getId()); + if (!deleteSuccess) { + throw new BusinessException("FAQ向量删除失败"); + } + } finally { + DocumentStoreLifecycleSupport.closeQuietly(preparedStore.documentStore); } DocumentSearcher searcher = searcherFactory.getSearcher(); @@ -413,20 +422,25 @@ public class FaqItemServiceImpl extends ServiceImpl impl if (documentStore == null) { throw new BusinessException("向量数据库配置错误"); } - Model model = modelService.getModelInstance(collection.getVectorEmbedModelId()); - if (model == null) { - throw new BusinessException("该知识库未配置向量模型"); - } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); + try { + Model model = modelService.getModelInstance(collection.getVectorEmbedModelId()); + if (model == null) { + throw new BusinessException("该知识库未配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); - StoreOptions options = StoreOptions.ofCollectionName(collection.getVectorStoreCollection()); - EmbeddingOptions embeddingOptions = new EmbeddingOptions(); - embeddingOptions.setModel(model.getModelName()); - embeddingOptions.setDimensions(collection.getDimensionOfVectorModel()); - options.setEmbeddingOptions(embeddingOptions); - options.setIndexName(options.getCollectionName()); - return new PreparedStore(documentStore, options, embeddingModel); + StoreOptions options = StoreOptions.ofCollectionName(collection.getVectorStoreCollection()); + EmbeddingOptions embeddingOptions = new EmbeddingOptions(); + embeddingOptions.setModel(model.getModelName()); + embeddingOptions.setDimensions(collection.getDimensionOfVectorModel()); + options.setEmbeddingOptions(embeddingOptions); + options.setIndexName(options.getCollectionName()); + return new PreparedStore(documentStore, options, embeddingModel); + } catch (RuntimeException e) { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); + throw e; + } } private com.easyagents.core.document.Document toSearchDocument(FaqItem entity) { diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/KnowledgeEmbeddingServiceImpl.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/KnowledgeEmbeddingServiceImpl.java index 1eaf661..8c9ef0f 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/KnowledgeEmbeddingServiceImpl.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/service/impl/KnowledgeEmbeddingServiceImpl.java @@ -15,6 +15,7 @@ import tech.easyflow.ai.service.DocumentCollectionService; import tech.easyflow.ai.service.FaqItemService; import tech.easyflow.ai.service.KnowledgeEmbeddingService; import tech.easyflow.ai.service.ModelService; +import tech.easyflow.ai.support.DocumentStoreLifecycleSupport; import tech.easyflow.common.web.exceptions.BusinessException; import javax.annotation.Resource; @@ -50,20 +51,24 @@ public class KnowledgeEmbeddingServiceImpl implements KnowledgeEmbeddingService if (documentStore == null) { throw new BusinessException("知识库没有配置向量库"); } - Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); - if (model == null) { - throw new BusinessException("知识库没有配置向量模型"); - } - EmbeddingModel embeddingModel = model.toEmbeddingModel(); - documentStore.setEmbeddingModel(embeddingModel); - StoreOptions storeOptions = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); - storeOptions.setIndexName(knowledge.getVectorStoreCollection()); + try { + Model model = modelService.getModelInstance(knowledge.getVectorEmbedModelId()); + if (model == null) { + throw new BusinessException("知识库没有配置向量模型"); + } + EmbeddingModel embeddingModel = model.toEmbeddingModel(); + documentStore.setEmbeddingModel(embeddingModel); + StoreOptions storeOptions = StoreOptions.ofCollectionName(knowledge.getVectorStoreCollection()); + storeOptions.setIndexName(knowledge.getVectorStoreCollection()); - if (knowledge.isFaqCollection()) { - rebuildFaqVectors(knowledge, documentStore, storeOptions, embeddingModel); - return; + if (knowledge.isFaqCollection()) { + rebuildFaqVectors(knowledge, documentStore, storeOptions, embeddingModel); + return; + } + rebuildDocumentVectors(knowledge, documentStore, storeOptions, embeddingModel); + } finally { + DocumentStoreLifecycleSupport.closeQuietly(documentStore); } - rebuildDocumentVectors(knowledge, documentStore, storeOptions, embeddingModel); } private void rebuildDocumentVectors( @@ -153,4 +158,3 @@ public class KnowledgeEmbeddingServiceImpl implements KnowledgeEmbeddingService documentCollectionService.updateById(update); } } - diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/support/DocumentStoreLifecycleSupport.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/support/DocumentStoreLifecycleSupport.java new file mode 100644 index 0000000..bce8d1a --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/support/DocumentStoreLifecycleSupport.java @@ -0,0 +1,32 @@ +package tech.easyflow.ai.support; + +import com.easyagents.core.store.DocumentStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 文档向量库生命周期辅助工具。 + */ +public final class DocumentStoreLifecycleSupport { + + private static final Logger LOG = LoggerFactory.getLogger(DocumentStoreLifecycleSupport.class); + + private DocumentStoreLifecycleSupport() { + } + + /** + * 关闭支持关闭语义的文档向量库。 + * + * @param documentStore 文档向量库实例 + */ + public static void closeQuietly(DocumentStore documentStore) { + if (!(documentStore instanceof AutoCloseable)) { + return; + } + try { + ((AutoCloseable) documentStore).close(); + } catch (Exception e) { + LOG.warn("关闭文档向量库连接失败: store={}", documentStore.getClass().getSimpleName(), e); + } + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/config/CachedHealthIndicatorSupportTest.java b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/config/CachedHealthIndicatorSupportTest.java new file mode 100644 index 0000000..7461070 --- /dev/null +++ b/easyflow-modules/easyflow-module-ai/src/test/java/tech/easyflow/ai/config/CachedHealthIndicatorSupportTest.java @@ -0,0 +1,93 @@ +package tech.easyflow.ai.config; + +import org.junit.Assert; +import org.junit.Test; +import org.springframework.boot.actuate.health.Health; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 健康检查短缓存测试。 + */ +public class CachedHealthIndicatorSupportTest { + + /** + * 验证 TTL 内重复健康检查复用缓存。 + */ + @Test + public void shouldReuseHealthWithinCacheTtl() { + RagHealthProperties properties = new RagHealthProperties(); + properties.setCacheTtl(Duration.ofSeconds(5)); + MutableClock clock = new MutableClock(); + CountingHealthIndicator indicator = new CountingHealthIndicator(properties, clock); + + indicator.cachedHealth(); + indicator.cachedHealth(); + + Assert.assertEquals(1, indicator.count()); + } + + /** + * 验证 TTL 过期后重新执行健康检查。 + */ + @Test + public void shouldRefreshHealthAfterCacheExpired() { + RagHealthProperties properties = new RagHealthProperties(); + properties.setCacheTtl(Duration.ofSeconds(5)); + MutableClock clock = new MutableClock(); + CountingHealthIndicator indicator = new CountingHealthIndicator(properties, clock); + + indicator.cachedHealth(); + clock.plus(Duration.ofSeconds(6)); + indicator.cachedHealth(); + + Assert.assertEquals(2, indicator.count()); + } + + private static class CountingHealthIndicator extends CachedHealthIndicatorSupport { + + private final AtomicInteger counter = new AtomicInteger(); + + private CountingHealthIndicator(RagHealthProperties properties, Clock clock) { + super(properties, clock); + } + + @Override + protected Health doHealthCheck() { + counter.incrementAndGet(); + return Health.up().build(); + } + + private int count() { + return counter.get(); + } + } + + private static class MutableClock extends Clock { + + private Instant instant = Instant.parse("2026-05-25T00:00:00Z"); + + @Override + public ZoneId getZone() { + return ZoneId.of("UTC"); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + + @Override + public Instant instant() { + return instant; + } + + private void plus(Duration duration) { + instant = instant.plus(duration); + } + } +} diff --git a/easyflow-modules/easyflow-module-approval/src/main/java/tech/easyflow/approval/config/ApprovalModuleConfig.java b/easyflow-modules/easyflow-module-approval/src/main/java/tech/easyflow/approval/config/ApprovalModuleConfig.java index c32eb84..819e201 100644 --- a/easyflow-modules/easyflow-module-approval/src/main/java/tech/easyflow/approval/config/ApprovalModuleConfig.java +++ b/easyflow-modules/easyflow-module-approval/src/main/java/tech/easyflow/approval/config/ApprovalModuleConfig.java @@ -2,11 +2,13 @@ package tech.easyflow.approval.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; /** * 审批模块配置。 */ @MapperScan("tech.easyflow.approval.mapper") +@ComponentScan("tech.easyflow.approval") @AutoConfiguration public class ApprovalModuleConfig { } diff --git a/easyflow-modules/easyflow-module-auth/src/main/java/tech/easyflow/auth/config/AuthModuleConfig.java b/easyflow-modules/easyflow-module-auth/src/main/java/tech/easyflow/auth/config/AuthModuleConfig.java index 8b0fcb8..3cf9aea 100644 --- a/easyflow-modules/easyflow-module-auth/src/main/java/tech/easyflow/auth/config/AuthModuleConfig.java +++ b/easyflow-modules/easyflow-module-auth/src/main/java/tech/easyflow/auth/config/AuthModuleConfig.java @@ -1,8 +1,10 @@ package tech.easyflow.auth.config; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; @AutoConfiguration +@ComponentScan("tech.easyflow.auth") public class AuthModuleConfig { public AuthModuleConfig() { diff --git a/easyflow-modules/easyflow-module-autoconfig/src/main/java/tech/easyflow/autoconfig/config/AutoConfig.java b/easyflow-modules/easyflow-module-autoconfig/src/main/java/tech/easyflow/autoconfig/config/AutoConfig.java index 4396f5a..4f46ae1 100644 --- a/easyflow-modules/easyflow-module-autoconfig/src/main/java/tech/easyflow/autoconfig/config/AutoConfig.java +++ b/easyflow-modules/easyflow-module-autoconfig/src/main/java/tech/easyflow/autoconfig/config/AutoConfig.java @@ -4,7 +4,14 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration(proxyBeanMethods = false) -@ComponentScan({"tech.easyflow"}) +@ComponentScan({ + "tech.easyflow.admin", + "tech.easyflow.usercenter", + "tech.easyflow.publicapi", + "tech.easyflow.common", + "tech.easyflow.core", + "tech.easyflow.autoconfig" +}) @org.springframework.boot.autoconfigure.AutoConfiguration public class AutoConfig { public AutoConfig() { diff --git a/easyflow-modules/easyflow-module-autoconfig/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/easyflow-modules/easyflow-module-autoconfig/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 2e27626..2191017 100644 --- a/easyflow-modules/easyflow-module-autoconfig/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/easyflow-modules/easyflow-module-autoconfig/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,10 @@ -tech.easyflow.autoconfig.config.AutoConfig \ No newline at end of file +tech.easyflow.autoconfig.config.AutoConfig +tech.easyflow.ai.config.AiModuleConfig +tech.easyflow.agent.config.AgentModuleConfig +tech.easyflow.approval.config.ApprovalModuleConfig +tech.easyflow.auth.config.AuthModuleConfig +tech.easyflow.chatlog.config.ChatlogModuleConfig +tech.easyflow.datacenter.config.DatacenterModuleConfig +tech.easyflow.job.config.JobModuleConfig +tech.easyflow.log.config.LogModuleConfig +tech.easyflow.system.config.SysModuleConfig diff --git a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/config/ChatlogModuleConfig.java b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/config/ChatlogModuleConfig.java index 87059d6..108cd52 100644 --- a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/config/ChatlogModuleConfig.java +++ b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/config/ChatlogModuleConfig.java @@ -2,8 +2,10 @@ package tech.easyflow.chatlog.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; @AutoConfiguration @MapperScan("tech.easyflow.chatlog.mapper") +@ComponentScan("tech.easyflow.chatlog") public class ChatlogModuleConfig { } diff --git a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/ChatSyncService.java b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/ChatSyncService.java index d834e66..cc53ea2 100644 --- a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/ChatSyncService.java +++ b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/ChatSyncService.java @@ -12,5 +12,8 @@ public interface ChatSyncService { void maintainMysqlTables(); + /** + * 执行启动期必要的 MySQL 表准备。 + */ void startupCheck(); } diff --git a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/impl/ChatSyncServiceImpl.java b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/impl/ChatSyncServiceImpl.java index 7956def..c2dccac 100644 --- a/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/impl/ChatSyncServiceImpl.java +++ b/easyflow-modules/easyflow-module-chatlog/src/main/java/tech/easyflow/chatlog/service/impl/ChatSyncServiceImpl.java @@ -173,9 +173,6 @@ public class ChatSyncServiceImpl implements ChatSyncService { @Override public void startupCheck() { tableManager.ensureCurrentAndNextMonth(); - if (analyticalDBRepository.enabled()) { - analyticalDBRepository.selfCheck(); - } } private void clearExpiredSessions() { diff --git a/easyflow-modules/easyflow-module-datacenter/src/main/java/tech/easyflow/datacenter/config/DatacenterModuleConfig.java b/easyflow-modules/easyflow-module-datacenter/src/main/java/tech/easyflow/datacenter/config/DatacenterModuleConfig.java index b91df92..01b8048 100644 --- a/easyflow-modules/easyflow-module-datacenter/src/main/java/tech/easyflow/datacenter/config/DatacenterModuleConfig.java +++ b/easyflow-modules/easyflow-module-datacenter/src/main/java/tech/easyflow/datacenter/config/DatacenterModuleConfig.java @@ -1,10 +1,12 @@ package tech.easyflow.datacenter.config; import org.mybatis.spring.annotation.MapperScan; -import org.springframework.context.annotation.Configuration; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; -@Configuration +@AutoConfiguration @MapperScan("tech.easyflow.datacenter.mapper") +@ComponentScan("tech.easyflow.datacenter") public class DatacenterModuleConfig { public DatacenterModuleConfig() { diff --git a/easyflow-modules/easyflow-module-job/src/main/java/tech/easyflow/job/config/JobModuleConfig.java b/easyflow-modules/easyflow-module-job/src/main/java/tech/easyflow/job/config/JobModuleConfig.java index c7ac9c0..653a95a 100644 --- a/easyflow-modules/easyflow-module-job/src/main/java/tech/easyflow/job/config/JobModuleConfig.java +++ b/easyflow-modules/easyflow-module-job/src/main/java/tech/easyflow/job/config/JobModuleConfig.java @@ -1,10 +1,12 @@ package tech.easyflow.job.config; import org.mybatis.spring.annotation.MapperScan; -import org.springframework.context.annotation.Configuration; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; -@Configuration +@AutoConfiguration @MapperScan("tech.easyflow.job.mapper") +@ComponentScan("tech.easyflow.job") public class JobModuleConfig { public JobModuleConfig() { diff --git a/easyflow-modules/easyflow-module-log/src/main/java/tech/easyflow/log/config/LogModuleConfig.java b/easyflow-modules/easyflow-module-log/src/main/java/tech/easyflow/log/config/LogModuleConfig.java index 91330d7..3ee2a22 100644 --- a/easyflow-modules/easyflow-module-log/src/main/java/tech/easyflow/log/config/LogModuleConfig.java +++ b/easyflow-modules/easyflow-module-log/src/main/java/tech/easyflow/log/config/LogModuleConfig.java @@ -1,14 +1,16 @@ package tech.easyflow.log.config; import org.mybatis.spring.annotation.MapperScan; -import org.springframework.context.annotation.Configuration; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import tech.easyflow.log.reporter.ActionLogReporterProperties; import tech.easyflow.log.reporter.ActionReportInterceptor; @MapperScan("tech.easyflow.log.mapper") -@Configuration +@AutoConfiguration +@ComponentScan("tech.easyflow.log") public class LogModuleConfig implements WebMvcConfigurer { private final ActionLogReporterProperties logProperties; diff --git a/easyflow-modules/easyflow-module-system/src/main/java/tech/easyflow/system/config/SysModuleConfig.java b/easyflow-modules/easyflow-module-system/src/main/java/tech/easyflow/system/config/SysModuleConfig.java index bef4afa..1fc5ab1 100644 --- a/easyflow-modules/easyflow-module-system/src/main/java/tech/easyflow/system/config/SysModuleConfig.java +++ b/easyflow-modules/easyflow-module-system/src/main/java/tech/easyflow/system/config/SysModuleConfig.java @@ -2,8 +2,10 @@ package tech.easyflow.system.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.ComponentScan; @MapperScan("tech.easyflow.system.mapper") +@ComponentScan("tech.easyflow.system") @AutoConfiguration public class SysModuleConfig { } diff --git a/easyflow-starter/easyflow-starter-all/src/main/resources/application-prod.yml b/easyflow-starter/easyflow-starter-all/src/main/resources/application-prod.yml index 5483297..c6128c6 100644 --- a/easyflow-starter/easyflow-starter-all/src/main/resources/application-prod.yml +++ b/easyflow-starter/easyflow-starter-all/src/main/resources/application-prod.yml @@ -6,6 +6,13 @@ spring: url: jdbc:mysql://127.0.0.1:23306/easyflow?useInformationSchema=true&characterEncoding=utf-8 username: easyflow password: root + hikari: + maximum-pool-size: 20 + minimum-idle: 4 + connection-timeout: 5000 + validation-timeout: 3000 + idle-timeout: 600000 + max-lifetime: 1800000 data: redis: host: 127.0.0.1 @@ -37,6 +44,15 @@ easyflow: consumer-block-timeout: 2000ms pending-claim-idle: 60000ms max-retry: 16 + consumer-executor: + core-size: 4 + max-size: 12 + queue-capacity: 64 + keep-alive-seconds: 60 + pool: + max-active: 12 + max-idle: 8 + min-idle: 1 analytical-db: enabled: true url: ${EASYFLOW_ANALYTICAL_DB_URL:jdbc:clickhouse://127.0.0.1:8123/easyflow?jdbc_ignore_unsupported_values=true&socket_timeout=30000&compress=false&ssl=false} @@ -58,3 +74,27 @@ easyflow: validate-on-migrate: true storage: type: xFileStorage + ai: + rag: + health: + cache-ttl: 5s + document-import: + parse-monitor: + fixed-delay: 10000 + initial-delay: 10000 + batch-size: 10 + thread-pool: + sse: + core-size: 4 + max-size: 16 + queue-capacity: 2000 + keep-alive-seconds: 30 + allow-core-thread-timeout: true + document-import: + core-size: 2 + max-size: 4 + queue-capacity: 200 + keep-alive-seconds: 60 + allow-core-thread-timeout: true + scheduler: + pool-size: 4 diff --git a/easyflow-starter/easyflow-starter-all/src/main/resources/application.yml b/easyflow-starter/easyflow-starter-all/src/main/resources/application.yml index 069e419..8e2982b 100644 --- a/easyflow-starter/easyflow-starter-all/src/main/resources/application.yml +++ b/easyflow-starter/easyflow-starter-all/src/main/resources/application.yml @@ -21,6 +21,13 @@ spring: url: jdbc:mysql://127.0.0.1:33306/easyflow?useInformationSchema=true&characterEncoding=utf-8 username: root password: root + hikari: + maximum-pool-size: 12 + minimum-idle: 2 + connection-timeout: 5000 + validation-timeout: 3000 + idle-timeout: 600000 + max-lifetime: 1800000 flyway: enabled: true locations: classpath:db/migration/mysql @@ -69,7 +76,7 @@ spring: tablePrefix: TB_QRTZ_ driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate threadPool: - threadCount: 20 + threadCount: 8 threadPriority: 5 threads: virtual: @@ -104,6 +111,15 @@ easyflow: consumer-block-timeout: 2000ms pending-claim-idle: 60000ms max-retry: 16 + consumer-executor: + core-size: 4 + max-size: 12 + queue-capacity: 64 + keep-alive-seconds: 60 + pool: + max-active: 12 + max-idle: 8 + min-idle: 1 analytical-db: # 是否启用分析数据库 enabled: true @@ -148,6 +164,30 @@ easyflow: root: /Users/slience/postgraduate/easyflow/attachment # 后端接口地址,用于拼接完整 url prefix: http://localhost:8111/attachment + ai: + rag: + health: + cache-ttl: 5s + document-import: + parse-monitor: + fixed-delay: 10000 + initial-delay: 10000 + batch-size: 10 + thread-pool: + sse: + core-size: 4 + max-size: 16 + queue-capacity: 2000 + keep-alive-seconds: 30 + allow-core-thread-timeout: true + document-import: + core-size: 2 + max-size: 4 + queue-capacity: 200 + keep-alive-seconds: 60 + allow-core-thread-timeout: true + scheduler: + pool-size: 4 # xFileStorage存储文件配置 # 文档:https://x-file-storage.xuyanwu.cn/ @@ -211,9 +251,9 @@ jetcache: valueEncoder: java valueDecoder: java poolConfig: - minIdle: 5 - maxIdle: 20 - maxTotal: 50 + minIdle: 1 + maxIdle: 12 + maxTotal: 32 host: ${spring.data.redis.host} port: ${spring.data.redis.port} password: ${spring.data.redis.password}