perf: 收敛后端资源与健康检查开销
- 缩小模块扫描范围并显式注册各业务模块自动配置 - 增加可配置线程池、MQ 连接池与消费线程池,降低默认资源占用 - 将 RAG 与分析库中间件探活下沉到健康检查并增加短缓存 - 补齐文档向量库生命周期释放与 SSE 断连清理
This commit is contained in:
@@ -9,7 +9,9 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisPassword;
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.core.MQConsumerContainer;
|
||||
import tech.easyflow.common.mq.core.MQConsumerHandler;
|
||||
@@ -24,6 +26,10 @@ import tech.easyflow.common.mq.redis.RedisMQProducer;
|
||||
import tech.easyflow.common.mq.redis.RedisStreamKeySupport;
|
||||
import tech.easyflow.common.mq.support.MQHealthSupport;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
|
||||
import io.lettuce.core.api.StatefulConnection;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Configuration
|
||||
@@ -43,11 +49,27 @@ public class MQConfiguration {
|
||||
if (redisProperties.getPassword() != null) {
|
||||
configuration.setPassword(RedisPassword.of(redisProperties.getPassword()));
|
||||
}
|
||||
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration);
|
||||
LettuceClientConfiguration clientConfiguration = createClientConfiguration(redisProperties, mqProperties);
|
||||
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, clientConfiguration);
|
||||
connectionFactory.afterPropertiesSet();
|
||||
return new MQRedisResources(connectionFactory, new StringRedisTemplate(connectionFactory));
|
||||
}
|
||||
|
||||
private LettuceClientConfiguration createClientConfiguration(RedisProperties redisProperties,
|
||||
MQProperties mqProperties) {
|
||||
MQProperties.Redis.Pool pool = mqProperties.getRedis().getPool();
|
||||
GenericObjectPoolConfig<StatefulConnection<?, ?>> poolConfig = new GenericObjectPoolConfig<>();
|
||||
poolConfig.setMaxTotal(pool.getMaxActive());
|
||||
poolConfig.setMaxIdle(pool.getMaxIdle());
|
||||
poolConfig.setMinIdle(pool.getMinIdle());
|
||||
LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder =
|
||||
LettucePoolingClientConfiguration.builder().poolConfig(poolConfig);
|
||||
if (redisProperties.getTimeout() != null) {
|
||||
builder.commandTimeout(redisProperties.getTimeout());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Bean(name = "mqRedisConnectionFactory", autowireCandidate = false, defaultCandidate = false)
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public LettuceConnectionFactory mqRedisConnectionFactory(MQRedisResources mqRedisResources) {
|
||||
|
||||
@@ -40,6 +40,8 @@ public class MQProperties {
|
||||
private Duration consumerBlockTimeout = Duration.ofMillis(2000);
|
||||
private Duration pendingClaimIdle = Duration.ofMillis(60000);
|
||||
private int maxRetry = 16;
|
||||
private ConsumerExecutor consumerExecutor = new ConsumerExecutor();
|
||||
private Pool pool = new Pool();
|
||||
|
||||
public int getDatabase() {
|
||||
return database;
|
||||
@@ -96,5 +98,98 @@ public class MQProperties {
|
||||
public void setMaxRetry(int maxRetry) {
|
||||
this.maxRetry = maxRetry;
|
||||
}
|
||||
|
||||
public ConsumerExecutor getConsumerExecutor() {
|
||||
return consumerExecutor;
|
||||
}
|
||||
|
||||
public void setConsumerExecutor(ConsumerExecutor consumerExecutor) {
|
||||
this.consumerExecutor = consumerExecutor;
|
||||
}
|
||||
|
||||
public Pool getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
public void setPool(Pool pool) {
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Redis MQ 消费线程池配置。
|
||||
*/
|
||||
public static class ConsumerExecutor {
|
||||
|
||||
private int coreSize = 4;
|
||||
private int maxSize = 12;
|
||||
private int queueCapacity = 64;
|
||||
private int keepAliveSeconds = 60;
|
||||
|
||||
public int getCoreSize() {
|
||||
return coreSize;
|
||||
}
|
||||
|
||||
public void setCoreSize(int coreSize) {
|
||||
this.coreSize = coreSize;
|
||||
}
|
||||
|
||||
public int getMaxSize() {
|
||||
return maxSize;
|
||||
}
|
||||
|
||||
public void setMaxSize(int maxSize) {
|
||||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
public int getQueueCapacity() {
|
||||
return queueCapacity;
|
||||
}
|
||||
|
||||
public void setQueueCapacity(int queueCapacity) {
|
||||
this.queueCapacity = queueCapacity;
|
||||
}
|
||||
|
||||
public int getKeepAliveSeconds() {
|
||||
return keepAliveSeconds;
|
||||
}
|
||||
|
||||
public void setKeepAliveSeconds(int keepAliveSeconds) {
|
||||
this.keepAliveSeconds = keepAliveSeconds;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Redis MQ 连接池配置。
|
||||
*/
|
||||
public static class Pool {
|
||||
|
||||
private int maxActive = 12;
|
||||
private int maxIdle = 8;
|
||||
private int minIdle = 1;
|
||||
|
||||
public int getMaxActive() {
|
||||
return maxActive;
|
||||
}
|
||||
|
||||
public void setMaxActive(int maxActive) {
|
||||
this.maxActive = maxActive;
|
||||
}
|
||||
|
||||
public int getMaxIdle() {
|
||||
return maxIdle;
|
||||
}
|
||||
|
||||
public void setMaxIdle(int maxIdle) {
|
||||
this.maxIdle = maxIdle;
|
||||
}
|
||||
|
||||
public int getMinIdle() {
|
||||
return minIdle;
|
||||
}
|
||||
|
||||
public void setMinIdle(int minIdle) {
|
||||
this.minIdle = minIdle;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,9 +30,11 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifecycle {
|
||||
|
||||
@@ -45,7 +47,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
private final MQDeadLetterService deadLetterService;
|
||||
private final RedisStreamKeySupport keySupport;
|
||||
private final List<MQConsumerHandler> handlers;
|
||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private volatile boolean running;
|
||||
|
||||
@@ -63,6 +65,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
this.deadLetterService = deadLetterService;
|
||||
this.keySupport = keySupport;
|
||||
this.handlers = handlers;
|
||||
this.executorService = createExecutor(properties, handlers);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -77,7 +80,12 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
int currentShard = shard;
|
||||
LOG.info("启动 MQ 消费线程: topic={}, group={}, shard={}, handler={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), currentShard, handler.getClass().getSimpleName());
|
||||
executorService.submit(() -> consumeLoop(handler, subscription, currentShard));
|
||||
try {
|
||||
executorService.submit(() -> consumeLoop(handler, subscription, currentShard));
|
||||
} catch (RuntimeException e) {
|
||||
running = false;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -108,6 +116,42 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
stop();
|
||||
}
|
||||
|
||||
private ExecutorService createExecutor(MQProperties properties, List<MQConsumerHandler> handlers) {
|
||||
MQProperties.Redis.ConsumerExecutor config = properties.getRedis().getConsumerExecutor();
|
||||
int consumerTaskCount = handlers.stream()
|
||||
.map(MQConsumerHandler::subscription)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToInt(subscription -> Math.max(subscription.getShardCount(), 1))
|
||||
.sum();
|
||||
if (config.getCoreSize() > config.getMaxSize()) {
|
||||
throw new IllegalStateException("Redis MQ 消费线程池配置错误:core-size 不能大于 max-size");
|
||||
}
|
||||
if (consumerTaskCount > config.getMaxSize()) {
|
||||
throw new IllegalStateException("Redis MQ 消费线程池配置错误:max-size="
|
||||
+ config.getMaxSize() + " 小于消费循环数 " + consumerTaskCount
|
||||
+ ",请调大 easyflow.mq.redis.consumer-executor.max-size");
|
||||
}
|
||||
int coreSize = Math.max(config.getCoreSize(), consumerTaskCount);
|
||||
int maxSize = config.getMaxSize();
|
||||
AtomicInteger threadIndex = new AtomicInteger(1);
|
||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||
coreSize,
|
||||
maxSize,
|
||||
config.getKeepAliveSeconds(),
|
||||
TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<>(config.getQueueCapacity()),
|
||||
task -> {
|
||||
Thread thread = new Thread(task);
|
||||
thread.setName("redis-mq-consumer-" + threadIndex.getAndIncrement());
|
||||
thread.setDaemon(false);
|
||||
return thread;
|
||||
},
|
||||
new ThreadPoolExecutor.AbortPolicy()
|
||||
);
|
||||
executor.allowCoreThreadTimeOut(true);
|
||||
return executor;
|
||||
}
|
||||
|
||||
private void consumeLoop(MQConsumerHandler handler, MQSubscription subscription, int shard) {
|
||||
String streamKey = keySupport.streamKey(subscription.getTopic(), shard);
|
||||
String consumerName = subscription.getConsumerGroup() + "-" + shard;
|
||||
|
||||
Reference in New Issue
Block a user