feat: 全新智能体功能
- 基于先进智能体框架,增加智能体编排功能 - 增加智能体聊天,并对接持久化
This commit is contained in:
@@ -20,6 +20,7 @@ public class RedisLockExecutor {
|
||||
private static final long RETRY_INTERVAL_MILLIS = 50L;
|
||||
|
||||
private static final DefaultRedisScript<Long> RELEASE_LOCK_SCRIPT;
|
||||
private static final DefaultRedisScript<Long> RENEW_LOCK_SCRIPT;
|
||||
|
||||
static {
|
||||
RELEASE_LOCK_SCRIPT = new DefaultRedisScript<>();
|
||||
@@ -29,6 +30,13 @@ public class RedisLockExecutor {
|
||||
"else return 0 end"
|
||||
);
|
||||
RELEASE_LOCK_SCRIPT.setResultType(Long.class);
|
||||
RENEW_LOCK_SCRIPT = new DefaultRedisScript<>();
|
||||
RENEW_LOCK_SCRIPT.setScriptText(
|
||||
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
|
||||
"return redis.call('pexpire', KEYS[1], ARGV[2]) " +
|
||||
"else return 0 end"
|
||||
);
|
||||
RENEW_LOCK_SCRIPT.setResultType(Long.class);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
@@ -42,6 +50,26 @@ public class RedisLockExecutor {
|
||||
}
|
||||
|
||||
public <T> T executeWithLock(String lockKey, Duration waitTimeout, Duration leaseTimeout, Supplier<T> task) {
|
||||
LockHandle handle = acquire(lockKey, waitTimeout, leaseTimeout);
|
||||
try {
|
||||
return task.get();
|
||||
} finally {
|
||||
handle.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取显式释放的分布式锁句柄。
|
||||
*
|
||||
* <p>长连接、SSE 或异步任务不能使用 callback 型锁,否则 callback 返回后锁会被提前释放。
|
||||
* 该方法返回 owner token 绑定的句柄,由调用方在运行完成、失败或取消时显式释放。</p>
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param waitTimeout 等待时间
|
||||
* @param leaseTimeout 租约时间
|
||||
* @return 锁句柄
|
||||
*/
|
||||
public LockHandle acquire(String lockKey, Duration waitTimeout, Duration leaseTimeout) {
|
||||
String lockValue = UUID.randomUUID().toString();
|
||||
boolean acquired = false;
|
||||
long deadline = System.nanoTime() + waitTimeout.toNanos();
|
||||
@@ -58,23 +86,87 @@ public class RedisLockExecutor {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException("等待分布式锁被中断,lockKey=" + lockKey, e);
|
||||
}
|
||||
|
||||
if (!acquired) {
|
||||
throw new IllegalStateException("获取分布式锁失败,请稍后重试,lockKey=" + lockKey);
|
||||
}
|
||||
|
||||
try {
|
||||
return task.get();
|
||||
} finally {
|
||||
releaseLock(lockKey, lockValue);
|
||||
}
|
||||
return new LockHandle(lockKey, lockValue, leaseTimeout);
|
||||
}
|
||||
|
||||
private void releaseLock(String lockKey, String lockValue) {
|
||||
/**
|
||||
* 按 owner token 释放锁。
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param lockValue owner token
|
||||
*/
|
||||
public void releaseLock(String lockKey, String lockValue) {
|
||||
try {
|
||||
stringRedisTemplate.execute(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), lockValue);
|
||||
} catch (Exception e) {
|
||||
log.warn("释放分布式锁失败,lockKey={}", lockKey, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 按 owner token 续期锁。
|
||||
*
|
||||
* @param lockKey 锁 key
|
||||
* @param lockValue owner token
|
||||
* @param leaseTimeout 新租约时间
|
||||
* @return 续期成功时为 true
|
||||
*/
|
||||
public boolean renewLock(String lockKey, String lockValue, Duration leaseTimeout) {
|
||||
try {
|
||||
Long result = stringRedisTemplate.execute(RENEW_LOCK_SCRIPT, Collections.singletonList(lockKey),
|
||||
lockValue, String.valueOf(leaseTimeout.toMillis()));
|
||||
return Long.valueOf(1L).equals(result);
|
||||
} catch (Exception e) {
|
||||
log.warn("续期分布式锁失败,lockKey={}", lockKey, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 显式分布式锁句柄。
|
||||
*/
|
||||
public final class LockHandle implements AutoCloseable {
|
||||
|
||||
private final String lockKey;
|
||||
private final String lockValue;
|
||||
private final Duration leaseTimeout;
|
||||
private volatile boolean released;
|
||||
|
||||
private LockHandle(String lockKey, String lockValue, Duration leaseTimeout) {
|
||||
this.lockKey = lockKey;
|
||||
this.lockValue = lockValue;
|
||||
this.leaseTimeout = leaseTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* 续期当前锁。
|
||||
*
|
||||
* @return 续期成功时为 true
|
||||
*/
|
||||
public boolean renew() {
|
||||
if (released) {
|
||||
return false;
|
||||
}
|
||||
return renewLock(lockKey, lockValue, leaseTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放当前锁。
|
||||
*/
|
||||
public void release() {
|
||||
if (released) {
|
||||
return;
|
||||
}
|
||||
released = true;
|
||||
releaseLock(lockKey, lockValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,8 @@ public enum ChatType {
|
||||
TOOL_CALL,
|
||||
TOOL_RESULT,
|
||||
STATUS,
|
||||
CITATIONS,
|
||||
SESSION_CREATED,
|
||||
ERROR,
|
||||
FORM_REQUEST,
|
||||
FORM_CANCEL,
|
||||
|
||||
@@ -4,9 +4,10 @@ import com.alibaba.fastjson.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.web.context.request.async.AsyncRequestNotUsableException;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
import tech.easyflow.common.util.StringUtil;
|
||||
import tech.easyflow.common.util.SpringContextUtil;
|
||||
import tech.easyflow.common.util.StringUtil;
|
||||
import tech.easyflow.core.chat.protocol.ChatEnvelope;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -90,15 +91,21 @@ public class ChatSseEmitter {
|
||||
.data(json)
|
||||
);
|
||||
return true;
|
||||
} catch (IllegalStateException e) {
|
||||
closed.compareAndSet(false, true);
|
||||
LOG.error("ChatSseEmitter send failed(event={}), message={}, exception={}", event, e.getMessage(), e.toString(), e);
|
||||
} catch (AsyncRequestNotUsableException e) {
|
||||
markDisconnected(event, e);
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
LOG.error("ChatSseEmitter send io failed(event={}), message={}, exception={}", event, e.getMessage(), e.toString(), e);
|
||||
safeCompleteWithError(e);
|
||||
markDisconnected(event, e);
|
||||
return false;
|
||||
} catch (IllegalStateException e) {
|
||||
closed.compareAndSet(false, true);
|
||||
LOG.warn("ChatSseEmitter send failed(event={}), message={}, exception={}", event, e.getMessage(), e.toString());
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
if (isClientDisconnected(e)) {
|
||||
markDisconnected(event, e);
|
||||
return false;
|
||||
}
|
||||
LOG.error("ChatSseEmitter send unexpected failed(event={}), message={}, exception={}", event, e.getMessage(), e.toString(), e);
|
||||
safeCompleteWithError(e);
|
||||
return false;
|
||||
@@ -165,4 +172,31 @@ public class ChatSseEmitter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void markDisconnected(String event, Throwable ex) {
|
||||
closed.compareAndSet(false, true);
|
||||
LOG.warn("ChatSseEmitter client disconnected(event={}), message={}, exception={}",
|
||||
event, ex == null ? null : ex.getMessage(), ex == null ? null : ex.toString());
|
||||
}
|
||||
|
||||
private boolean isClientDisconnected(Throwable ex) {
|
||||
Throwable current = ex;
|
||||
while (current != null) {
|
||||
if (current instanceof AsyncRequestNotUsableException || current instanceof IOException) {
|
||||
return true;
|
||||
}
|
||||
String message = current.getMessage();
|
||||
if (message != null) {
|
||||
String lowerMessage = message.toLowerCase();
|
||||
if (lowerMessage.contains("broken pipe")
|
||||
|| lowerMessage.contains("connection reset")
|
||||
|| lowerMessage.contains("disconnected client")
|
||||
|| lowerMessage.contains("response not usable")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
current = current.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user