feat: 落地聊天记录异步持久化基础设施
- 新增 chatlog 模块、AnalyticalDB 公共层与 common-mq Redis Streams 实现 - 建立 Redis 热态、MySQL 热数据、AnalyticalDB 历史查询与同步链路 - 收紧聊天记录幂等、摘要时序与持久化失败语义
This commit is contained in:
26
easyflow-commons/easyflow-common-mq/pom.xml
Normal file
26
easyflow-commons/easyflow-common-mq/pom.xml
Normal file
@@ -0,0 +1,26 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>tech.easyflow</groupId>
|
||||
<artifactId>easyflow-commons</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<name>easyflow-common-mq</name>
|
||||
<artifactId>easyflow-common-mq</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,129 @@
|
||||
package tech.easyflow.common.mq.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisPassword;
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.core.MQConsumerContainer;
|
||||
import tech.easyflow.common.mq.core.MQConsumerHandler;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterHandler;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterService;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.core.MQProducer;
|
||||
import tech.easyflow.common.mq.redis.JacksonMQMessageConverter;
|
||||
import tech.easyflow.common.mq.redis.RedisMQConsumerContainer;
|
||||
import tech.easyflow.common.mq.redis.RedisMQDeadLetterService;
|
||||
import tech.easyflow.common.mq.redis.RedisMQProducer;
|
||||
import tech.easyflow.common.mq.redis.RedisStreamKeySupport;
|
||||
import tech.easyflow.common.mq.support.MQHealthSupport;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(MQProperties.class)
|
||||
public class MQConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQRedisResources mqRedisResources(RedisProperties redisProperties, MQProperties mqProperties) {
|
||||
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
|
||||
configuration.setHostName(redisProperties.getHost());
|
||||
configuration.setPort(redisProperties.getPort());
|
||||
configuration.setDatabase(mqProperties.getRedis().getDatabase());
|
||||
if (redisProperties.getUsername() != null) {
|
||||
configuration.setUsername(redisProperties.getUsername());
|
||||
}
|
||||
if (redisProperties.getPassword() != null) {
|
||||
configuration.setPassword(RedisPassword.of(redisProperties.getPassword()));
|
||||
}
|
||||
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration);
|
||||
connectionFactory.afterPropertiesSet();
|
||||
return new MQRedisResources(connectionFactory, new StringRedisTemplate(connectionFactory));
|
||||
}
|
||||
|
||||
@Bean(name = "mqRedisConnectionFactory", autowireCandidate = false, defaultCandidate = false)
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public LettuceConnectionFactory mqRedisConnectionFactory(MQRedisResources mqRedisResources) {
|
||||
return mqRedisResources.connectionFactory();
|
||||
}
|
||||
|
||||
@Bean(name = "mqStringRedisTemplate", autowireCandidate = false, defaultCandidate = false)
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public StringRedisTemplate mqStringRedisTemplate(MQRedisResources mqRedisResources) {
|
||||
return mqRedisResources.stringRedisTemplate();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public RedisStreamKeySupport redisStreamKeySupport(MQProperties mqProperties) {
|
||||
return new RedisStreamKeySupport(mqProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQMessageConverter mqMessageConverter(ObjectMapper objectMapper) {
|
||||
return new JacksonMQMessageConverter(objectMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQDeadLetterService mqDeadLetterService(MQRedisResources mqRedisResources,
|
||||
MQMessageConverter mqMessageConverter,
|
||||
RedisStreamKeySupport redisStreamKeySupport,
|
||||
ObjectProvider<MQDeadLetterHandler> handlersProvider) {
|
||||
List<MQDeadLetterHandler> handlers = handlersProvider.orderedStream().toList();
|
||||
return new RedisMQDeadLetterService(
|
||||
mqRedisResources.stringRedisTemplate(),
|
||||
mqMessageConverter,
|
||||
redisStreamKeySupport,
|
||||
handlers
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQProducer mqProducer(MQRedisResources mqRedisResources,
|
||||
MQProperties mqProperties,
|
||||
MQMessageConverter mqMessageConverter,
|
||||
RedisStreamKeySupport redisStreamKeySupport) {
|
||||
return new RedisMQProducer(
|
||||
mqRedisResources.stringRedisTemplate(),
|
||||
mqProperties,
|
||||
mqMessageConverter,
|
||||
redisStreamKeySupport
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQHealthSupport mqHealthSupport(MQRedisResources mqRedisResources) {
|
||||
return new MQHealthSupport(mqRedisResources.connectionFactory());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "easyflow.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public MQConsumerContainer mqConsumerContainer(MQRedisResources mqRedisResources,
|
||||
MQProperties mqProperties,
|
||||
MQMessageConverter mqMessageConverter,
|
||||
MQDeadLetterService mqDeadLetterService,
|
||||
RedisStreamKeySupport redisStreamKeySupport,
|
||||
ObjectProvider<MQConsumerHandler> handlersProvider) {
|
||||
List<MQConsumerHandler> handlers = handlersProvider.orderedStream().toList();
|
||||
return new RedisMQConsumerContainer(
|
||||
mqRedisResources.connectionFactory(),
|
||||
mqRedisResources.stringRedisTemplate(),
|
||||
mqProperties,
|
||||
mqMessageConverter,
|
||||
mqDeadLetterService,
|
||||
redisStreamKeySupport,
|
||||
handlers
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
package tech.easyflow.common.mq.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
@ConfigurationProperties(prefix = "easyflow.mq")
|
||||
public class MQProperties {
|
||||
|
||||
private boolean enabled = true;
|
||||
private String type = "redis";
|
||||
private final Redis redis = new Redis();
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Redis getRedis() {
|
||||
return redis;
|
||||
}
|
||||
|
||||
public static class Redis {
|
||||
|
||||
private int database = 1;
|
||||
private String streamPrefix = "easyflow:mq";
|
||||
private int chatPersistShardCount = 4;
|
||||
private int consumerBatchSize = 200;
|
||||
private Duration consumerBlockTimeout = Duration.ofMillis(2000);
|
||||
private Duration pendingClaimIdle = Duration.ofMillis(60000);
|
||||
private int maxRetry = 16;
|
||||
|
||||
public int getDatabase() {
|
||||
return database;
|
||||
}
|
||||
|
||||
public void setDatabase(int database) {
|
||||
this.database = database;
|
||||
}
|
||||
|
||||
public String getStreamPrefix() {
|
||||
return streamPrefix;
|
||||
}
|
||||
|
||||
public void setStreamPrefix(String streamPrefix) {
|
||||
this.streamPrefix = streamPrefix;
|
||||
}
|
||||
|
||||
public int getChatPersistShardCount() {
|
||||
return chatPersistShardCount;
|
||||
}
|
||||
|
||||
public void setChatPersistShardCount(int chatPersistShardCount) {
|
||||
this.chatPersistShardCount = chatPersistShardCount;
|
||||
}
|
||||
|
||||
public int getConsumerBatchSize() {
|
||||
return consumerBatchSize;
|
||||
}
|
||||
|
||||
public void setConsumerBatchSize(int consumerBatchSize) {
|
||||
this.consumerBatchSize = consumerBatchSize;
|
||||
}
|
||||
|
||||
public Duration getConsumerBlockTimeout() {
|
||||
return consumerBlockTimeout;
|
||||
}
|
||||
|
||||
public void setConsumerBlockTimeout(Duration consumerBlockTimeout) {
|
||||
this.consumerBlockTimeout = consumerBlockTimeout;
|
||||
}
|
||||
|
||||
public Duration getPendingClaimIdle() {
|
||||
return pendingClaimIdle;
|
||||
}
|
||||
|
||||
public void setPendingClaimIdle(Duration pendingClaimIdle) {
|
||||
this.pendingClaimIdle = pendingClaimIdle;
|
||||
}
|
||||
|
||||
public int getMaxRetry() {
|
||||
return maxRetry;
|
||||
}
|
||||
|
||||
public void setMaxRetry(int maxRetry) {
|
||||
this.maxRetry = maxRetry;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package tech.easyflow.common.mq.config;
|
||||
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
|
||||
public record MQRedisResources(LettuceConnectionFactory connectionFactory,
|
||||
StringRedisTemplate stringRedisTemplate) implements AutoCloseable {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
connectionFactory.destroy();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface MQAcknowledger {
|
||||
|
||||
void acknowledge(List<MQMessage> messages);
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public interface MQConsumerContainer {
|
||||
|
||||
boolean isRunning();
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface MQConsumerHandler {
|
||||
|
||||
MQSubscription subscription();
|
||||
|
||||
void handle(List<MQMessage> messages) throws Exception;
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public interface MQDeadLetterHandler {
|
||||
|
||||
boolean supports(String topic);
|
||||
|
||||
void handle(MQMessage message, String reason);
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public interface MQDeadLetterService {
|
||||
|
||||
void deadLetter(MQMessage message, String reason);
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class MQMessage implements Serializable {
|
||||
|
||||
private String messageId;
|
||||
private String topic;
|
||||
private String key;
|
||||
private String body;
|
||||
private Date createdAt;
|
||||
private int retryCount;
|
||||
private String streamKey;
|
||||
private String streamMessageId;
|
||||
private Map<String, String> headers = new LinkedHashMap<>();
|
||||
|
||||
public String getMessageId() {
|
||||
return messageId;
|
||||
}
|
||||
|
||||
public void setMessageId(String messageId) {
|
||||
this.messageId = messageId;
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public void setKey(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
|
||||
public void setBody(String body) {
|
||||
this.body = body;
|
||||
}
|
||||
|
||||
public Date getCreatedAt() {
|
||||
return createdAt;
|
||||
}
|
||||
|
||||
public void setCreatedAt(Date createdAt) {
|
||||
this.createdAt = createdAt;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public void setRetryCount(int retryCount) {
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
||||
public String getStreamKey() {
|
||||
return streamKey;
|
||||
}
|
||||
|
||||
public void setStreamKey(String streamKey) {
|
||||
this.streamKey = streamKey;
|
||||
}
|
||||
|
||||
public String getStreamMessageId() {
|
||||
return streamMessageId;
|
||||
}
|
||||
|
||||
public void setStreamMessageId(String streamMessageId) {
|
||||
this.streamMessageId = streamMessageId;
|
||||
}
|
||||
|
||||
public Map<String, String> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
|
||||
public void setHeaders(Map<String, String> headers) {
|
||||
this.headers = headers == null ? new LinkedHashMap<>() : new LinkedHashMap<>(headers);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public interface MQMessageConverter {
|
||||
|
||||
String serialize(MQMessage message);
|
||||
|
||||
MQMessage deserialize(String payload);
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public interface MQProducer {
|
||||
|
||||
String send(MQMessage message);
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package tech.easyflow.common.mq.core;
|
||||
|
||||
public class MQSubscription {
|
||||
|
||||
private String topic;
|
||||
private String consumerGroup;
|
||||
private int shardCount;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public void setConsumerGroup(String consumerGroup) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
}
|
||||
|
||||
public int getShardCount() {
|
||||
return shardCount;
|
||||
}
|
||||
|
||||
public void setShardCount(int shardCount) {
|
||||
this.shardCount = shardCount;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package tech.easyflow.common.mq.exception;
|
||||
|
||||
public class MQException extends RuntimeException {
|
||||
|
||||
public MQException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MQException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import tech.easyflow.common.mq.core.MQMessage;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.exception.MQException;
|
||||
|
||||
public class JacksonMQMessageConverter implements MQMessageConverter {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public JacksonMQMessageConverter(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String serialize(MQMessage message) {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(message);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new MQException("MQ 消息序列化失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MQMessage deserialize(String payload) {
|
||||
try {
|
||||
return objectMapper.readValue(payload, MQMessage.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new MQException("MQ 消息反序列化失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,259 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisStreamCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessage;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamReadOptions;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.config.MQProperties;
|
||||
import tech.easyflow.common.mq.core.MQAcknowledger;
|
||||
import tech.easyflow.common.mq.core.MQConsumerContainer;
|
||||
import tech.easyflow.common.mq.core.MQConsumerHandler;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterService;
|
||||
import tech.easyflow.common.mq.core.MQMessage;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.core.MQSubscription;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifecycle {
|
||||
|
||||
private final RedisConnectionFactory redisConnectionFactory;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MQProperties properties;
|
||||
private final MQMessageConverter messageConverter;
|
||||
private final MQDeadLetterService deadLetterService;
|
||||
private final RedisStreamKeySupport keySupport;
|
||||
private final List<MQConsumerHandler> handlers;
|
||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
private volatile boolean running;
|
||||
|
||||
public RedisMQConsumerContainer(RedisConnectionFactory redisConnectionFactory,
|
||||
StringRedisTemplate stringRedisTemplate,
|
||||
MQProperties properties,
|
||||
MQMessageConverter messageConverter,
|
||||
MQDeadLetterService deadLetterService,
|
||||
RedisStreamKeySupport keySupport,
|
||||
List<MQConsumerHandler> handlers) {
|
||||
this.redisConnectionFactory = redisConnectionFactory;
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
this.properties = properties;
|
||||
this.messageConverter = messageConverter;
|
||||
this.deadLetterService = deadLetterService;
|
||||
this.keySupport = keySupport;
|
||||
this.handlers = handlers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (running) {
|
||||
return;
|
||||
}
|
||||
running = true;
|
||||
for (MQConsumerHandler handler : handlers) {
|
||||
MQSubscription subscription = handler.subscription();
|
||||
for (int shard = 0; shard < Math.max(subscription.getShardCount(), 1); shard++) {
|
||||
int currentShard = shard;
|
||||
executorService.submit(() -> consumeLoop(handler, subscription, currentShard));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
running = false;
|
||||
executorService.shutdownNow();
|
||||
try {
|
||||
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
stop();
|
||||
}
|
||||
|
||||
private void consumeLoop(MQConsumerHandler handler, MQSubscription subscription, int shard) {
|
||||
String streamKey = keySupport.streamKey(subscription.getTopic(), shard);
|
||||
String consumerName = subscription.getConsumerGroup() + "-" + shard;
|
||||
ensureConsumerGroup(streamKey, subscription.getConsumerGroup());
|
||||
while (running) {
|
||||
try {
|
||||
reclaimPending(streamKey, subscription.getConsumerGroup(), consumerName);
|
||||
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
|
||||
Consumer.from(subscription.getConsumerGroup(), consumerName),
|
||||
StreamReadOptions.empty()
|
||||
.count(properties.getRedis().getConsumerBatchSize())
|
||||
.block(properties.getRedis().getConsumerBlockTimeout()),
|
||||
StreamOffset.create(streamKey, org.springframework.data.redis.connection.stream.ReadOffset.lastConsumed())
|
||||
);
|
||||
if (records == null || records.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
List<MQMessage> messages = toMessages(streamKey, records);
|
||||
if (messages.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
handleMessages(handler, streamKey, subscription.getConsumerGroup(), messages);
|
||||
} catch (Exception ignored) {
|
||||
sleepSilently(1000L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void reclaimPending(String streamKey, String group, String consumerName) {
|
||||
Duration idle = properties.getRedis().getPendingClaimIdle();
|
||||
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
|
||||
RedisStreamCommands.XPendingOptions options = RedisStreamCommands.XPendingOptions
|
||||
.range(Range.unbounded(), (long) properties.getRedis().getConsumerBatchSize());
|
||||
var pendingMessages = connection.streamCommands()
|
||||
.xPending(streamKey.getBytes(StandardCharsets.UTF_8), group, options);
|
||||
if (pendingMessages == null || pendingMessages.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<RecordId> ids = new ArrayList<>();
|
||||
for (PendingMessage pendingMessage : pendingMessages) {
|
||||
if (pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(idle) >= 0) {
|
||||
ids.add(pendingMessage.getId());
|
||||
}
|
||||
}
|
||||
if (ids.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
stringRedisTemplate.opsForStream().claim(
|
||||
streamKey,
|
||||
group,
|
||||
consumerName,
|
||||
idle,
|
||||
ids.toArray(new RecordId[0])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureConsumerGroup(String streamKey, String group) {
|
||||
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
|
||||
connection.streamCommands().xGroupCreate(
|
||||
streamKey.getBytes(StandardCharsets.UTF_8),
|
||||
group,
|
||||
org.springframework.data.redis.connection.stream.ReadOffset.latest(),
|
||||
true
|
||||
);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
private List<MQMessage> toMessages(String streamKey, List<MapRecord<String, Object, Object>> records) {
|
||||
List<MQMessage> messages = new ArrayList<>(records.size());
|
||||
for (MapRecord<String, Object, Object> record : records) {
|
||||
Object payload = record.getValue().get("payload");
|
||||
if (payload == null) {
|
||||
continue;
|
||||
}
|
||||
MQMessage message = messageConverter.deserialize(String.valueOf(payload));
|
||||
message.setStreamKey(streamKey);
|
||||
message.setStreamMessageId(record.getId().getValue());
|
||||
messages.add(message);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
private void retryOrDeadLetter(List<MQMessage> messages, String reason) {
|
||||
for (MQMessage message : messages) {
|
||||
int retryCount = message.getRetryCount() + 1;
|
||||
message.setRetryCount(retryCount);
|
||||
message.getHeaders().put("lastError", reason == null ? "" : reason);
|
||||
if (retryCount > properties.getRedis().getMaxRetry()) {
|
||||
deadLetterService.deadLetter(message, reason);
|
||||
} else {
|
||||
stringRedisTemplate.opsForStream().add(
|
||||
org.springframework.data.redis.connection.stream.StreamRecords.string(
|
||||
Map.of("payload", messageConverter.serialize(message))
|
||||
).withStreamKey(message.getStreamKey())
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMessages(MQConsumerHandler handler, String streamKey, String group, List<MQMessage> messages) throws Exception {
|
||||
try {
|
||||
handler.handle(messages);
|
||||
acknowledge(streamKey, group, messages);
|
||||
return;
|
||||
} catch (Exception batchEx) {
|
||||
if (messages.size() == 1) {
|
||||
retryOrDeadLetter(messages, resolveReason(batchEx));
|
||||
acknowledge(streamKey, group, messages);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for (MQMessage message : messages) {
|
||||
try {
|
||||
handler.handle(List.of(message));
|
||||
} catch (Exception singleEx) {
|
||||
retryOrDeadLetter(List.of(message), resolveReason(singleEx));
|
||||
} finally {
|
||||
acknowledge(streamKey, group, List.of(message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void acknowledge(String streamKey, String group, List<MQMessage> messages) {
|
||||
if (messages == null || messages.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
String[] ids = messages.stream()
|
||||
.map(MQMessage::getStreamMessageId)
|
||||
.filter(Objects::nonNull)
|
||||
.toArray(String[]::new);
|
||||
if (ids.length == 0) {
|
||||
return;
|
||||
}
|
||||
MQAcknowledger acknowledger = records -> stringRedisTemplate.opsForStream().acknowledge(streamKey, group, ids);
|
||||
acknowledger.acknowledge(messages);
|
||||
}
|
||||
|
||||
private String resolveReason(Exception exception) {
|
||||
if (exception == null || exception.getMessage() == null || exception.getMessage().isBlank()) {
|
||||
return "消费失败";
|
||||
}
|
||||
return exception.getMessage();
|
||||
}
|
||||
|
||||
private void sleepSilently(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterHandler;
|
||||
import tech.easyflow.common.mq.core.MQDeadLetterService;
|
||||
import tech.easyflow.common.mq.core.MQMessage;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.exception.MQException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RedisMQDeadLetterService implements MQDeadLetterService {
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MQMessageConverter messageConverter;
|
||||
private final RedisStreamKeySupport keySupport;
|
||||
private final List<MQDeadLetterHandler> handlers;
|
||||
|
||||
public RedisMQDeadLetterService(StringRedisTemplate stringRedisTemplate,
|
||||
MQMessageConverter messageConverter,
|
||||
RedisStreamKeySupport keySupport,
|
||||
List<MQDeadLetterHandler> handlers) {
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
this.messageConverter = messageConverter;
|
||||
this.keySupport = keySupport;
|
||||
this.handlers = handlers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deadLetter(MQMessage message, String reason) {
|
||||
if (message == null) {
|
||||
return;
|
||||
}
|
||||
message.getHeaders().put("deadLetterReason", reason == null ? "" : reason);
|
||||
String deadLetterKey = keySupport.deadLetterKey(message.getTopic());
|
||||
if (stringRedisTemplate.opsForStream().add(
|
||||
StreamRecords.string(Map.of("payload", messageConverter.serialize(message))).withStreamKey(deadLetterKey)
|
||||
) == null) {
|
||||
throw new MQException("写入死信流失败");
|
||||
}
|
||||
for (MQDeadLetterHandler handler : handlers) {
|
||||
if (handler.supports(message.getTopic())) {
|
||||
handler.handle(message, reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import tech.easyflow.common.mq.config.MQProperties;
|
||||
import tech.easyflow.common.mq.core.MQMessage;
|
||||
import tech.easyflow.common.mq.core.MQProducer;
|
||||
import tech.easyflow.common.mq.core.MQMessageConverter;
|
||||
import tech.easyflow.common.mq.exception.MQException;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RedisMQProducer implements MQProducer {
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MQProperties properties;
|
||||
private final MQMessageConverter messageConverter;
|
||||
private final RedisStreamKeySupport keySupport;
|
||||
|
||||
public RedisMQProducer(StringRedisTemplate stringRedisTemplate,
|
||||
MQProperties properties,
|
||||
MQMessageConverter messageConverter,
|
||||
RedisStreamKeySupport keySupport) {
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
this.properties = properties;
|
||||
this.messageConverter = messageConverter;
|
||||
this.keySupport = keySupport;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String send(MQMessage message) {
|
||||
if (message == null) {
|
||||
throw new MQException("MQ 消息不能为空");
|
||||
}
|
||||
if (message.getTopic() == null || message.getTopic().isBlank()) {
|
||||
throw new MQException("MQ topic 不能为空");
|
||||
}
|
||||
if (message.getMessageId() == null || message.getMessageId().isBlank()) {
|
||||
message.setMessageId(UUID.randomUUID().toString());
|
||||
}
|
||||
if (message.getCreatedAt() == null) {
|
||||
message.setCreatedAt(new Date());
|
||||
}
|
||||
int shardCount = Math.max(properties.getRedis().getChatPersistShardCount(), 1);
|
||||
int shard = keySupport.resolveShard(message.getKey(), shardCount);
|
||||
String streamKey = keySupport.streamKey(message.getTopic(), shard);
|
||||
RecordId recordId = stringRedisTemplate.opsForStream().add(
|
||||
StreamRecords.string(Map.of("payload", messageConverter.serialize(message))).withStreamKey(streamKey)
|
||||
);
|
||||
if (recordId == null) {
|
||||
throw new MQException("MQ 消息投递失败");
|
||||
}
|
||||
return recordId.getValue();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import tech.easyflow.common.mq.config.MQProperties;
|
||||
|
||||
public class RedisStreamKeySupport {
|
||||
|
||||
private final MQProperties properties;
|
||||
|
||||
public RedisStreamKeySupport(MQProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public String streamKey(String topic, int shard) {
|
||||
return properties.getRedis().getStreamPrefix() + ":" + topic + ":" + String.format("%02d", shard);
|
||||
}
|
||||
|
||||
public String deadLetterKey(String topic) {
|
||||
return properties.getRedis().getStreamPrefix() + ":dead-letter:" + topic;
|
||||
}
|
||||
|
||||
public int resolveShard(String key, int shardCount) {
|
||||
if (shardCount <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return Math.floorMod(key == null ? 0 : key.hashCode(), shardCount);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package tech.easyflow.common.mq.support;
|
||||
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import tech.easyflow.common.mq.exception.MQException;
|
||||
|
||||
public class MQHealthSupport {
|
||||
|
||||
private final RedisConnectionFactory redisConnectionFactory;
|
||||
|
||||
public MQHealthSupport(RedisConnectionFactory redisConnectionFactory) {
|
||||
this.redisConnectionFactory = redisConnectionFactory;
|
||||
}
|
||||
|
||||
public boolean available() {
|
||||
try (var connection = redisConnectionFactory.getConnection()) {
|
||||
String pong = connection.ping();
|
||||
return pong != null;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void assertAvailable() {
|
||||
if (!available()) {
|
||||
throw new MQException("MQ Redis 不可用");
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user