feat: 重构知识库文档导入任务化流程
- 新增上传建单、异步解析、分块处理与异步向量化闭环 - 收口分享页权限、完成态检索过滤与 SSE 局部状态刷新
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
@@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifecycle {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RedisMQConsumerContainer.class);
|
||||
|
||||
private final RedisConnectionFactory redisConnectionFactory;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MQProperties properties;
|
||||
@@ -71,6 +75,8 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
MQSubscription subscription = handler.subscription();
|
||||
for (int shard = 0; shard < Math.max(subscription.getShardCount(), 1); shard++) {
|
||||
int currentShard = shard;
|
||||
LOG.info("启动 MQ 消费线程: topic={}, group={}, shard={}, handler={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), currentShard, handler.getClass().getSimpleName());
|
||||
executorService.submit(() -> consumeLoop(handler, subscription, currentShard));
|
||||
}
|
||||
}
|
||||
@@ -106,6 +112,8 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
String streamKey = keySupport.streamKey(subscription.getTopic(), shard);
|
||||
String consumerName = subscription.getConsumerGroup() + "-" + shard;
|
||||
ensureConsumerGroup(streamKey, subscription.getConsumerGroup());
|
||||
LOG.info("MQ 消费循环已启动: topic={}, group={}, shard={}, consumer={}, streamKey={}, handler={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), shard, consumerName, streamKey, handler.getClass().getSimpleName());
|
||||
while (running) {
|
||||
try {
|
||||
reclaimPending(streamKey, subscription.getConsumerGroup(), consumerName);
|
||||
@@ -123,8 +131,18 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
if (messages.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
LOG.info("MQ 收到消息批次: topic={}, group={}, shard={}, consumer={}, streamKey={}, count={}",
|
||||
subscription.getTopic(), subscription.getConsumerGroup(), shard, consumerName, streamKey, messages.size());
|
||||
handleMessages(handler, streamKey, subscription.getConsumerGroup(), messages);
|
||||
} catch (Exception ignored) {
|
||||
} catch (Exception exception) {
|
||||
LOG.error("MQ 消费循环异常: topic={}, group={}, shard={}, consumer={}, streamKey={}, handler={}",
|
||||
subscription.getTopic(),
|
||||
subscription.getConsumerGroup(),
|
||||
shard,
|
||||
consumerName,
|
||||
streamKey,
|
||||
handler.getClass().getSimpleName(),
|
||||
exception);
|
||||
sleepSilently(1000L);
|
||||
}
|
||||
}
|
||||
@@ -192,8 +210,12 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
message.setRetryCount(retryCount);
|
||||
message.getHeaders().put("lastError", reason == null ? "" : reason);
|
||||
if (retryCount > properties.getRedis().getMaxRetry()) {
|
||||
LOG.error("MQ 消息超过最大重试次数,进入死信队列: topic={}, messageId={}, streamKey={}, retryCount={}, reason={}",
|
||||
message.getTopic(), message.getMessageId(), message.getStreamKey(), retryCount, reason);
|
||||
deadLetterService.deadLetter(message, reason);
|
||||
} else {
|
||||
LOG.warn("MQ 消息消费失败,准备重试: topic={}, messageId={}, streamKey={}, retryCount={}, reason={}",
|
||||
message.getTopic(), message.getMessageId(), message.getStreamKey(), retryCount, reason);
|
||||
stringRedisTemplate.opsForStream().add(
|
||||
org.springframework.data.redis.connection.stream.StreamRecords.string(
|
||||
Map.of("payload", messageConverter.serialize(message))
|
||||
@@ -205,10 +227,16 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
|
||||
private void handleMessages(MQConsumerHandler handler, String streamKey, String group, List<MQMessage> messages) throws Exception {
|
||||
try {
|
||||
LOG.info("MQ 开始批量处理消息: group={}, streamKey={}, count={}, handler={}",
|
||||
group, streamKey, messages.size(), handler.getClass().getSimpleName());
|
||||
handler.handle(messages);
|
||||
acknowledge(streamKey, group, messages);
|
||||
LOG.info("MQ 批量处理消息完成: group={}, streamKey={}, count={}, handler={}",
|
||||
group, streamKey, messages.size(), handler.getClass().getSimpleName());
|
||||
return;
|
||||
} catch (Exception batchEx) {
|
||||
LOG.error("MQ 批量处理消息失败,准备降级单条处理: group={}, streamKey={}, count={}, handler={}",
|
||||
group, streamKey, messages.size(), handler.getClass().getSimpleName(), batchEx);
|
||||
if (messages.size() == 1) {
|
||||
retryOrDeadLetter(messages, resolveReason(batchEx));
|
||||
acknowledge(streamKey, group, messages);
|
||||
@@ -218,7 +246,11 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
|
||||
for (MQMessage message : messages) {
|
||||
try {
|
||||
LOG.info("MQ 开始单条处理消息: group={}, streamKey={}, messageId={}, handler={}",
|
||||
group, streamKey, message.getMessageId(), handler.getClass().getSimpleName());
|
||||
handler.handle(List.of(message));
|
||||
LOG.info("MQ 单条处理消息完成: group={}, streamKey={}, messageId={}, handler={}",
|
||||
group, streamKey, message.getMessageId(), handler.getClass().getSimpleName());
|
||||
} catch (Exception singleEx) {
|
||||
retryOrDeadLetter(List.of(message), resolveReason(singleEx));
|
||||
} finally {
|
||||
@@ -240,6 +272,7 @@ public class RedisMQConsumerContainer implements MQConsumerContainer, SmartLifec
|
||||
}
|
||||
MQAcknowledger acknowledger = records -> stringRedisTemplate.opsForStream().acknowledge(streamKey, group, ids);
|
||||
acknowledger.acknowledge(messages);
|
||||
LOG.info("MQ 消息确认完成: group={}, streamKey={}, count={}", group, streamKey, ids.length);
|
||||
}
|
||||
|
||||
private String resolveReason(Exception exception) {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package tech.easyflow.common.mq.redis;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
@@ -15,6 +17,8 @@ import java.util.UUID;
|
||||
|
||||
public class RedisMQProducer implements MQProducer {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RedisMQProducer.class);
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MQProperties properties;
|
||||
private final MQMessageConverter messageConverter;
|
||||
@@ -47,12 +51,16 @@ public class RedisMQProducer implements MQProducer {
|
||||
int shardCount = Math.max(properties.getRedis().getChatPersistShardCount(), 1);
|
||||
int shard = keySupport.resolveShard(message.getKey(), shardCount);
|
||||
String streamKey = keySupport.streamKey(message.getTopic(), shard);
|
||||
LOG.info("MQ 开始投递消息: topic={}, messageId={}, key={}, shard={}, streamKey={}",
|
||||
message.getTopic(), message.getMessageId(), message.getKey(), shard, streamKey);
|
||||
RecordId recordId = stringRedisTemplate.opsForStream().add(
|
||||
StreamRecords.string(Map.of("payload", messageConverter.serialize(message))).withStreamKey(streamKey)
|
||||
);
|
||||
if (recordId == null) {
|
||||
throw new MQException("MQ 消息投递失败");
|
||||
}
|
||||
LOG.info("MQ 消息投递完成: topic={}, messageId={}, key={}, shard={}, streamKey={}, recordId={}",
|
||||
message.getTopic(), message.getMessageId(), message.getKey(), shard, streamKey, recordId.getValue());
|
||||
return recordId.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user