feat: 增强多实例分布式部署兼容
- 增加定时任务分布式锁并覆盖 chatlog、文档导入和 Agent HITL 过期扫描 - 增强 Redis MQ 多实例 consumer 标识、pending reclaim 和单条处理能力 - 增加文档导入状态 Redis 广播和 Agent HITL 跨节点路由确认
This commit is contained in:
@@ -39,7 +39,23 @@
|
||||
<artifactId>fastjson</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-aop</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>5.12.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package tech.easyflow.common.cache;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* Spring 定时任务 Redis 分布式锁。
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface DistributedScheduledLock {
|
||||
|
||||
/**
|
||||
* 获取锁使用的 Redis key。
|
||||
*
|
||||
* @return Redis 锁 key
|
||||
*/
|
||||
String key();
|
||||
|
||||
/**
|
||||
* 等待锁的秒数。
|
||||
*
|
||||
* @return 等待锁的秒数
|
||||
*/
|
||||
long waitSeconds() default 0L;
|
||||
|
||||
/**
|
||||
* 锁租约秒数。
|
||||
*
|
||||
* @return 锁租约秒数
|
||||
*/
|
||||
long leaseSeconds() default 300L;
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
package tech.easyflow.common.cache;
|
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* 定时任务分布式锁切面。
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class DistributedScheduledLockAspect {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DistributedScheduledLockAspect.class);
|
||||
|
||||
private final RedisLockExecutor redisLockExecutor;
|
||||
private final ScheduledExecutorService renewExecutor;
|
||||
|
||||
/**
|
||||
* 创建定时任务分布式锁切面。
|
||||
*
|
||||
* @param redisLockExecutor Redis 分布式锁执行器
|
||||
*/
|
||||
public DistributedScheduledLockAspect(RedisLockExecutor redisLockExecutor) {
|
||||
this.redisLockExecutor = redisLockExecutor;
|
||||
this.renewExecutor = Executors.newScheduledThreadPool(
|
||||
1,
|
||||
new DistributedScheduledLockThreadFactory()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 拦截带分布式调度锁的定时任务。
|
||||
*
|
||||
* @param joinPoint 切点
|
||||
* @param lock 锁注解
|
||||
* @return 原方法返回值;未抢到锁时返回 null
|
||||
* @throws Throwable 原方法执行异常或 Redis 访问异常
|
||||
*/
|
||||
@Around("@annotation(lock)")
|
||||
public Object around(ProceedingJoinPoint joinPoint, DistributedScheduledLock lock) throws Throwable {
|
||||
Duration waitTimeout = Duration.ofSeconds(Math.max(lock.waitSeconds(), 0L));
|
||||
Duration leaseTimeout = Duration.ofSeconds(Math.max(lock.leaseSeconds(), 1L));
|
||||
RedisLockExecutor.LockHandle handle = redisLockExecutor.tryAcquire(lock.key(), waitTimeout, leaseTimeout);
|
||||
if (handle == null) {
|
||||
LOG.info("定时任务分布式锁已被其他实例持有,跳过本轮执行: lockKey={}, method={}",
|
||||
lock.key(), joinPoint.getSignature().toShortString());
|
||||
return null;
|
||||
}
|
||||
ScheduledFuture<?> renewTask = scheduleRenew(lock.key(), handle, leaseTimeout);
|
||||
try {
|
||||
return joinPoint.proceed();
|
||||
} finally {
|
||||
renewTask.cancel(false);
|
||||
handle.release();
|
||||
}
|
||||
}
|
||||
|
||||
private ScheduledFuture<?> scheduleRenew(String lockKey,
|
||||
RedisLockExecutor.LockHandle handle,
|
||||
Duration leaseTimeout) {
|
||||
long renewIntervalMillis = Math.max(leaseTimeout.toMillis() / 3L, 1000L);
|
||||
return renewExecutor.scheduleWithFixedDelay(() -> {
|
||||
if (!handle.renew()) {
|
||||
LOG.warn("定时任务分布式锁续期失败: lockKey={}", lockKey);
|
||||
}
|
||||
}, renewIntervalMillis, renewIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭调度锁续期线程池。
|
||||
*/
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
renewExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* 调度锁续期线程工厂。
|
||||
*/
|
||||
private static final class DistributedScheduledLockThreadFactory implements ThreadFactory {
|
||||
|
||||
private final AtomicInteger index = new AtomicInteger(1);
|
||||
|
||||
/**
|
||||
* 创建续期线程。
|
||||
*
|
||||
* @param runnable 线程任务
|
||||
* @return 续期线程
|
||||
*/
|
||||
@Override
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable);
|
||||
thread.setName("distributed-scheduled-lock-renew-" + index.getAndIncrement());
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,9 @@ import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Redis 分布式锁执行器。
|
||||
*/
|
||||
@Component
|
||||
public class RedisLockExecutor {
|
||||
|
||||
@@ -42,6 +45,14 @@ public class RedisLockExecutor {
|
||||
@Autowired
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 在分布式锁保护下执行无返回任务。
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param waitTimeout 等待锁的最大时间
|
||||
* @param leaseTimeout 锁租约时间
|
||||
* @param task 业务任务
|
||||
*/
|
||||
public void executeWithLock(String lockKey, Duration waitTimeout, Duration leaseTimeout, Runnable task) {
|
||||
executeWithLock(lockKey, waitTimeout, leaseTimeout, () -> {
|
||||
task.run();
|
||||
@@ -49,6 +60,16 @@ public class RedisLockExecutor {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 在分布式锁保护下执行有返回任务。
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param waitTimeout 等待锁的最大时间
|
||||
* @param leaseTimeout 锁租约时间
|
||||
* @param task 业务任务
|
||||
* @param <T> 返回类型
|
||||
* @return 任务返回值
|
||||
*/
|
||||
public <T> T executeWithLock(String lockKey, Duration waitTimeout, Duration leaseTimeout, Supplier<T> task) {
|
||||
LockHandle handle = acquire(lockKey, waitTimeout, leaseTimeout);
|
||||
try {
|
||||
@@ -70,24 +91,46 @@ public class RedisLockExecutor {
|
||||
* @return 锁句柄
|
||||
*/
|
||||
public LockHandle acquire(String lockKey, Duration waitTimeout, Duration leaseTimeout) {
|
||||
LockHandle handle = tryAcquire(lockKey, waitTimeout, leaseTimeout);
|
||||
if (handle == null) {
|
||||
throw new IllegalStateException("获取分布式锁失败,请稍后重试,lockKey=" + lockKey);
|
||||
}
|
||||
return handle;
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试获取显式释放的分布式锁句柄。
|
||||
*
|
||||
* <p>返回 {@code null} 表示锁当前被其他节点持有。Redis 访问失败或等待过程被中断仍会抛出异常,
|
||||
* 调用方可据此区分“正常跳过”和“基础设施异常”。</p>
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param waitTimeout 等待时间
|
||||
* @param leaseTimeout 租约时间
|
||||
* @return 获取成功时返回锁句柄,否则返回 null
|
||||
*/
|
||||
public LockHandle tryAcquire(String lockKey, Duration waitTimeout, Duration leaseTimeout) {
|
||||
String lockValue = UUID.randomUUID().toString();
|
||||
boolean acquired = false;
|
||||
long deadline = System.nanoTime() + waitTimeout.toNanos();
|
||||
try {
|
||||
while (System.nanoTime() <= deadline) {
|
||||
do {
|
||||
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, leaseTimeout);
|
||||
if (Boolean.TRUE.equals(success)) {
|
||||
acquired = true;
|
||||
break;
|
||||
}
|
||||
if (System.nanoTime() >= deadline) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(RETRY_INTERVAL_MILLIS);
|
||||
}
|
||||
} while (System.nanoTime() <= deadline);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException("等待分布式锁被中断,lockKey=" + lockKey, e);
|
||||
}
|
||||
if (!acquired) {
|
||||
throw new IllegalStateException("获取分布式锁失败,请稍后重试,lockKey=" + lockKey);
|
||||
return null;
|
||||
}
|
||||
return new LockHandle(lockKey, lockValue, leaseTimeout);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
package tech.easyflow.common.cache;
|
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.Signature;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.data.redis.core.script.RedisScript;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* {@link DistributedScheduledLockAspect} 回归测试。
|
||||
*/
|
||||
public class DistributedScheduledLockAspectTest {
|
||||
|
||||
/**
|
||||
* 验证未抢到调度锁时跳过原方法。
|
||||
*
|
||||
* @throws Throwable 切面执行异常
|
||||
*/
|
||||
@Test
|
||||
public void aroundShouldSkipTaskWhenLockIsHeld() throws Throwable {
|
||||
RedisLockExecutor executor = createExecutor(false);
|
||||
DistributedScheduledLockAspect aspect = new DistributedScheduledLockAspect(executor);
|
||||
AtomicInteger proceedCount = new AtomicInteger();
|
||||
|
||||
Object result = aspect.around(
|
||||
mockJoinPoint(proceedCount),
|
||||
annotatedMethod("lockedTask").getAnnotation(DistributedScheduledLock.class)
|
||||
);
|
||||
|
||||
Assert.assertNull(result);
|
||||
Assert.assertEquals(0, proceedCount.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证抢到调度锁时执行原方法并释放锁。
|
||||
*
|
||||
* @throws Throwable 切面执行异常
|
||||
*/
|
||||
@Test
|
||||
public void aroundShouldProceedAndReleaseWhenLockAcquired() throws Throwable {
|
||||
RedisLockExecutor executor = createExecutor(true);
|
||||
DistributedScheduledLockAspect aspect = new DistributedScheduledLockAspect(executor);
|
||||
AtomicInteger proceedCount = new AtomicInteger();
|
||||
|
||||
Object result = aspect.around(
|
||||
mockJoinPoint(proceedCount),
|
||||
annotatedMethod("lockedTask").getAnnotation(DistributedScheduledLock.class)
|
||||
);
|
||||
|
||||
Assert.assertEquals("ok", result);
|
||||
Assert.assertEquals(1, proceedCount.get());
|
||||
}
|
||||
|
||||
@DistributedScheduledLock(key = "easyflow:test:scheduled", leaseSeconds = 30L)
|
||||
private void lockedTask() {
|
||||
}
|
||||
|
||||
private Method annotatedMethod(String methodName) throws NoSuchMethodException {
|
||||
Method method = DistributedScheduledLockAspectTest.class.getDeclaredMethod(methodName);
|
||||
method.setAccessible(true);
|
||||
return method;
|
||||
}
|
||||
|
||||
private ProceedingJoinPoint mockJoinPoint(AtomicInteger proceedCount) throws Throwable {
|
||||
ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
|
||||
Signature signature = Mockito.mock(Signature.class);
|
||||
Mockito.when(signature.toShortString()).thenReturn("lockedTask()");
|
||||
Mockito.when(joinPoint.getSignature()).thenReturn(signature);
|
||||
Mockito.when(joinPoint.proceed()).thenAnswer(invocation -> {
|
||||
proceedCount.incrementAndGet();
|
||||
return "ok";
|
||||
});
|
||||
return joinPoint;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private RedisLockExecutor createExecutor(boolean acquired) throws Exception {
|
||||
StringRedisTemplate redisTemplate = Mockito.mock(StringRedisTemplate.class);
|
||||
ValueOperations<String, String> valueOperations = Mockito.mock(ValueOperations.class);
|
||||
Mockito.when(valueOperations.setIfAbsent(
|
||||
ArgumentMatchers.anyString(),
|
||||
ArgumentMatchers.anyString(),
|
||||
ArgumentMatchers.any(Duration.class)
|
||||
)).thenReturn(acquired);
|
||||
Mockito.when(redisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
Mockito.when(redisTemplate.execute(
|
||||
ArgumentMatchers.<RedisScript<Long>>any(),
|
||||
ArgumentMatchers.<List<String>>any(),
|
||||
ArgumentMatchers.<Object[]>any()
|
||||
)).thenReturn(1L);
|
||||
|
||||
RedisLockExecutor executor = new RedisLockExecutor();
|
||||
Field field = RedisLockExecutor.class.getDeclaredField("stringRedisTemplate");
|
||||
field.setAccessible(true);
|
||||
field.set(executor, redisTemplate);
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package tech.easyflow.common.cache;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.data.redis.core.script.RedisScript;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* {@link RedisLockExecutor} 回归测试。
|
||||
*/
|
||||
public class RedisLockExecutorTest {
|
||||
|
||||
/**
|
||||
* 验证锁被占用时返回 null,便于调度任务跳过本轮执行。
|
||||
*
|
||||
* @throws Exception 反射注入异常
|
||||
*/
|
||||
@Test
|
||||
public void tryAcquireShouldReturnNullWhenLockIsHeld() throws Exception {
|
||||
StringRedisTemplate redisTemplate = Mockito.mock(StringRedisTemplate.class);
|
||||
ValueOperations<String, String> valueOperations = mockValueOperations(false);
|
||||
Mockito.when(redisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
|
||||
RedisLockExecutor executor = new RedisLockExecutor();
|
||||
setRedisTemplate(executor, redisTemplate);
|
||||
|
||||
RedisLockExecutor.LockHandle handle = executor.tryAcquire(
|
||||
"easyflow:test:lock",
|
||||
Duration.ZERO,
|
||||
Duration.ofSeconds(30)
|
||||
);
|
||||
|
||||
Assert.assertNull(handle);
|
||||
Mockito.verify(valueOperations).setIfAbsent(
|
||||
ArgumentMatchers.eq("easyflow:test:lock"),
|
||||
ArgumentMatchers.anyString(),
|
||||
ArgumentMatchers.eq(Duration.ofSeconds(30))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证锁获取成功后释放会执行 owner token 校验脚本。
|
||||
*
|
||||
* @throws Exception 反射注入异常
|
||||
*/
|
||||
@Test
|
||||
public void acquiredHandleShouldReleaseLockWithOwnerToken() throws Exception {
|
||||
StringRedisTemplate redisTemplate = Mockito.mock(StringRedisTemplate.class);
|
||||
ValueOperations<String, String> valueOperations = mockValueOperations(true);
|
||||
Mockito.when(redisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
Mockito.when(redisTemplate.execute(
|
||||
ArgumentMatchers.<RedisScript<Long>>any(),
|
||||
ArgumentMatchers.<List<String>>any(),
|
||||
ArgumentMatchers.<Object[]>any()
|
||||
)).thenReturn(1L);
|
||||
|
||||
RedisLockExecutor executor = new RedisLockExecutor();
|
||||
setRedisTemplate(executor, redisTemplate);
|
||||
|
||||
RedisLockExecutor.LockHandle handle = executor.tryAcquire(
|
||||
"easyflow:test:lock",
|
||||
Duration.ZERO,
|
||||
Duration.ofSeconds(30)
|
||||
);
|
||||
|
||||
Assert.assertNotNull(handle);
|
||||
handle.release();
|
||||
Mockito.verify(redisTemplate).execute(
|
||||
ArgumentMatchers.<RedisScript<Long>>any(),
|
||||
ArgumentMatchers.eq(List.of("easyflow:test:lock")),
|
||||
ArgumentMatchers.<Object[]>any()
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private ValueOperations<String, String> mockValueOperations(boolean acquired) {
|
||||
ValueOperations<String, String> valueOperations = Mockito.mock(ValueOperations.class);
|
||||
Mockito.when(valueOperations.setIfAbsent(
|
||||
ArgumentMatchers.anyString(),
|
||||
ArgumentMatchers.anyString(),
|
||||
ArgumentMatchers.any(Duration.class)
|
||||
)).thenReturn(acquired);
|
||||
return valueOperations;
|
||||
}
|
||||
|
||||
private void setRedisTemplate(RedisLockExecutor executor, StringRedisTemplate redisTemplate) throws Exception {
|
||||
Field field = RedisLockExecutor.class.getDeclaredField("stringRedisTemplate");
|
||||
field.setAccessible(true);
|
||||
field.set(executor, redisTemplate);
|
||||
}
|
||||
}
|
||||
@@ -27,5 +27,17 @@
|
||||
<artifactId>commons-pool2</artifactId>
|
||||
<version>2.11.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>5.12.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package tech.easyflow.common.mq.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* EasyFlow MQ 配置。
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "easyflow.mq")
|
||||
public class MQProperties {
|
||||
|
||||
@@ -35,6 +39,7 @@ public class MQProperties {
|
||||
|
||||
private int database = 1;
|
||||
private String streamPrefix = "easyflow:mq";
|
||||
private String consumerInstanceId = defaultConsumerInstanceId();
|
||||
private int chatPersistShardCount = 4;
|
||||
private int consumerBatchSize = 200;
|
||||
private Duration consumerBlockTimeout = Duration.ofMillis(2000);
|
||||
@@ -59,6 +64,26 @@ public class MQProperties {
|
||||
this.streamPrefix = streamPrefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 Redis Stream 消费实例 ID。
|
||||
*
|
||||
* @return 消费实例 ID
|
||||
*/
|
||||
public String getConsumerInstanceId() {
|
||||
return consumerInstanceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置 Redis Stream 消费实例 ID。
|
||||
*
|
||||
* @param consumerInstanceId 消费实例 ID
|
||||
*/
|
||||
public void setConsumerInstanceId(String consumerInstanceId) {
|
||||
this.consumerInstanceId = StringUtils.hasText(consumerInstanceId)
|
||||
? consumerInstanceId.trim()
|
||||
: defaultConsumerInstanceId();
|
||||
}
|
||||
|
||||
public int getChatPersistShardCount() {
|
||||
return chatPersistShardCount;
|
||||
}
|
||||
@@ -191,5 +216,13 @@ public class MQProperties {
|
||||
this.minIdle = minIdle;
|
||||
}
|
||||
}
|
||||
|
||||
private static String defaultConsumerInstanceId() {
|
||||
String hostName = System.getenv("HOSTNAME");
|
||||
if (StringUtils.hasText(hostName)) {
|
||||
return hostName.trim();
|
||||
}
|
||||
return java.util.UUID.randomUUID().toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ public class MQSubscription {
|
||||
private String topic;
|
||||
private String consumerGroup;
|
||||
private int shardCount;
|
||||
private boolean batchEnabled = true;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
@@ -29,4 +30,22 @@ public class MQSubscription {
|
||||
public void setShardCount(int shardCount) {
|
||||
this.shardCount = shardCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否启用批量消费。
|
||||
*
|
||||
* @return true 表示启用批量消费
|
||||
*/
|
||||
public boolean isBatchEnabled() {
|
||||
return batchEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置是否启用批量消费。
|
||||
*
|
||||
* @param batchEnabled 是否启用批量消费
|
||||
*/
|
||||
public void setBatchEnabled(boolean batchEnabled) {
|
||||
this.batchEnabled = batchEnabled;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifecycle {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RedisMQConsumerContainer.class);
|
||||
private static final Pattern UNSAFE_CONSUMER_NAME_CHARS = Pattern.compile("[^A-Za-z0-9_.-]");
|
||||
|
||||
private final RedisConnectionFactory redisConnectionFactory;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
@@ -154,13 +156,24 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
|
||||
private void consumeLoop(MQConsumerHandler handler, MQSubscription subscription, int shard) {
|
||||
String streamKey = keySupport.streamKey(subscription.getTopic(), shard);
|
||||
String consumerName = subscription.getConsumerGroup() + "-" + shard;
|
||||
String consumerName = buildConsumerName(subscription.getConsumerGroup(), shard);
|
||||
ensureConsumerGroup(streamKey, subscription.getConsumerGroup());
|
||||
LOG.info("MQ 消费循环已启动: topic={}, group={}, shard={}, consumer={}, streamKey={}, handler={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), shard, consumerName, streamKey, handler.getClass().getSimpleName());
|
||||
while (running) {
|
||||
try {
|
||||
reclaimPending(streamKey, subscription.getConsumerGroup(), consumerName);
|
||||
List<MapRecord<String, Object, Object>> pendingRecords =
|
||||
reclaimPending(streamKey, subscription.getConsumerGroup(), consumerName);
|
||||
if (!pendingRecords.isEmpty()) {
|
||||
List<MQMessage> pendingMessages = toMessages(streamKey, pendingRecords);
|
||||
if (!pendingMessages.isEmpty()) {
|
||||
LOG.info("MQ 收到重领 pending 消息批次: topic={}, group={}, shard={}, consumer={}, streamKey={}, count={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), shard, consumerName,
|
||||
streamKey, pendingMessages.size());
|
||||
handleMessages(handler, subscription, streamKey, subscription.getConsumerGroup(), pendingMessages);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
|
||||
Consumer.from(subscription.getConsumerGroup(), consumerName),
|
||||
StreamReadOptions.empty()
|
||||
@@ -177,7 +190,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
LOG.info("MQ 收到消息批次: topic={}, group={}, shard={}, consumer={}, streamKey={}, count={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), shard, consumerName, streamKey, messages.size());
|
||||
handleMessages(handler, streamKey, subscription.getConsumerGroup(), messages);
|
||||
handleMessages(handler, subscription, streamKey, subscription.getConsumerGroup(), messages);
|
||||
} catch (Exception exception) {
|
||||
LOG.error("MQ 消费循环异常: topic={}, group={}, shard={}, consumer={}, streamKey={}, handler={}",
|
||||
subscription.getTopic(),
|
||||
@@ -192,7 +205,20 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
}
|
||||
|
||||
private void reclaimPending(String streamKey, String group, String consumerName) {
|
||||
/**
|
||||
* 构建 Redis Stream consumer name。
|
||||
*
|
||||
* @param consumerGroup 消费组
|
||||
* @param shard 分片序号
|
||||
* @return consumer name
|
||||
*/
|
||||
String buildConsumerName(String consumerGroup, int shard) {
|
||||
String instanceId = properties.getRedis().getConsumerInstanceId();
|
||||
String safeInstanceId = UNSAFE_CONSUMER_NAME_CHARS.matcher(instanceId).replaceAll("-");
|
||||
return consumerGroup + "-" + shard + "-" + safeInstanceId;
|
||||
}
|
||||
|
||||
List<MapRecord<String, Object, Object>> reclaimPending(String streamKey, String group, String consumerName) {
|
||||
Duration idle = properties.getRedis().getPendingClaimIdle();
|
||||
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
|
||||
RedisStreamCommands.XPendingOptions options = RedisStreamCommands.XPendingOptions
|
||||
@@ -200,7 +226,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
var pendingMessages = connection.streamCommands()
|
||||
.xPending(streamKey.getBytes(StandardCharsets.UTF_8), group, options);
|
||||
if (pendingMessages == null || pendingMessages.isEmpty()) {
|
||||
return;
|
||||
return List.of();
|
||||
}
|
||||
List<RecordId> ids = new ArrayList<>();
|
||||
for (PendingMessage pendingMessage : pendingMessages) {
|
||||
@@ -209,15 +235,16 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
}
|
||||
if (ids.isEmpty()) {
|
||||
return;
|
||||
return List.of();
|
||||
}
|
||||
stringRedisTemplate.opsForStream().claim(
|
||||
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().claim(
|
||||
streamKey,
|
||||
group,
|
||||
consumerName,
|
||||
idle,
|
||||
ids.toArray(new RecordId[0])
|
||||
);
|
||||
return records == null ? List.of() : records;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,7 +260,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
}
|
||||
|
||||
private List<MQMessage> toMessages(String streamKey, List<MapRecord<String, Object, Object>> records) {
|
||||
List<MQMessage> toMessages(String streamKey, List<MapRecord<String, Object, Object>> records) {
|
||||
List<MQMessage> messages = new ArrayList<>(records.size());
|
||||
for (MapRecord<String, Object, Object> record : records) {
|
||||
Object payload = record.getValue().get("payload");
|
||||
@@ -269,7 +296,15 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMessages(MQConsumerHandler handler, String streamKey, String group, List<MQMessage> messages) throws Exception {
|
||||
void handleMessages(MQConsumerHandler handler,
|
||||
MQSubscription subscription,
|
||||
String streamKey,
|
||||
String group,
|
||||
List<MQMessage> messages) throws Exception {
|
||||
if (!subscription.isBatchEnabled()) {
|
||||
handleMessagesIndividually(handler, streamKey, group, messages);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
LOG.info("MQ 开始批量处理消息: group={}, streamKey={}, count={}, handler={}",
|
||||
group, streamKey, messages.size(), handler.getClass().getSimpleName());
|
||||
@@ -288,6 +323,13 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
}
|
||||
|
||||
handleMessagesIndividually(handler, streamKey, group, messages);
|
||||
}
|
||||
|
||||
private void handleMessagesIndividually(MQConsumerHandler handler,
|
||||
String streamKey,
|
||||
String group,
|
||||
List<MQMessage> messages) {
|
||||
for (MQMessage message : messages) {
|
||||
try {
|
||||
LOG.info("MQ 开始单条处理消息: group={}, streamKey={}, messageId={}, handler={}",
|
||||
|
||||
@@ -0,0 +1,175 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisStreamCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessage;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessages;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.config.MQProperties;
|
||||
import tech.easyflow.common.mq.core.MQConsumerHandler;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterService;
|
||||
import tech.easyflow.common.mq.core.MQMessage;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.core.MQSubscription;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* {@link RedisMQConsumerContainer} 回归测试。
|
||||
*/
|
||||
public class RedisMQConsumerContainerTest {
|
||||
|
||||
/**
|
||||
* 验证 consumer name 包含稳定实例 ID,且消费组名称不被改变。
|
||||
*/
|
||||
@Test
|
||||
public void buildConsumerNameShouldAppendSanitizedInstanceId() {
|
||||
MQProperties properties = new MQProperties();
|
||||
properties.getRedis().setConsumerInstanceId("node/a:1");
|
||||
RedisMQConsumerContainer container = new RedisMQConsumerContainer(
|
||||
null,
|
||||
null,
|
||||
properties,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
List.of()
|
||||
);
|
||||
|
||||
String consumerName = container.buildConsumerName("chat-persist", 2);
|
||||
|
||||
Assert.assertEquals("chat-persist-2-node-a-1", consumerName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证关闭批量消费后,容器按单条处理并独立确认消息。
|
||||
*
|
||||
* @throws Exception 消息处理异常
|
||||
*/
|
||||
@Test
|
||||
public void handleMessagesShouldProcessIndividuallyWhenBatchDisabled() throws Exception {
|
||||
StringRedisTemplate redisTemplate = Mockito.mock(StringRedisTemplate.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamOperations<String, Object, Object> streamOperations = Mockito.mock(StreamOperations.class);
|
||||
Mockito.when(redisTemplate.opsForStream()).thenReturn(streamOperations);
|
||||
RecordingHandler handler = new RecordingHandler();
|
||||
MQSubscription subscription = new MQSubscription();
|
||||
subscription.setBatchEnabled(false);
|
||||
RedisMQConsumerContainer container = container(redisTemplate, null);
|
||||
MQMessage first = message("message-1", "1-0");
|
||||
MQMessage second = message("message-2", "2-0");
|
||||
|
||||
container.handleMessages(handler, subscription, "stream-1", "group-1", List.of(first, second));
|
||||
|
||||
Assert.assertEquals(List.of(List.of("message-1"), List.of("message-2")), handler.calls);
|
||||
Mockito.verify(streamOperations).acknowledge("stream-1", "group-1", "1-0");
|
||||
Mockito.verify(streamOperations).acknowledge("stream-1", "group-1", "2-0");
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证 pending 消息被 claim 后可以转换为 MQ 消息继续消费。
|
||||
*/
|
||||
@Test
|
||||
public void reclaimPendingShouldReturnClaimedRecordsForConsumption() {
|
||||
StringRedisTemplate redisTemplate = Mockito.mock(StringRedisTemplate.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamOperations<String, Object, Object> streamOperations = Mockito.mock(StreamOperations.class);
|
||||
Mockito.when(redisTemplate.opsForStream()).thenReturn(streamOperations);
|
||||
RedisConnectionFactory connectionFactory = Mockito.mock(RedisConnectionFactory.class);
|
||||
RedisConnection connection = Mockito.mock(RedisConnection.class);
|
||||
RedisStreamCommands streamCommands = Mockito.mock(RedisStreamCommands.class);
|
||||
Mockito.when(connectionFactory.getConnection()).thenReturn(connection);
|
||||
Mockito.when(connection.streamCommands()).thenReturn(streamCommands);
|
||||
PendingMessage pendingMessage = new PendingMessage(
|
||||
RecordId.of("1-0"), Consumer.from("group-1", "old-consumer"), Duration.ofMinutes(2), 1);
|
||||
Mockito.when(streamCommands.xPending(
|
||||
ArgumentMatchers.eq("stream-1".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
|
||||
ArgumentMatchers.eq("group-1"),
|
||||
ArgumentMatchers.any(RedisStreamCommands.XPendingOptions.class)))
|
||||
.thenReturn(new PendingMessages("group-1", List.of(pendingMessage)));
|
||||
Map<Object, Object> payload = Map.of("payload", "message-1");
|
||||
MapRecord<String, Object, Object> record = MapRecord
|
||||
.create("stream-1", payload)
|
||||
.withId(RecordId.of("1-0"));
|
||||
Mockito.when(streamOperations.claim(
|
||||
ArgumentMatchers.eq("stream-1"),
|
||||
ArgumentMatchers.eq("group-1"),
|
||||
ArgumentMatchers.eq("consumer-1"),
|
||||
ArgumentMatchers.any(Duration.class),
|
||||
ArgumentMatchers.any(RecordId[].class)))
|
||||
.thenReturn(List.of(record));
|
||||
RedisMQConsumerContainer container = container(redisTemplate, connectionFactory);
|
||||
|
||||
List<MapRecord<String, Object, Object>> records =
|
||||
container.reclaimPending("stream-1", "group-1", "consumer-1");
|
||||
List<MQMessage> messages = container.toMessages("stream-1", records);
|
||||
|
||||
Assert.assertEquals(1, records.size());
|
||||
Assert.assertEquals(1, messages.size());
|
||||
Assert.assertEquals("message-1", messages.get(0).getMessageId());
|
||||
Assert.assertEquals("1-0", messages.get(0).getStreamMessageId());
|
||||
}
|
||||
|
||||
private RedisMQConsumerContainer container(StringRedisTemplate redisTemplate,
|
||||
RedisConnectionFactory connectionFactory) {
|
||||
MQProperties properties = new MQProperties();
|
||||
return new RedisMQConsumerContainer(
|
||||
connectionFactory,
|
||||
redisTemplate,
|
||||
properties,
|
||||
new PlainMessageConverter(),
|
||||
Mockito.mock(MQDeadLetterService.class),
|
||||
null,
|
||||
List.of()
|
||||
);
|
||||
}
|
||||
|
||||
private MQMessage message(String messageId, String streamMessageId) {
|
||||
MQMessage message = new MQMessage();
|
||||
message.setMessageId(messageId);
|
||||
message.setStreamMessageId(streamMessageId);
|
||||
return message;
|
||||
}
|
||||
|
||||
private static final class RecordingHandler implements MQConsumerHandler {
|
||||
|
||||
private final List<List<String>> calls = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public MQSubscription subscription() {
|
||||
return new MQSubscription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(List<MQMessage> messages) {
|
||||
calls.add(messages.stream().map(MQMessage::getMessageId).toList());
|
||||
}
|
||||
}
|
||||
|
||||
private static final class PlainMessageConverter implements MQMessageConverter {
|
||||
|
||||
@Override
|
||||
public String serialize(MQMessage message) {
|
||||
return message.getMessageId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MQMessage deserialize(String payload) {
|
||||
MQMessage message = new MQMessage();
|
||||
message.setMessageId(payload);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user