负载均衡深度改造,增加分布式锁,表唯一约束等

This commit is contained in:
2026-02-24 11:17:33 +08:00
parent 8d711dc3a2
commit 148a08a3f1
27 changed files with 891 additions and 182 deletions

View File

@@ -9,6 +9,7 @@ import com.easyagents.flow.core.chain.listener.ChainEventListener;
import com.easyagents.flow.core.chain.repository.NodeStateField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.entity.WorkflowExecResult;
@@ -63,6 +64,10 @@ public class ChainEventListenerForSave implements ChainEventListener {
ChainState state = chain.getState();
Workflow workflow = workflowService.getById(definition.getId());
String instanceId = state.getInstanceId();
WorkflowExecResult existed = workflowExecResultService.getByExecKey(instanceId);
if (existed != null) {
return;
}
WorkflowExecResult record = new WorkflowExecResult();
record.setExecKey(instanceId);
record.setWorkflowId(workflow.getId());
@@ -74,7 +79,12 @@ public class ChainEventListenerForSave implements ChainEventListener {
record.setStatus(state.getStatus().getValue());
record.setCreatedKey(WorkFlowUtil.USER_KEY);
record.setCreatedBy(WorkFlowUtil.getOperator(chain).getId().toString());
workflowExecResultService.save(record);
try {
workflowExecResultService.save(record);
} catch (DuplicateKeyException e) {
// 多节点重试时可能并发写同一 exec_key按幂等处理。
log.debug("exec result already exists, execKey={}", instanceId, e);
}
}
private void handleChainEndEvent(ChainEndEvent event, Chain chain) {

View File

@@ -5,10 +5,16 @@ import tech.easyflow.ai.mapper.BotDocumentCollectionMapper;
import tech.easyflow.ai.service.BotDocumentCollectionService;
import com.mybatisflex.spring.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import tech.easyflow.common.cache.RedisLockExecutor;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.time.Duration;
import com.mybatisflex.core.query.QueryWrapper;
/**
@@ -20,6 +26,13 @@ import com.mybatisflex.core.query.QueryWrapper;
@Service
public class BotDocumentCollectionServiceImpl extends ServiceImpl<BotDocumentCollectionMapper, BotDocumentCollection> implements BotDocumentCollectionService {
private static final String BOT_BINDING_LOCK_KEY_PREFIX = "easyflow:lock:bot:binding:";
private static final Duration LOCK_WAIT_TIMEOUT = Duration.ofSeconds(2);
private static final Duration LOCK_LEASE_TIMEOUT = Duration.ofSeconds(10);
@Resource
private RedisLockExecutor redisLockExecutor;
@Override
public List<BotDocumentCollection> listByBotId(BigInteger botId) {
@@ -30,15 +43,30 @@ public class BotDocumentCollectionServiceImpl extends ServiceImpl<BotDocumentCol
}
@Override
@Transactional
public void saveBotAndKnowledge(BigInteger botId, BigInteger[] knowledgeIds) {
this.remove(QueryWrapper.create().eq(BotDocumentCollection::getBotId, botId));
List<BotDocumentCollection> list = new ArrayList<>(knowledgeIds.length);
for (BigInteger knowledgeId : knowledgeIds) {
BotDocumentCollection botDocumentCollection = new BotDocumentCollection();
botDocumentCollection.setBotId(botId);
botDocumentCollection.setDocumentCollectionId(knowledgeId);
list.add(botDocumentCollection);
}
this.saveBatch(list);
redisLockExecutor.executeWithLock(BOT_BINDING_LOCK_KEY_PREFIX + botId, LOCK_WAIT_TIMEOUT, LOCK_LEASE_TIMEOUT, () -> {
this.remove(QueryWrapper.create().eq(BotDocumentCollection::getBotId, botId));
Set<BigInteger> uniqueKnowledgeIds = new LinkedHashSet<>();
if (knowledgeIds != null) {
for (BigInteger knowledgeId : knowledgeIds) {
if (knowledgeId != null) {
uniqueKnowledgeIds.add(knowledgeId);
}
}
}
if (uniqueKnowledgeIds.isEmpty()) {
return;
}
List<BotDocumentCollection> list = new ArrayList<>(uniqueKnowledgeIds.size());
for (BigInteger knowledgeId : uniqueKnowledgeIds) {
BotDocumentCollection botDocumentCollection = new BotDocumentCollection();
botDocumentCollection.setBotId(botId);
botDocumentCollection.setDocumentCollectionId(knowledgeId);
list.add(botDocumentCollection);
}
this.saveBatch(list);
});
}
}

View File

@@ -7,11 +7,15 @@ import tech.easyflow.ai.entity.BotMcp;
import tech.easyflow.ai.mapper.BotMcpMapper;
import tech.easyflow.ai.service.BotMcpService;
import org.springframework.stereotype.Service;
import tech.easyflow.common.cache.RedisLockExecutor;
import javax.annotation.Resource;
import java.time.Duration;
import java.math.BigInteger;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 服务层实现。
@@ -22,29 +26,53 @@ import java.util.Map;
@Service
public class BotMcpServiceImpl extends ServiceImpl<BotMcpMapper, BotMcp> implements BotMcpService{
private static final String BOT_BINDING_LOCK_KEY_PREFIX = "easyflow:lock:bot:binding:";
private static final Duration LOCK_WAIT_TIMEOUT = Duration.ofSeconds(2);
private static final Duration LOCK_LEASE_TIMEOUT = Duration.ofSeconds(10);
@Resource
private RedisLockExecutor redisLockExecutor;
@Override
@Transactional
public void updateBotMcpToolIds(BigInteger botId, List<Map<String, List<List<String>>>> mcpSelectedData) {
// 删除原来绑定的mcp
this.remove(QueryWrapper.create().eq(BotMcp::getBotId, botId));
for (Map<String, List<List<String>>> mcpItem : mcpSelectedData) {
for (Map.Entry<String, List<List<String>>> entry : mcpItem.entrySet()) {
String mcpId = entry.getKey(); // 上一级id
List<List<String>> toolList = entry.getValue(); // 包含name和description的二维数组
redisLockExecutor.executeWithLock(BOT_BINDING_LOCK_KEY_PREFIX + botId, LOCK_WAIT_TIMEOUT, LOCK_LEASE_TIMEOUT, () -> {
this.remove(QueryWrapper.create().eq(BotMcp::getBotId, botId));
if (mcpSelectedData == null || mcpSelectedData.isEmpty()) {
return;
}
// 遍历每个工具的[name, description]
for (List<String> toolInfo : toolList) {
String toolName = toolInfo.get(0); // 工具名称
String toolDesc = toolInfo.get(1); // 工具描述
System.out.println("工具名称:" + toolName + ",描述:" + toolDesc);
BotMcp botMcp = new BotMcp();
botMcp.setBotId(botId);
botMcp.setMcpId(new BigInteger(mcpId));
botMcp.setMcpToolName(toolName);
botMcp.setMcpToolDescription(toolDesc);
this.save(botMcp);
Set<String> uniqueTools = new LinkedHashSet<>();
for (Map<String, List<List<String>>> mcpItem : mcpSelectedData) {
for (Map.Entry<String, List<List<String>>> entry : mcpItem.entrySet()) {
String mcpId = entry.getKey();
List<List<String>> toolList = entry.getValue();
if (toolList == null || toolList.isEmpty()) {
continue;
}
for (List<String> toolInfo : toolList) {
if (toolInfo == null || toolInfo.size() < 2) {
continue;
}
String toolName = toolInfo.get(0);
String toolDesc = toolInfo.get(1);
if (toolName == null) {
continue;
}
String uniqueKey = mcpId + "|" + toolName;
if (!uniqueTools.add(uniqueKey)) {
continue;
}
BotMcp botMcp = new BotMcp();
botMcp.setBotId(botId);
botMcp.setMcpId(new BigInteger(mcpId));
botMcp.setMcpToolName(toolName);
botMcp.setMcpToolDescription(toolDesc);
this.save(botMcp);
}
}
}
}
});
}
}

View File

@@ -3,16 +3,21 @@ package tech.easyflow.ai.service.impl;
import com.mybatisflex.core.query.QueryWrapper;
import com.mybatisflex.spring.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import tech.easyflow.ai.entity.BotPlugin;
import tech.easyflow.ai.entity.Plugin;
import tech.easyflow.ai.mapper.BotPluginMapper;
import tech.easyflow.ai.mapper.PluginMapper;
import tech.easyflow.ai.service.BotPluginService;
import tech.easyflow.common.cache.RedisLockExecutor;
import javax.annotation.Resource;
import java.time.Duration;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import static tech.easyflow.ai.entity.table.BotPluginTableDef.BOT_PLUGIN;
@@ -25,12 +30,19 @@ import static tech.easyflow.ai.entity.table.BotPluginTableDef.BOT_PLUGIN;
@Service
public class BotPluginServiceImpl extends ServiceImpl<BotPluginMapper, BotPlugin> implements BotPluginService {
private static final String BOT_BINDING_LOCK_KEY_PREFIX = "easyflow:lock:bot:binding:";
private static final Duration LOCK_WAIT_TIMEOUT = Duration.ofSeconds(2);
private static final Duration LOCK_LEASE_TIMEOUT = Duration.ofSeconds(10);
@Resource
private BotPluginMapper botPluginMapper;
@Resource
private PluginMapper pluginMapper;
@Resource
private RedisLockExecutor redisLockExecutor;
@Override
public List<Plugin> getList(String botId) {
QueryWrapper w = QueryWrapper.create();
@@ -58,15 +70,31 @@ public class BotPluginServiceImpl extends ServiceImpl<BotPluginMapper, BotPlugin
}
@Override
@Transactional
public void saveBotAndPluginTool(BigInteger botId, BigInteger[] pluginToolIds) {
this.remove(QueryWrapper.create().eq(BotPlugin::getBotId, botId));
List<BotPlugin> list = new ArrayList<>(pluginToolIds.length);
for (BigInteger pluginToolId : pluginToolIds) {
BotPlugin aiBotPluginTool = new BotPlugin();
aiBotPluginTool.setBotId(botId);
aiBotPluginTool.setPluginItemId(pluginToolId);
list.add(aiBotPluginTool);
}
this.saveBatch(list);
redisLockExecutor.executeWithLock(BOT_BINDING_LOCK_KEY_PREFIX + botId, LOCK_WAIT_TIMEOUT, LOCK_LEASE_TIMEOUT, () -> {
this.remove(QueryWrapper.create().eq(BotPlugin::getBotId, botId));
Set<BigInteger> uniquePluginToolIds = new LinkedHashSet<>();
if (pluginToolIds != null) {
for (BigInteger pluginToolId : pluginToolIds) {
if (pluginToolId != null) {
uniquePluginToolIds.add(pluginToolId);
}
}
}
if (uniquePluginToolIds.isEmpty()) {
return;
}
List<BotPlugin> list = new ArrayList<>(uniquePluginToolIds.size());
for (BigInteger pluginToolId : uniquePluginToolIds) {
BotPlugin aiBotPluginTool = new BotPlugin();
aiBotPluginTool.setBotId(botId);
aiBotPluginTool.setPluginItemId(pluginToolId);
list.add(aiBotPluginTool);
}
this.saveBatch(list);
});
}
}

View File

@@ -5,10 +5,16 @@ import tech.easyflow.ai.mapper.BotWorkflowMapper;
import tech.easyflow.ai.service.BotWorkflowService;
import com.mybatisflex.spring.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import tech.easyflow.common.cache.RedisLockExecutor;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.time.Duration;
import com.mybatisflex.core.query.QueryWrapper;
/**
@@ -20,6 +26,13 @@ import com.mybatisflex.core.query.QueryWrapper;
@Service
public class BotWorkflowServiceImpl extends ServiceImpl<BotWorkflowMapper, BotWorkflow> implements BotWorkflowService {
private static final String BOT_BINDING_LOCK_KEY_PREFIX = "easyflow:lock:bot:binding:";
private static final Duration LOCK_WAIT_TIMEOUT = Duration.ofSeconds(2);
private static final Duration LOCK_LEASE_TIMEOUT = Duration.ofSeconds(10);
@Resource
private RedisLockExecutor redisLockExecutor;
@Override
public List<BotWorkflow> listByBotId(BigInteger botId) {
@@ -30,15 +43,31 @@ public class BotWorkflowServiceImpl extends ServiceImpl<BotWorkflowMapper, BotWo
}
@Override
@Transactional
public void saveBotAndWorkflowTool(BigInteger botId, BigInteger[] workflowIds) {
this.remove(QueryWrapper.create().eq(BotWorkflow::getBotId, botId));
List<BotWorkflow> list = new ArrayList<>(workflowIds.length);
for (BigInteger workflowId : workflowIds) {
BotWorkflow botWorkflow = new BotWorkflow();
botWorkflow.setBotId(botId);
botWorkflow.setWorkflowId(workflowId);
list.add(botWorkflow);
}
this.saveBatch(list);
redisLockExecutor.executeWithLock(BOT_BINDING_LOCK_KEY_PREFIX + botId, LOCK_WAIT_TIMEOUT, LOCK_LEASE_TIMEOUT, () -> {
this.remove(QueryWrapper.create().eq(BotWorkflow::getBotId, botId));
Set<BigInteger> uniqueWorkflowIds = new LinkedHashSet<>();
if (workflowIds != null) {
for (BigInteger workflowId : workflowIds) {
if (workflowId != null) {
uniqueWorkflowIds.add(workflowId);
}
}
}
if (uniqueWorkflowIds.isEmpty()) {
return;
}
List<BotWorkflow> list = new ArrayList<>(uniqueWorkflowIds.size());
for (BigInteger workflowId : uniqueWorkflowIds) {
BotWorkflow botWorkflow = new BotWorkflow();
botWorkflow.setBotId(botId);
botWorkflow.setWorkflowId(workflowId);
list.add(botWorkflow);
}
this.saveBatch(list);
});
}
}