feat: 归档 XL10 异步工具业务编译层

- 将 AgentDefinitionCompiler 升级为 AgentRuntimeCompiler

- 接入 Workflow 和 Plugin 的同步/异步工具编译与 Redis 任务态

- 增加异步执行配置开关、聊天时间线聚合和后端测试
This commit is contained in:
2026-06-04 15:23:56 +08:00
parent 1ea863cb2c
commit c316eff5be
26 changed files with 2859 additions and 62 deletions

View File

@@ -67,6 +67,11 @@ public class AgentRuntimeProperties {
*/ */
private Duration lockRenewInterval = Duration.ofMinutes(1); private Duration lockRenewInterval = Duration.ofMinutes(1);
/**
* Agent 异步工具任务 Redis 运行态 TTL。
*/
private Duration asyncToolTaskTtl = Duration.ofHours(24);
/** /**
* 获取 Redis 热态 session 缓存 TTL。 * 获取 Redis 热态 session 缓存 TTL。
* *
@@ -258,6 +263,24 @@ public class AgentRuntimeProperties {
this.lockRenewInterval = lockRenewInterval == null ? Duration.ofMinutes(1) : lockRenewInterval; this.lockRenewInterval = lockRenewInterval == null ? Duration.ofMinutes(1) : lockRenewInterval;
} }
/**
* 获取 Agent 异步工具任务 Redis 运行态 TTL。
*
* @return 任务 TTL
*/
public Duration getAsyncToolTaskTtl() {
return asyncToolTaskTtl;
}
/**
* 设置 Agent 异步工具任务 Redis 运行态 TTL。
*
* @param asyncToolTaskTtl 任务 TTL
*/
public void setAsyncToolTaskTtl(Duration asyncToolTaskTtl) {
this.asyncToolTaskTtl = asyncToolTaskTtl == null ? Duration.ofHours(24) : asyncToolTaskTtl;
}
private static String defaultInstanceId() { private static String defaultInstanceId() {
String envInstanceId = System.getenv("EASYFLOW_INSTANCE_ID"); String envInstanceId = System.getenv("EASYFLOW_INSTANCE_ID");
if (StringUtils.hasText(envInstanceId)) { if (StringUtils.hasText(envInstanceId)) {

View File

@@ -70,7 +70,7 @@ public class AgentRunService {
@Resource @Resource
private AgentService agentService; private AgentService agentService;
@Resource @Resource
private AgentDefinitionCompiler agentDefinitionCompiler; private AgentRuntimeCompiler agentRuntimeCompiler;
@Resource @Resource
private AgentRuntimeFactory agentRuntimeFactory; private AgentRuntimeFactory agentRuntimeFactory;
@Resource @Resource
@@ -363,7 +363,7 @@ public class AgentRunService {
if (persistChatlog) { if (persistChatlog) {
bindAgentSession(agent, runtimeSessionId, chatContext); bindAgentSession(agent, runtimeSessionId, chatContext);
} }
AgentRuntimeBundle bundle = agentDefinitionCompiler.compile(agent); AgentRuntimeBundle bundle = agentRuntimeCompiler.compile(agent);
AgentRuntime runtime = agentRuntimeFactory.create(); AgentRuntime runtime = agentRuntimeFactory.create();
// 会话初始化请求 // 会话初始化请求
AgentInitRequest request = new AgentInitRequest(); AgentInitRequest request = new AgentInitRequest();
@@ -554,16 +554,23 @@ public class AgentRunService {
} }
return; return;
} }
if (isAsyncToolEvent(event.getEventType())) {
if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, asyncToolChatType(event), buildAsyncToolEventPayload(event))) {
cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog);
}
return;
}
if (event.getEventType() == AgentRuntimeEventType.TOOL_CALL) { if (event.getEventType() == AgentRuntimeEventType.TOOL_CALL) {
LOG.info("Agent runtime tool call, requestId={}, toolCallId={}, payload={}, metadata={}", LOG.info("Agent runtime tool call, requestId={}, toolCallId={}, payload={}, metadata={}",
requestId, event.getToolCallId(), event.getPayload(), event.getMetadata()); requestId, event.getToolCallId(), event.getPayload(), event.getMetadata());
Map<String, Object> toolPayload = buildToolEventPayload(event);
assistantAccumulator.appendToolCall( assistantAccumulator.appendToolCall(
firstText(event.getToolCallId(), stringPayload(event, "toolCallId")), firstText(stringValue(toolPayload, "toolCallId"), event.getToolCallId()),
firstText(stringPayload(event, "toolName"), stringPayload(event, "name")), firstText(stringValue(toolPayload, "toolName"), stringValue(toolPayload, "name")),
stringPayload(event, "toolDisplayName"), stringValue(toolPayload, "toolDisplayName"),
firstNonNull(event.getPayload().get("input"), event.getPayload().get("toolInput")) firstNonNull(toolPayload.get("input"), toolPayload.get("toolInput"))
); );
if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_CALL, buildToolEventPayload(event))) { if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_CALL, toolPayload)) {
cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog); cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog);
} }
return; return;
@@ -571,14 +578,15 @@ public class AgentRunService {
if (event.getEventType() == AgentRuntimeEventType.TOOL_RESULT) { if (event.getEventType() == AgentRuntimeEventType.TOOL_RESULT) {
LOG.info("Agent runtime tool result, requestId={}, toolCallId={}, payload={}, metadata={}", LOG.info("Agent runtime tool result, requestId={}, toolCallId={}, payload={}, metadata={}",
requestId, event.getToolCallId(), event.getPayload(), event.getMetadata()); requestId, event.getToolCallId(), event.getPayload(), event.getMetadata());
Map<String, Object> toolPayload = buildToolEventPayload(event);
assistantAccumulator.appendToolResult( assistantAccumulator.appendToolResult(
firstText(event.getToolCallId(), stringPayload(event, "toolCallId")), firstText(stringValue(toolPayload, "toolCallId"), event.getToolCallId()),
firstText(stringPayload(event, "toolName"), stringPayload(event, "name")), firstText(stringValue(toolPayload, "toolName"), stringValue(toolPayload, "name")),
stringPayload(event, "toolDisplayName"), stringValue(toolPayload, "toolDisplayName"),
firstNonNull(firstNonNull(event.getPayload().get("output"), event.getPayload().get("result")), firstNonNull(firstNonNull(toolPayload.get("output"), toolPayload.get("result")),
event.getPayload().get("text")) toolPayload.get("text"))
); );
if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_RESULT, buildToolEventPayload(event))) { if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_RESULT, toolPayload)) {
cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog); cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog);
} }
return; return;
@@ -1181,9 +1189,81 @@ public class AgentRunService {
if (toolCallId != null && !toolCallId.isBlank()) { if (toolCallId != null && !toolCallId.isBlank()) {
payload.put("toolCallId", toolCallId); payload.put("toolCallId", toolCallId);
} }
if (Boolean.TRUE.equals(event.getMetadata().get("asyncTool"))) {
enrichAsyncToolPayload(payload, event.getMetadata(), toolCallId);
String taskId = stringValue(payload, "taskId");
if (taskId != null && !taskId.isBlank()) {
payload.put("toolCallId", taskId);
}
}
return payload; return payload;
} }
private boolean isAsyncToolEvent(AgentRuntimeEventType type) {
return type == AgentRuntimeEventType.ASYNC_TOOL_SUBMITTED
|| type == AgentRuntimeEventType.ASYNC_TOOL_OBSERVED
|| type == AgentRuntimeEventType.ASYNC_TOOL_RESULT
|| type == AgentRuntimeEventType.ASYNC_TOOL_CANCELLED
|| type == AgentRuntimeEventType.ASYNC_TOOL_LISTED
|| type == AgentRuntimeEventType.ASYNC_TOOL_FAILED;
}
private ChatType asyncToolChatType(AgentRuntimeEvent event) {
String status = stringPayload(event, "status");
if ("SUCCEEDED".equalsIgnoreCase(status)
|| "FAILED".equalsIgnoreCase(status)
|| "CANCELLED".equalsIgnoreCase(status)
|| "TIMEOUT".equalsIgnoreCase(status)
|| event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_RESULT
|| event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_FAILED
|| event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_CANCELLED) {
return ChatType.TOOL_RESULT;
}
return ChatType.TOOL_CALL;
}
private Map<String, Object> buildAsyncToolEventPayload(AgentRuntimeEvent event) {
Map<String, Object> payload = new LinkedHashMap<>(event.getPayload() == null ? Map.of() : event.getPayload());
String taskId = stringValue(payload, "taskId");
String toolCallId = firstText(taskId, event.getToolCallId());
if (toolCallId != null && !toolCallId.isBlank()) {
payload.put("toolCallId", toolCallId);
}
enrichAsyncToolPayload(payload, event.getMetadata(), toolCallId);
return payload;
}
private void enrichAsyncToolPayload(Map<String, Object> payload, Map<String, Object> metadata, String fallbackId) {
Map<String, Object> safeMetadata = metadata == null ? Map.of() : metadata;
payload.put("asyncTool", true);
putIfPresent(payload, "asyncToolName", firstText(stringValue(payload, "asyncToolName"), stringValue(safeMetadata, "asyncToolName")));
putIfPresent(payload, "phase", firstText(stringValue(payload, "phase"), stringValue(safeMetadata, "asyncToolPhase")));
putIfPresent(payload, "taskId", firstText(stringValue(payload, "taskId"), stringValue(safeMetadata, "taskId")));
putIfPresent(payload, "status", firstText(stringValue(payload, "status"), stringValue(safeMetadata, "status")));
String displayName = firstText(stringValue(payload, "toolDisplayName"),
firstText(stringValue(safeMetadata, "toolDisplayName"), stringValue(payload, "asyncToolName")));
putIfPresent(payload, "toolDisplayName", displayName);
putIfPresent(payload, "toolName", displayName);
putIfPresent(payload, "name", displayName);
String statusKey = "async-tool:" + firstText(stringValue(payload, "taskId"), fallbackId);
payload.put("statusKey", statusKey);
payload.put("label", asyncToolLabel(stringValue(payload, "status"), stringValue(payload, "phase"), displayName));
}
private String asyncToolLabel(String status, String phase, String displayName) {
String name = displayName == null || displayName.isBlank() ? "异步工具" : displayName;
if ("SUCCEEDED".equalsIgnoreCase(status)) {
return name + "已完成";
}
if ("FAILED".equalsIgnoreCase(status) || "TIMEOUT".equalsIgnoreCase(status)) {
return name + "执行失败";
}
if ("PENDING".equalsIgnoreCase(status) || "submit".equalsIgnoreCase(phase)) {
return name + "已提交";
}
return name + "执行中";
}
/** /**
* 构建知识库检索状态载荷,确保前端可按稳定 key 合并同一轮状态行。 * 构建知识库检索状态载荷,确保前端可按稳定 key 合并同一轮状态行。
* *

View File

@@ -29,6 +29,8 @@ import tech.easyflow.agent.entity.Agent;
import tech.easyflow.agent.entity.AgentKnowledgeBinding; import tech.easyflow.agent.entity.AgentKnowledgeBinding;
import tech.easyflow.agent.entity.AgentToolBinding; import tech.easyflow.agent.entity.AgentToolBinding;
import tech.easyflow.agent.enums.AgentToolType; import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompilation;
import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompiler;
import tech.easyflow.ai.easyagents.tool.ChatToolNameHelper; import tech.easyflow.ai.easyagents.tool.ChatToolNameHelper;
import tech.easyflow.ai.easyagents.tool.WorkflowTool; import tech.easyflow.ai.easyagents.tool.WorkflowTool;
import tech.easyflow.ai.easyagentsflow.support.PublishedWorkflowDefinitionIds; import tech.easyflow.ai.easyagentsflow.support.PublishedWorkflowDefinitionIds;
@@ -46,12 +48,12 @@ import java.util.regex.Pattern;
import java.util.*; import java.util.*;
/** /**
* Agent 发布快照编译为 easy-agents-agent-runtime 可执行定义 * Agent 发布快照编译为可执行定义
*/ */
@Component @Component
public class AgentDefinitionCompiler { public class AgentRuntimeCompiler {
private static final Logger LOG = LoggerFactory.getLogger(AgentDefinitionCompiler.class); private static final Logger LOG = LoggerFactory.getLogger(AgentRuntimeCompiler.class);
private static final int LOG_TEXT_MAX_LENGTH = 500; private static final int LOG_TEXT_MAX_LENGTH = 500;
private static final Pattern MCP_INPUT_PATTERN = Pattern.compile("\\$\\{input:([A-Za-z0-9_.-]+)}"); private static final Pattern MCP_INPUT_PATTERN = Pattern.compile("\\$\\{input:([A-Za-z0-9_.-]+)}");
@@ -67,6 +69,8 @@ public class AgentDefinitionCompiler {
private DocumentCollectionService documentCollectionService; private DocumentCollectionService documentCollectionService;
@Resource @Resource
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Resource
private AgentToolRuntimeCompiler agentToolRuntimeCompiler;
/** /**
* 编译 Agent 运行时定义和调用器 * 编译 Agent 运行时定义和调用器
@@ -209,35 +213,10 @@ public class AgentDefinitionCompiler {
} }
private void compileTools(Agent agent, AgentDefinition definition, AgentRuntimeBundle bundle) { private void compileTools(Agent agent, AgentDefinition definition, AgentRuntimeBundle bundle) {
if (agent.getToolBindings() == null) { AgentToolRuntimeCompilation compilation = agentToolRuntimeCompiler.compile(agent);
return; definition.setToolSpecs(compilation.getToolSpecs());
} definition.setMcpSpecs(compilation.getMcpSpecs());
List<AgentToolSpec> specs = new ArrayList<>(); bundle.setToolInvokers(compilation.getToolInvokers());
Map<String, com.easyagents.agent.runtime.tool.AgentToolInvoker> invokers = new LinkedHashMap<>();
List<McpSpec> mcpSpecs = new ArrayList<>();
Map<BigInteger, McpSpec> mcpSpecMap = new LinkedHashMap<>();
for (AgentToolBinding binding : agent.getToolBindings()) {
if (!Boolean.TRUE.equals(binding.getEnabled())) {
continue;
}
AgentToolType type = AgentToolType.from(binding.getToolType());
if (type == AgentToolType.MCP) {
McpSpec mcpSpec = mcpSpecMap.computeIfAbsent(binding.getTargetId(),
ignored -> buildMcpSpec(binding));
applyMcpToolBinding(mcpSpec, binding);
if (!mcpSpecs.contains(mcpSpec)) {
mcpSpecs.add(mcpSpec);
}
continue;
}
Tool tool = buildTool(binding);
AgentToolSpec spec = toToolSpec(tool, binding);
specs.add(spec);
invokers.put(spec.getName(), (arguments, context) -> invokeTool(tool, arguments));
}
definition.setToolSpecs(specs);
definition.setMcpSpecs(mcpSpecs);
bundle.setToolInvokers(invokers);
} }
private Tool buildTool(AgentToolBinding binding) { private Tool buildTool(AgentToolBinding binding) {

View File

@@ -0,0 +1,310 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.AgentToolContext;
import com.easyagents.agent.runtime.tool.asynctool.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.StringUtils;
import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult;
import tech.easyflow.common.web.exceptions.BusinessException;
import java.time.Instant;
import java.util.*;
/**
* EasyFlow Agent 异步业务工具基类。
*/
public abstract class AbstractAgentAsyncSubTools implements AsyncSubTools {
private static final String ERROR_TYPE_NOT_FOUND = "TASK_NOT_FOUND";
private static final String ERROR_TYPE_EXCEPTION = "EXCEPTION";
private final AgentAsyncToolTaskStore taskStore;
private final ThreadPoolTaskExecutor taskExecutor;
/**
* 创建异步业务工具基类。
*
* @param taskStore 任务存储
* @param taskExecutor 后台执行器
*/
protected AbstractAgentAsyncSubTools(AgentAsyncToolTaskStore taskStore,
ThreadPoolTaskExecutor taskExecutor) {
this.taskStore = taskStore;
this.taskExecutor = taskExecutor;
}
/**
* 获取工具类型。
*
* @return 工具类型
*/
protected abstract String toolType();
/**
* 获取运行时工具名。
*
* @return 运行时工具名
*/
protected abstract String toolName();
/**
* 获取用户可见工具名称。
*
* @return 用户可见工具名称
*/
protected abstract String displayName();
/**
* 获取业务资源 ID。
*
* @return 业务资源 ID
*/
protected abstract String businessId();
/**
* 执行业务工具。
*
* @param arguments 调用参数
* @return 执行结果
*/
protected abstract AgentToolExecutionResult executeBusiness(Map<String, Object> arguments);
/**
* {@inheritDoc}
*/
@Override
public AsyncToolSubmitResult submit(Map<String, Object> arguments, AgentToolContext context) {
String sessionId = requireSessionId(context);
String taskId = newTaskId();
AgentAsyncToolTaskRecord record = new AgentAsyncToolTaskRecord();
record.setTaskId(taskId);
record.setToolType(toolType());
record.setToolName(toolName());
record.setBusinessId(businessId());
record.setStatus(AsyncToolTaskStatus.PENDING);
record.setArguments(arguments == null ? Map.of() : new LinkedHashMap<>(arguments));
record.setSummary(displayName() + "任务已提交");
record.setRequestId(context == null ? null : context.getRequestId());
record.setTraceId(context == null ? null : context.getTraceId());
record.setSessionId(sessionId);
record.setAgentId(context == null ? null : context.getAgentId());
record.setToolCallId(context == null ? null : context.getToolCallId());
record.getMetadata().put("toolDisplayName", displayName());
appendEvent(record, "SUBMITTED", displayName() + "任务已提交");
taskStore.create(record);
dispatch(sessionId, record.getTaskId(), record.getArguments());
AsyncToolSubmitResult result = new AsyncToolSubmitResult();
result.setTaskId(taskId);
result.setStatus(AsyncToolTaskStatus.PENDING);
result.setCursor(0L);
result.setSummary(record.getSummary());
result.setNextAction(toolName() + "_observe 查看任务进度。");
result.getMetadata().put("toolDisplayName", displayName());
return result;
}
/**
* {@inheritDoc}
*/
@Override
public AsyncToolTaskView observe(AsyncToolObserveRequest request, AgentToolContext context) {
return taskView(request == null ? null : request.getTaskId(),
request == null ? null : request.getCursor(),
request == null ? null : request.getLimit(),
context);
}
/**
* {@inheritDoc}
*/
@Override
public AsyncToolTaskView result(AsyncToolResultRequest request, AgentToolContext context) {
return taskView(request == null ? null : request.getTaskId(),
request == null ? null : request.getCursor(),
request == null ? null : request.getLimit(),
context);
}
/**
* {@inheritDoc}
*/
@Override
public AsyncToolCancelResult cancel(AsyncToolCancelRequest request, AgentToolContext context) {
AsyncToolCancelResult result = new AsyncToolCancelResult();
result.setTaskId(request == null ? null : request.getTaskId());
result.setStatus(AsyncToolTaskStatus.FAILED);
result.setErrorMessage("当前异步工具不支持取消正在执行的任务");
result.setMessage("不支持取消");
result.getMetadata().put("toolDisplayName", displayName());
return result;
}
/**
* {@inheritDoc}
*/
@Override
public AsyncToolTaskListResult list(AsyncToolListRequest request, AgentToolContext context) {
String sessionId = requireSessionId(context);
AsyncToolTaskStatus status = request == null ? null : request.getStatus();
List<AsyncToolTaskSummary> tasks = new ArrayList<>();
for (AgentAsyncToolTaskRecord record : taskStore.list(sessionId, status)) {
tasks.add(summary(record));
}
AsyncToolTaskListResult result = new AsyncToolTaskListResult();
result.setTasks(tasks);
result.getMetadata().put("toolDisplayName", displayName());
return result;
}
private void dispatch(String sessionId, String taskId, Map<String, Object> arguments) {
try {
taskExecutor.execute(() -> executeTask(sessionId, taskId, arguments));
} catch (Exception e) {
taskStore.update(sessionId, taskId, record -> fail(record, e));
throw new BusinessException("提交异步工具任务失败:" + safeMessage(e));
}
}
private void executeTask(String sessionId, String taskId, Map<String, Object> arguments) {
try {
taskStore.update(sessionId, taskId, record -> {
record.setStatus(AsyncToolTaskStatus.RUNNING);
record.setSummary(displayName() + "任务执行中");
appendEvent(record, "RUNNING", displayName() + "任务执行中");
return record;
});
AgentToolExecutionResult executionResult = executeBusiness(arguments);
taskStore.update(sessionId, taskId, record -> {
record.setStatus(AsyncToolTaskStatus.SUCCEEDED);
record.setSummary(displayName() + "任务已完成");
record.setResult(executionResult == null ? null : executionResult.getResult());
record.setBusinessExecutionId(executionResult == null ? null : executionResult.getBusinessExecutionId());
appendEvent(record, "SUCCEEDED", displayName() + "任务已完成");
return record;
});
} catch (Exception e) {
taskStore.update(sessionId, taskId, record -> fail(record, e));
}
}
private AgentAsyncToolTaskRecord fail(AgentAsyncToolTaskRecord record, Exception error) {
record.setStatus(AsyncToolTaskStatus.FAILED);
record.setSummary(displayName() + "任务执行失败");
record.setErrorType(ERROR_TYPE_EXCEPTION);
record.setErrorMessage(safeMessage(error));
appendEvent(record, "FAILED", record.getErrorMessage());
return record;
}
private AsyncToolTaskView taskView(String taskId, Long cursor, Integer limit, AgentToolContext context) {
String sessionId = requireSessionId(context);
if (!StringUtils.hasText(taskId)) {
return notFoundView(taskId, cursor, "任务 ID 不能为空");
}
return taskStore.get(sessionId, taskId)
.map(record -> toView(record, cursor, limit))
.orElseGet(() -> notFoundView(taskId, cursor, "异步工具任务不存在或已过期"));
}
private AsyncToolTaskView toView(AgentAsyncToolTaskRecord record, Long cursor, Integer limit) {
long safeCursor = cursor == null ? 0L : Math.max(0L, cursor);
int safeLimit = limit == null || limit <= 0 ? 20 : Math.min(limit, 100);
List<AsyncToolTaskEvent> events = new ArrayList<>();
for (AsyncToolTaskEvent event : record.getEvents()) {
if (event.getSequence() != null && event.getSequence() > safeCursor) {
events.add(event);
}
if (events.size() >= safeLimit) {
break;
}
}
Long nextCursor = events.isEmpty()
? safeCursor
: events.get(events.size() - 1).getSequence();
AsyncToolTaskView view = new AsyncToolTaskView();
view.setTaskId(record.getTaskId());
view.setStatus(record.getStatus());
view.setCursor(safeCursor);
view.setNextCursor(nextCursor);
view.setSummary(record.getSummary());
view.setNextAction(nextAction(record.getStatus()));
view.setEvents(events);
view.setResult(record.getResult());
view.setErrorMessage(record.getErrorMessage());
view.setErrorType(record.getErrorType());
view.setTerminal(record.getStatus() != null && record.getStatus().isTerminal());
view.setResultAvailable(record.getStatus() == AsyncToolTaskStatus.SUCCEEDED && record.getResult() != null);
view.getMetadata().put("toolDisplayName", displayName());
putIfNotNull(view.getPayload(), "businessId", record.getBusinessId());
putIfNotNull(view.getPayload(), "businessExecutionId", record.getBusinessExecutionId());
return view;
}
private AsyncToolTaskView notFoundView(String taskId, Long cursor, String message) {
AsyncToolTaskView view = new AsyncToolTaskView();
view.setTaskId(taskId);
view.setStatus(AsyncToolTaskStatus.FAILED);
view.setCursor(cursor == null ? 0L : cursor);
view.setNextCursor(cursor == null ? 0L : cursor);
view.setSummary(message);
view.setErrorType(ERROR_TYPE_NOT_FOUND);
view.setErrorMessage(message);
view.setTerminal(true);
view.setResultAvailable(false);
view.getMetadata().put("toolDisplayName", displayName());
return view;
}
private AsyncToolTaskSummary summary(AgentAsyncToolTaskRecord record) {
AsyncToolTaskSummary summary = new AsyncToolTaskSummary();
summary.setTaskId(record.getTaskId());
summary.setStatus(record.getStatus());
summary.setSummary(record.getSummary());
summary.setCreatedAt(record.getCreatedAt());
summary.setUpdatedAt(record.getUpdatedAt());
summary.getPayload().put("toolName", record.getToolName());
summary.getPayload().put("toolDisplayName", displayName());
return summary;
}
private void appendEvent(AgentAsyncToolTaskRecord record, String type, String text) {
AsyncToolTaskEvent event = new AsyncToolTaskEvent();
event.setSequence((long) record.getEvents().size() + 1L);
event.setType(type);
event.setText(text);
event.setCreatedAt(Instant.now());
record.getEvents().add(event);
}
private String nextAction(AsyncToolTaskStatus status) {
if (status != null && status.isTerminal()) {
return "任务已结束。";
}
return toolName() + "_observe 继续查看任务进度。";
}
private String requireSessionId(AgentToolContext context) {
if (context == null || !StringUtils.hasText(context.getSessionId())) {
throw new BusinessException("异步工具任务缺少 Agent session 上下文");
}
return context.getSessionId();
}
private String newTaskId() {
String idPart = UUID.randomUUID().toString().replace("-", "");
return "async_" + idPart;
}
private void putIfNotNull(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value);
}
}
private String safeMessage(Exception e) {
return e == null || e.getMessage() == null || e.getMessage().isBlank()
? "异步工具任务执行失败"
: e.getMessage();
}
}

View File

@@ -0,0 +1,362 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskEvent;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Agent 异步工具任务 Redis 运行态记录。
*/
public class AgentAsyncToolTaskRecord {
private String taskId;
private String toolType;
private String toolName;
private String businessId;
private String businessExecutionId;
private String sessionScopedKey;
private Long ttlSeconds;
private AsyncToolTaskStatus status = AsyncToolTaskStatus.PENDING;
private Map<String, Object> arguments = new LinkedHashMap<>();
private String summary;
private Object result;
private String errorMessage;
private String errorType;
private List<AsyncToolTaskEvent> events = new ArrayList<>();
private String requestId;
private String traceId;
private String sessionId;
private String agentId;
private String toolCallId;
private Instant createdAt = Instant.now();
private Instant updatedAt = Instant.now();
private Map<String, Object> payload = new LinkedHashMap<>();
private Map<String, Object> metadata = new LinkedHashMap<>();
/**
* 获取任务 ID。
*
* @return 任务 ID
*/
public String getTaskId() { return taskId; }
/**
* 设置任务 ID。
*
* @param taskId 任务 ID
*/
public void setTaskId(String taskId) { this.taskId = taskId; }
/**
* 获取工具类型。
*
* @return 工具类型
*/
public String getToolType() { return toolType; }
/**
* 设置工具类型。
*
* @param toolType 工具类型
*/
public void setToolType(String toolType) { this.toolType = toolType; }
/**
* 获取工具名称。
*
* @return 工具名称
*/
public String getToolName() { return toolName; }
/**
* 设置工具名称。
*
* @param toolName 工具名称
*/
public void setToolName(String toolName) { this.toolName = toolName; }
/**
* 获取业务资源 ID。
*
* @return 业务资源 ID
*/
public String getBusinessId() { return businessId; }
/**
* 设置业务资源 ID。
*
* @param businessId 业务资源 ID
*/
public void setBusinessId(String businessId) { this.businessId = businessId; }
/**
* 获取业务执行记录 ID。
*
* @return 业务执行记录 ID
*/
public String getBusinessExecutionId() { return businessExecutionId; }
/**
* 设置业务执行记录 ID。
*
* @param businessExecutionId 业务执行记录 ID
*/
public void setBusinessExecutionId(String businessExecutionId) { this.businessExecutionId = businessExecutionId; }
/**
* 获取会话内任务存储 key。
*
* @return 会话内任务存储 key
*/
public String getSessionScopedKey() { return sessionScopedKey; }
/**
* 设置会话内任务存储 key。
*
* @param sessionScopedKey 会话内任务存储 key
*/
public void setSessionScopedKey(String sessionScopedKey) { this.sessionScopedKey = sessionScopedKey; }
/**
* 获取任务 TTL 秒数。
*
* @return TTL 秒数
*/
public Long getTtlSeconds() { return ttlSeconds; }
/**
* 设置任务 TTL 秒数。
*
* @param ttlSeconds TTL 秒数
*/
public void setTtlSeconds(Long ttlSeconds) { this.ttlSeconds = ttlSeconds; }
/**
* 获取任务状态。
*
* @return 任务状态
*/
public AsyncToolTaskStatus getStatus() { return status; }
/**
* 设置任务状态。
*
* @param status 任务状态
*/
public void setStatus(AsyncToolTaskStatus status) { this.status = status == null ? AsyncToolTaskStatus.PENDING : status; }
/**
* 获取任务参数。
*
* @return 任务参数
*/
public Map<String, Object> getArguments() { return arguments; }
/**
* 设置任务参数。
*
* @param arguments 任务参数
*/
public void setArguments(Map<String, Object> arguments) { this.arguments = arguments == null ? new LinkedHashMap<>() : arguments; }
/**
* 获取任务摘要。
*
* @return 任务摘要
*/
public String getSummary() { return summary; }
/**
* 设置任务摘要。
*
* @param summary 任务摘要
*/
public void setSummary(String summary) { this.summary = summary; }
/**
* 获取任务结果。
*
* @return 任务结果
*/
public Object getResult() { return result; }
/**
* 设置任务结果。
*
* @param result 任务结果
*/
public void setResult(Object result) { this.result = result; }
/**
* 获取错误消息。
*
* @return 错误消息
*/
public String getErrorMessage() { return errorMessage; }
/**
* 设置错误消息。
*
* @param errorMessage 错误消息
*/
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
/**
* 获取错误类型。
*
* @return 错误类型
*/
public String getErrorType() { return errorType; }
/**
* 设置错误类型。
*
* @param errorType 错误类型
*/
public void setErrorType(String errorType) { this.errorType = errorType; }
/**
* 获取任务事件列表。
*
* @return 任务事件列表
*/
public List<AsyncToolTaskEvent> getEvents() { return events; }
/**
* 设置任务事件列表。
*
* @param events 任务事件列表
*/
public void setEvents(List<AsyncToolTaskEvent> events) { this.events = events == null ? new ArrayList<>() : events; }
/**
* 获取请求 ID。
*
* @return 请求 ID
*/
public String getRequestId() { return requestId; }
/**
* 设置请求 ID。
*
* @param requestId 请求 ID
*/
public void setRequestId(String requestId) { this.requestId = requestId; }
/**
* 获取链路 ID。
*
* @return 链路 ID
*/
public String getTraceId() { return traceId; }
/**
* 设置链路 ID。
*
* @param traceId 链路 ID
*/
public void setTraceId(String traceId) { this.traceId = traceId; }
/**
* 获取 Agent Runtime session ID。
*
* @return session ID
*/
public String getSessionId() { return sessionId; }
/**
* 设置 Agent Runtime session ID。
*
* @param sessionId session ID
*/
public void setSessionId(String sessionId) { this.sessionId = sessionId; }
/**
* 获取 Agent ID。
*
* @return Agent ID
*/
public String getAgentId() { return agentId; }
/**
* 设置 Agent ID。
*
* @param agentId Agent ID
*/
public void setAgentId(String agentId) { this.agentId = agentId; }
/**
* 获取工具调用 ID。
*
* @return 工具调用 ID
*/
public String getToolCallId() { return toolCallId; }
/**
* 设置工具调用 ID。
*
* @param toolCallId 工具调用 ID
*/
public void setToolCallId(String toolCallId) { this.toolCallId = toolCallId; }
/**
* 获取创建时间。
*
* @return 创建时间
*/
public Instant getCreatedAt() { return createdAt; }
/**
* 设置创建时间。
*
* @param createdAt 创建时间
*/
public void setCreatedAt(Instant createdAt) { this.createdAt = createdAt == null ? Instant.now() : createdAt; }
/**
* 获取更新时间。
*
* @return 更新时间
*/
public Instant getUpdatedAt() { return updatedAt; }
/**
* 设置更新时间。
*
* @param updatedAt 更新时间
*/
public void setUpdatedAt(Instant updatedAt) { this.updatedAt = updatedAt == null ? Instant.now() : updatedAt; }
/**
* 获取业务载荷。
*
* @return 业务载荷
*/
public Map<String, Object> getPayload() { return payload; }
/**
* 设置业务载荷。
*
* @param payload 业务载荷
*/
public void setPayload(Map<String, Object> payload) { this.payload = payload == null ? new LinkedHashMap<>() : payload; }
/**
* 获取元数据。
*
* @return 元数据
*/
public Map<String, Object> getMetadata() { return metadata; }
/**
* 设置元数据。
*
* @param metadata 元数据
*/
public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata == null ? new LinkedHashMap<>() : metadata; }
}

View File

@@ -0,0 +1,48 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;
/**
* Agent 异步工具任务运行态存储。
*/
public interface AgentAsyncToolTaskStore {
/**
* 创建任务记录。
*
* @param record 任务记录
*/
void create(AgentAsyncToolTaskRecord record);
/**
* 获取当前 session 下的任务记录。
*
* @param sessionId Agent Runtime session ID
* @param taskId 任务 ID
* @return 任务记录
*/
Optional<AgentAsyncToolTaskRecord> get(String sessionId, String taskId);
/**
* 更新当前 session 下的任务记录。
*
* @param sessionId Agent Runtime session ID
* @param taskId 任务 ID
* @param updater 更新函数
* @return 更新后的任务记录
*/
Optional<AgentAsyncToolTaskRecord> update(String sessionId, String taskId, UnaryOperator<AgentAsyncToolTaskRecord> updater);
/**
* 查询当前 session 下可见任务。
*
* @param sessionId Agent Runtime session ID
* @param status 状态过滤;为空时返回全部未过期任务
* @return 任务列表
*/
List<AgentAsyncToolTaskRecord> list(String sessionId, AsyncToolTaskStatus status);
}

View File

@@ -0,0 +1,83 @@
package tech.easyflow.agent.runtime.asynctool;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult;
import tech.easyflow.agent.runtime.tool.PluginToolExecutor;
import tech.easyflow.ai.entity.PluginItem;
import java.util.Map;
/**
* Plugin 异步工具子能力实现。
*/
public class PluginAsyncSubTools extends AbstractAgentAsyncSubTools {
private final PluginItem pluginItem;
private final String toolName;
private final String displayName;
private final PluginToolExecutor pluginToolExecutor;
/**
* 创建 Plugin 异步工具子能力。
*
* @param pluginItem 插件工具快照
* @param toolName runtime 工具名
* @param displayName 用户可见名称
* @param pluginToolExecutor Plugin 执行器
* @param taskStore 任务存储
* @param taskExecutor 后台执行器
*/
public PluginAsyncSubTools(PluginItem pluginItem,
String toolName,
String displayName,
PluginToolExecutor pluginToolExecutor,
AgentAsyncToolTaskStore taskStore,
ThreadPoolTaskExecutor taskExecutor) {
super(taskStore, taskExecutor);
this.pluginItem = pluginItem;
this.toolName = toolName;
this.displayName = displayName;
this.pluginToolExecutor = pluginToolExecutor;
}
/**
* {@inheritDoc}
*/
@Override
protected String toolType() {
return AgentToolType.PLUGIN.name();
}
/**
* {@inheritDoc}
*/
@Override
protected String toolName() {
return toolName;
}
/**
* {@inheritDoc}
*/
@Override
protected String displayName() {
return displayName;
}
/**
* {@inheritDoc}
*/
@Override
protected String businessId() {
return pluginItem == null || pluginItem.getId() == null ? null : String.valueOf(pluginItem.getId());
}
/**
* {@inheritDoc}
*/
@Override
protected AgentToolExecutionResult executeBusiness(Map<String, Object> arguments) {
return pluginToolExecutor.execute(pluginItem, arguments);
}
}

View File

@@ -0,0 +1,172 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import tech.easyflow.agent.config.AgentRuntimeProperties;
import tech.easyflow.common.web.exceptions.BusinessException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
/**
* 基于 Redis 单 key 的 Agent 异步工具任务存储。
*/
@Service
public class RedisAgentAsyncToolTaskStore implements AgentAsyncToolTaskStore {
private static final String KEY_PREFIX = "easyflow:agent:async-tool:";
private final StringRedisTemplate stringRedisTemplate;
private final ObjectMapper objectMapper;
private final AgentRuntimeProperties properties;
/**
* 创建 Redis 任务存储。
*
* @param stringRedisTemplate Redis 字符串模板
* @param objectMapper JSON mapper
* @param properties Agent runtime 配置
*/
public RedisAgentAsyncToolTaskStore(StringRedisTemplate stringRedisTemplate,
ObjectMapper objectMapper,
AgentRuntimeProperties properties) {
this.stringRedisTemplate = stringRedisTemplate;
this.objectMapper = objectMapper;
this.properties = properties;
}
/**
* {@inheritDoc}
*/
@Override
public void create(AgentAsyncToolTaskRecord record) {
if (record == null) {
throw new BusinessException("异步工具任务不能为空");
}
String sessionId = requireText(record.getSessionId(), "异步工具任务 sessionId 不能为空");
String taskId = requireText(record.getTaskId(), "异步工具任务 taskId 不能为空");
record.setSessionScopedKey(key(sessionId, taskId));
Duration ttl = taskTtl();
record.setTtlSeconds(ttl.toSeconds());
write(record, ttl);
}
/**
* {@inheritDoc}
*/
@Override
public Optional<AgentAsyncToolTaskRecord> get(String sessionId, String taskId) {
String value = stringRedisTemplate.opsForValue().get(key(sessionId, taskId));
if (!StringUtils.hasText(value)) {
return Optional.empty();
}
return Optional.of(read(value));
}
/**
* {@inheritDoc}
*/
@Override
public Optional<AgentAsyncToolTaskRecord> update(String sessionId,
String taskId,
UnaryOperator<AgentAsyncToolTaskRecord> updater) {
Optional<AgentAsyncToolTaskRecord> existing = get(sessionId, taskId);
if (existing.isEmpty()) {
return Optional.empty();
}
AgentAsyncToolTaskRecord updated = updater == null ? existing.get() : updater.apply(existing.get());
if (updated == null) {
return Optional.empty();
}
updated.setUpdatedAt(Instant.now());
write(updated, remainingTtl(sessionId, taskId));
return Optional.of(updated);
}
/**
* {@inheritDoc}
*/
@Override
public List<AgentAsyncToolTaskRecord> list(String sessionId, AsyncToolTaskStatus status) {
String safeSessionId = requireText(sessionId, "异步工具任务 sessionId 不能为空");
List<AgentAsyncToolTaskRecord> result = new ArrayList<>();
ScanOptions options = ScanOptions.scanOptions().match(KEY_PREFIX + safeSessionId + ":*").count(100).build();
try (Cursor<String> cursor = stringRedisTemplate.scan(options)) {
while (cursor.hasNext()) {
String key = cursor.next();
String value = stringRedisTemplate.opsForValue().get(key);
if (!StringUtils.hasText(value)) {
continue;
}
AgentAsyncToolTaskRecord record = read(value);
if (status == null || status == record.getStatus()) {
result.add(record);
}
}
}
result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt,
Comparator.nullsLast(Comparator.reverseOrder())));
return result;
}
private void write(AgentAsyncToolTaskRecord record, Duration ttl) {
try {
stringRedisTemplate.opsForValue().set(record.getSessionScopedKey(),
objectMapper.writeValueAsString(record), Math.max(1L, ttl.toSeconds()), TimeUnit.SECONDS);
} catch (Exception e) {
throw new BusinessException("写入异步工具任务状态失败:" + safeMessage(e));
}
}
private AgentAsyncToolTaskRecord read(String value) {
try {
return objectMapper.readValue(value, AgentAsyncToolTaskRecord.class);
} catch (Exception e) {
throw new BusinessException("读取异步工具任务状态失败:" + safeMessage(e));
}
}
private Duration remainingTtl(String sessionId, String taskId) {
Long seconds = stringRedisTemplate.getExpire(key(sessionId, taskId), TimeUnit.SECONDS);
if (seconds == null || seconds <= 0L) {
return taskTtl();
}
return Duration.ofSeconds(seconds);
}
private Duration taskTtl() {
Duration ttl = properties == null ? Duration.ofHours(24) : properties.getAsyncToolTaskTtl();
return ttl == null || ttl.isZero() || ttl.isNegative() ? Duration.ofHours(24) : ttl;
}
private String key(String sessionId, String taskId) {
return KEY_PREFIX
+ requireText(sessionId, "异步工具任务 sessionId 不能为空")
+ ":"
+ requireText(taskId, "异步工具任务 taskId 不能为空");
}
private String requireText(String value, String message) {
if (!StringUtils.hasText(value)) {
throw new BusinessException(message);
}
return value.trim();
}
private String safeMessage(Exception e) {
return e == null || e.getMessage() == null || e.getMessage().isBlank()
? "未知错误"
: e.getMessage();
}
}

View File

@@ -0,0 +1,83 @@
package tech.easyflow.agent.runtime.asynctool;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult;
import tech.easyflow.agent.runtime.tool.WorkflowToolExecutor;
import tech.easyflow.ai.entity.Workflow;
import java.util.Map;
/**
* Workflow 异步工具子能力实现。
*/
public class WorkflowAsyncSubTools extends AbstractAgentAsyncSubTools {
private final Workflow workflow;
private final String toolName;
private final String displayName;
private final WorkflowToolExecutor workflowToolExecutor;
/**
* 创建 Workflow 异步工具子能力。
*
* @param workflow 工作流快照
* @param toolName runtime 工具名
* @param displayName 用户可见名称
* @param workflowToolExecutor Workflow 执行器
* @param taskStore 任务存储
* @param taskExecutor 后台执行器
*/
public WorkflowAsyncSubTools(Workflow workflow,
String toolName,
String displayName,
WorkflowToolExecutor workflowToolExecutor,
AgentAsyncToolTaskStore taskStore,
ThreadPoolTaskExecutor taskExecutor) {
super(taskStore, taskExecutor);
this.workflow = workflow;
this.toolName = toolName;
this.displayName = displayName;
this.workflowToolExecutor = workflowToolExecutor;
}
/**
* {@inheritDoc}
*/
@Override
protected String toolType() {
return AgentToolType.WORKFLOW.name();
}
/**
* {@inheritDoc}
*/
@Override
protected String toolName() {
return toolName;
}
/**
* {@inheritDoc}
*/
@Override
protected String displayName() {
return displayName;
}
/**
* {@inheritDoc}
*/
@Override
protected String businessId() {
return workflow == null || workflow.getId() == null ? null : String.valueOf(workflow.getId());
}
/**
* {@inheritDoc}
*/
@Override
protected AgentToolExecutionResult executeBusiness(Map<String, Object> arguments) {
return workflowToolExecutor.execute(workflow, arguments);
}
}

View File

@@ -0,0 +1,35 @@
package tech.easyflow.agent.runtime.tool;
/**
* Agent 工具执行模式。
*/
public enum AgentToolExecutionMode {
/**
* 同步执行。
*/
SYNC,
/**
* 异步执行。
*/
ASYNC;
/**
* 解析执行模式。
*
* @param value 原始配置值
* @return 执行模式;非法或为空时返回 SYNC
*/
public static AgentToolExecutionMode from(String value) {
if (value == null || value.isBlank()) {
return SYNC;
}
for (AgentToolExecutionMode mode : values()) {
if (mode.name().equalsIgnoreCase(value.trim())) {
return mode;
}
}
return SYNC;
}
}

View File

@@ -0,0 +1,57 @@
package tech.easyflow.agent.runtime.tool;
/**
* Agent 业务工具执行结果。
*/
public class AgentToolExecutionResult {
private Object result;
private String businessExecutionId;
/**
* 创建执行结果。
*
* @param result 业务结果
* @param businessExecutionId 业务执行记录 ID
*/
public AgentToolExecutionResult(Object result, String businessExecutionId) {
this.result = result;
this.businessExecutionId = businessExecutionId;
}
/**
* 获取业务结果。
*
* @return 业务结果
*/
public Object getResult() {
return result;
}
/**
* 设置业务结果。
*
* @param result 业务结果
*/
public void setResult(Object result) {
this.result = result;
}
/**
* 获取业务执行记录 ID。
*
* @return 业务执行记录 ID
*/
public String getBusinessExecutionId() {
return businessExecutionId;
}
/**
* 设置业务执行记录 ID。
*
* @param businessExecutionId 业务执行记录 ID
*/
public void setBusinessExecutionId(String businessExecutionId) {
this.businessExecutionId = businessExecutionId;
}
}

View File

@@ -0,0 +1,74 @@
package tech.easyflow.agent.runtime.tool;
import com.easyagents.agent.runtime.mcp.McpSpec;
import com.easyagents.agent.runtime.tool.AgentToolInvoker;
import com.easyagents.agent.runtime.tool.AgentToolSpec;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Agent 工具运行时编译结果。
*/
public class AgentToolRuntimeCompilation {
private List<AgentToolSpec> toolSpecs = new ArrayList<>();
private List<McpSpec> mcpSpecs = new ArrayList<>();
private Map<String, AgentToolInvoker> toolInvokers = new LinkedHashMap<>();
/**
* 获取普通工具声明。
*
* @return 普通工具声明
*/
public List<AgentToolSpec> getToolSpecs() {
return toolSpecs;
}
/**
* 设置普通工具声明。
*
* @param toolSpecs 普通工具声明
*/
public void setToolSpecs(List<AgentToolSpec> toolSpecs) {
this.toolSpecs = toolSpecs == null ? new ArrayList<>() : toolSpecs;
}
/**
* 获取 MCP 声明。
*
* @return MCP 声明
*/
public List<McpSpec> getMcpSpecs() {
return mcpSpecs;
}
/**
* 设置 MCP 声明。
*
* @param mcpSpecs MCP 声明
*/
public void setMcpSpecs(List<McpSpec> mcpSpecs) {
this.mcpSpecs = mcpSpecs == null ? new ArrayList<>() : mcpSpecs;
}
/**
* 获取工具调用器。
*
* @return 工具调用器
*/
public Map<String, AgentToolInvoker> getToolInvokers() {
return toolInvokers;
}
/**
* 设置工具调用器。
*
* @param toolInvokers 工具调用器
*/
public void setToolInvokers(Map<String, AgentToolInvoker> toolInvokers) {
this.toolInvokers = toolInvokers == null ? new LinkedHashMap<>() : toolInvokers;
}
}

View File

@@ -0,0 +1,627 @@
package tech.easyflow.agent.runtime.tool;
import com.easyagents.agent.runtime.hitl.AgentToolApprovalRequest;
import com.easyagents.agent.runtime.mcp.McpSpec;
import com.easyagents.agent.runtime.mcp.McpTransportType;
import com.easyagents.agent.runtime.tool.*;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSpec;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSpecExpander;
import com.easyagents.core.model.chat.tool.Parameter;
import com.easyagents.core.model.chat.tool.Tool;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import tech.easyflow.agent.entity.Agent;
import tech.easyflow.agent.entity.AgentToolBinding;
import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.agent.runtime.asynctool.AgentAsyncToolTaskStore;
import tech.easyflow.agent.runtime.asynctool.PluginAsyncSubTools;
import tech.easyflow.agent.runtime.asynctool.WorkflowAsyncSubTools;
import tech.easyflow.ai.easyagents.tool.ChatToolNameHelper;
import tech.easyflow.ai.entity.Mcp;
import tech.easyflow.ai.entity.PluginItem;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.ai.service.McpService;
import tech.easyflow.ai.service.PluginItemService;
import tech.easyflow.ai.service.WorkflowService;
import tech.easyflow.common.web.exceptions.BusinessException;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.time.Duration;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Agent 工具运行时编译器。
*/
@Component
public class AgentToolRuntimeCompiler {
private static final Pattern MCP_INPUT_PATTERN = Pattern.compile("\\$\\{input:([A-Za-z0-9_.-]+)}");
private static final Pattern ASYNC_SAFE_NAME = Pattern.compile("^[a-z][a-z0-9_]*$");
@Resource
private WorkflowService workflowService;
@Resource
private PluginItemService pluginItemService;
@Resource
private McpService mcpService;
@Resource
private ObjectMapper objectMapper;
@Resource
private WorkflowToolExecutor workflowToolExecutor;
@Resource
private PluginToolExecutor pluginToolExecutor;
@Resource
private AgentAsyncToolTaskStore asyncToolTaskStore;
@Resource(name = "agentAsyncToolExecutor")
private ThreadPoolTaskExecutor agentAsyncToolExecutor;
/**
* 编译 Agent 工具配置。
*
* @param agent Agent 业务定义
* @return 工具编译结果
*/
public AgentToolRuntimeCompilation compile(Agent agent) {
AgentToolRuntimeCompilation compilation = new AgentToolRuntimeCompilation();
if (agent == null || agent.getToolBindings() == null) {
return compilation;
}
List<AgentToolSpec> specs = new ArrayList<>();
Map<String, AgentToolInvoker> invokers = new LinkedHashMap<>();
List<McpSpec> mcpSpecs = new ArrayList<>();
Map<BigInteger, McpSpec> mcpSpecMap = new LinkedHashMap<>();
Set<String> compiledToolNames = new LinkedHashSet<>();
AsyncToolSpecExpander asyncExpander = new AsyncToolSpecExpander();
for (AgentToolBinding binding : agent.getToolBindings()) {
if (!Boolean.TRUE.equals(binding.getEnabled())) {
continue;
}
AgentToolType type = AgentToolType.from(binding.getToolType());
if (type == AgentToolType.MCP) {
McpSpec mcpSpec = mcpSpecMap.computeIfAbsent(binding.getTargetId(), ignored -> buildMcpSpec(binding));
applyMcpToolBinding(mcpSpec, binding);
if (!mcpSpecs.contains(mcpSpec)) {
mcpSpecs.add(mcpSpec);
}
continue;
}
if (executionMode(binding) == AgentToolExecutionMode.ASYNC) {
AsyncToolSpec asyncSpec = buildAsyncToolSpec(type, binding);
addExpandedTools(specs, invokers, compiledToolNames,
asyncExpander.expandSpecs(asyncSpec),
asyncExpander.expandInvokers(asyncSpec));
continue;
}
CompiledSyncTool syncTool = buildSyncTool(type, binding);
addCompiledTool(specs, invokers, compiledToolNames, syncTool.spec(), syncTool.invoker());
}
compilation.setToolSpecs(specs);
compilation.setMcpSpecs(mcpSpecs);
compilation.setToolInvokers(invokers);
return compilation;
}
private void addExpandedTools(List<AgentToolSpec> specs,
Map<String, AgentToolInvoker> invokers,
Set<String> compiledToolNames,
List<AgentToolSpec> expandedSpecs,
Map<String, AgentToolInvoker> expandedInvokers) {
for (AgentToolSpec spec : expandedSpecs) {
addCompiledTool(specs, invokers, compiledToolNames, spec,
expandedInvokers == null ? null : expandedInvokers.get(spec.getName()));
}
}
private void addCompiledTool(List<AgentToolSpec> specs,
Map<String, AgentToolInvoker> invokers,
Set<String> compiledToolNames,
AgentToolSpec spec,
AgentToolInvoker invoker) {
String name = spec == null ? null : spec.getName();
if (name == null || name.isBlank()) {
throw new BusinessException("Agent 工具运行名不能为空");
}
if (!compiledToolNames.add(name)) {
throw new BusinessException("Agent 工具运行名冲突:" + name + ",请调整工具名称");
}
specs.add(spec);
if (invoker != null) {
invokers.put(name, invoker);
}
}
private CompiledSyncTool buildSyncTool(AgentToolType type, AgentToolBinding binding) {
if (type == AgentToolType.WORKFLOW) {
Workflow workflow = requireWorkflow(binding);
Tool tool = workflowToolExecutor.buildTool(workflow);
AgentToolSpec spec = toToolSpec(tool, binding);
AgentToolInvoker invoker = (arguments, context) -> invokeSafely(spec.getName(),
() -> workflowToolExecutor.execute(workflow, arguments).getResult());
return new CompiledSyncTool(spec, invoker);
}
if (type == AgentToolType.PLUGIN) {
PluginItem pluginItem = requirePlugin(binding);
Tool tool = pluginToolExecutor.buildTool(pluginItem);
AgentToolSpec spec = toToolSpec(tool, binding);
AgentToolInvoker invoker = (arguments, context) -> invokeSafely(spec.getName(),
() -> pluginToolExecutor.execute(pluginItem, arguments).getResult());
return new CompiledSyncTool(spec, invoker);
}
throw new BusinessException("不支持的 Agent 工具类型:" + type.name());
}
private AsyncToolSpec buildAsyncToolSpec(AgentToolType type, AgentToolBinding binding) {
if (type == AgentToolType.WORKFLOW) {
Workflow workflow = requireWorkflow(binding);
Tool tool = workflowToolExecutor.buildTool(workflow);
String asyncName = asyncToolName(tool, binding, "workflow");
String toolDisplayName = displayName(tool, workflow.getTitle());
AsyncToolSpec spec = baseAsyncSpec(asyncName, tool, binding, toolDisplayName);
spec.setSubTools(new WorkflowAsyncSubTools(workflow, asyncName, toolDisplayName,
workflowToolExecutor, asyncToolTaskStore, agentAsyncToolExecutor));
return spec;
}
if (type == AgentToolType.PLUGIN) {
PluginItem pluginItem = requirePlugin(binding);
Tool tool = pluginToolExecutor.buildTool(pluginItem);
String asyncName = asyncToolName(tool, binding, "plugin");
String toolDisplayName = displayName(tool, pluginItem.getName());
AsyncToolSpec spec = baseAsyncSpec(asyncName, tool, binding, toolDisplayName);
spec.setSubTools(new PluginAsyncSubTools(pluginItem, asyncName, toolDisplayName,
pluginToolExecutor, asyncToolTaskStore, agentAsyncToolExecutor));
return spec;
}
throw new BusinessException("不支持的 Agent 异步工具类型:" + type.name());
}
private AsyncToolSpec baseAsyncSpec(String asyncName, Tool tool, AgentToolBinding binding, String toolDisplayName) {
AsyncToolSpec spec = new AsyncToolSpec();
spec.setName(asyncName);
spec.setDescription(safeDescription(tool == null ? null : tool.getDescription()));
spec.setSubmitParametersSchema(toSchema(tool == null ? null : tool.getParameters()));
spec.setApprovalRequired(Boolean.TRUE.equals(binding.getHitlEnabled()));
if (Boolean.TRUE.equals(binding.getHitlEnabled())) {
spec.setApprovalRequest(buildBindingApprovalRequest(binding));
}
spec.getMetadata().put("bindingId", binding.getId());
spec.getMetadata().put("targetId", binding.getTargetId());
spec.getMetadata().put("toolType", binding.getToolType());
// 异步子工具名服务 runtime 调用,事件和聊天展示必须保留业务名称。
spec.getMetadata().put("toolDisplayName", toolDisplayName);
return spec;
}
private AgentToolResult invokeSafely(String toolName, ToolCall call) {
try {
Object result = call.invoke();
return AgentToolResult.success(result == null ? "" : String.valueOf(result));
} catch (Exception e) {
return AgentToolResult.failure(e.getMessage() == null ? "工具执行失败" : e.getMessage());
}
}
private AgentToolExecutionMode executionMode(AgentToolBinding binding) {
Object value = binding == null || binding.getOptionsJson() == null ? null : binding.getOptionsJson().get("executionMode");
return AgentToolExecutionMode.from(value == null ? null : String.valueOf(value));
}
private Workflow requireWorkflow(AgentToolBinding binding) {
Workflow workflow = snapshotOrPublishedWorkflow(binding);
if (workflow == null) {
throw new BusinessException("绑定工作流不存在");
}
return workflow;
}
private PluginItem requirePlugin(AgentToolBinding binding) {
PluginItem pluginItem = snapshotOrCurrentPlugin(binding);
if (pluginItem == null) {
throw new BusinessException("绑定插件不存在");
}
return pluginItem;
}
private AgentToolSpec toToolSpec(Tool tool, AgentToolBinding binding) {
AgentToolSpec spec = new AgentToolSpec();
String name = resolveRuntimeToolName(tool, binding);
spec.setName(name);
spec.setDescription(safeDescription(tool == null ? null : tool.getDescription()));
spec.setCategory(AgentToolCategory.valueOf(AgentToolType.from(binding.getToolType()).name()));
spec.setParametersSchema(toSchema(tool == null ? null : tool.getParameters()));
spec.setApprovalRequired(Boolean.TRUE.equals(binding.getHitlEnabled()));
if (Boolean.TRUE.equals(binding.getHitlEnabled())) {
spec.setApprovalRequest(buildBindingApprovalRequest(binding));
}
spec.getMetadata().put("bindingId", binding.getId());
spec.getMetadata().put("targetId", binding.getTargetId());
spec.getMetadata().put("toolType", binding.getToolType());
spec.getMetadata().put("toolDisplayName", displayName(tool, binding.getToolName()));
return spec;
}
private AgentToolApprovalRequest buildBindingApprovalRequest(AgentToolBinding binding) {
AgentToolApprovalRequest request = new AgentToolApprovalRequest();
String name = binding == null ? "工具" : binding.getToolName();
request.setApprovalPrompt(stringValue(binding == null ? null : binding.getHitlConfigJson(), "prompt", "是否批准执行工具:" + name));
Map<String, Object> metadata = sanitizedHitlMetadata(binding == null ? null : binding.getHitlConfigJson());
if (binding != null) {
metadata.put("toolType", binding.getToolType());
metadata.put("bindingId", binding.getId());
metadata.put("targetId", binding.getTargetId());
}
request.setMetadata(metadata);
return request;
}
private McpSpec buildMcpSpec(AgentToolBinding binding) {
Mcp mcp = snapshotOrCurrentMcp(binding);
if (mcp == null) {
throw new BusinessException("绑定 MCP 不存在");
}
Map.Entry<String, Map<String, Object>> server = firstMcpServer(mcp);
Map<String, Object> serverConfig = server.getValue();
McpSpec spec = new McpSpec();
spec.setName(mcpRuntimeName(mcp));
spec.setDescription(firstNonBlank(mcp.getDescription(), mcp.getTitle()));
spec.setTransportType(parseMcpTransportType(mcp, serverConfig));
spec.setCommand(resolveMcpInput(stringValue(serverConfig, "command", null)));
spec.setArgs(resolveMcpInputs(stringListValue(serverConfig, "args")));
spec.setEnv(resolveMcpInputMap(stringMapValue(serverConfig, "env")));
spec.setUrl(resolveMcpInput(stringValue(serverConfig, "url", null)));
spec.setHeaders(resolveMcpInputMap(stringMapValue(serverConfig, "headers")));
spec.setQueryParams(resolveMcpInputMap(stringMapValue(serverConfig, "queryParams")));
Duration timeout = durationValue(serverConfig, "timeout");
if (timeout != null) {
spec.setTimeout(timeout);
}
Duration initializationTimeout = durationValue(serverConfig, "initializationTimeout");
if (initializationTimeout != null) {
spec.setInitializationTimeout(initializationTimeout);
}
spec.setGroupName(mcpRuntimeName(mcp));
spec.setApprovalRequired(Boolean.TRUE.equals(mcp.getApprovalRequired()));
spec.setApprovalRequest(buildMcpApprovalRequest(mcp));
spec.setToolNamePrefix(mcpRuntimeToolPrefix(mcp.getId()));
spec.getMetadata().put("toolType", AgentToolType.MCP.name());
spec.getMetadata().put("mcpId", String.valueOf(mcp.getId()));
spec.getMetadata().put("mcpTitle", mcp.getTitle());
spec.getMetadata().put("serverName", server.getKey());
return spec;
}
private void applyMcpToolBinding(McpSpec spec, AgentToolBinding binding) {
if (Boolean.TRUE.equals(binding.getHitlEnabled())) {
spec.setApprovalRequired(true);
spec.setApprovalRequest(buildBindingApprovalRequest(binding));
}
}
private AgentToolApprovalRequest buildMcpApprovalRequest(Mcp mcp) {
AgentToolApprovalRequest request = new AgentToolApprovalRequest();
request.setApprovalPrompt("是否批准执行 MCP 工具:" + firstNonBlank(mcp.getTitle(), mcpRuntimeName(mcp)));
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("toolType", AgentToolType.MCP.name());
metadata.put("mcpId", String.valueOf(mcp.getId()));
metadata.put("mcpTitle", mcp.getTitle());
request.setMetadata(metadata);
return request;
}
private Workflow snapshotOrPublishedWorkflow(AgentToolBinding binding) {
if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) {
Workflow workflow = objectMapper.convertValue(binding.getResourceSnapshot(), Workflow.class);
workflow.setId(firstNonNull(workflow.getId(), binding.getTargetId()));
return workflow;
}
return workflowService.getPublishedById(binding.getTargetId());
}
private PluginItem snapshotOrCurrentPlugin(AgentToolBinding binding) {
if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) {
PluginItem pluginItem = objectMapper.convertValue(binding.getResourceSnapshot(), PluginItem.class);
pluginItem.setId(firstNonNull(pluginItem.getId(), binding.getTargetId()));
return pluginItem;
}
return pluginItemService.getById(binding.getTargetId());
}
private Mcp snapshotOrCurrentMcp(AgentToolBinding binding) {
if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) {
Mcp mcp = objectMapper.convertValue(binding.getResourceSnapshot(), Mcp.class);
mcp.setId(firstNonNull(mcp.getId(), binding.getTargetId()));
return mcp;
}
return mcpService.getById(binding.getTargetId());
}
private Map<String, Object> toSchema(Parameter[] parameters) {
Map<String, Object> schema = new LinkedHashMap<>();
Map<String, Object> properties = new LinkedHashMap<>();
List<String> required = new ArrayList<>();
if (parameters != null) {
for (Parameter parameter : parameters) {
properties.put(parameter.getName(), parameterSchema(parameter));
if (parameter.isRequired()) {
required.add(parameter.getName());
}
}
}
schema.put("type", "object");
schema.put("properties", properties);
schema.put("required", required);
return schema;
}
private Map<String, Object> parameterSchema(Parameter parameter) {
Map<String, Object> schema = new LinkedHashMap<>();
schema.put("type", parameter.getType() == null ? "string" : parameter.getType());
putOptionalString(schema, "description", parameter.getDescription());
if (parameter.getChildren() != null && !parameter.getChildren().isEmpty()) {
Map<String, Object> children = new LinkedHashMap<>();
for (Parameter child : parameter.getChildren()) {
if (child != null && child.getName() != null && !child.getName().isBlank()) {
children.put(child.getName(), parameterSchema(child));
}
}
if ("array".equalsIgnoreCase(parameter.getType())) {
schema.put("items", firstArrayItemSchema(parameter.getChildren()));
} else {
schema.put("properties", children);
}
}
return schema;
}
private Map<String, Object> firstArrayItemSchema(List<Parameter> children) {
return children.stream().filter(Objects::nonNull).findFirst()
.map(this::parameterSchema)
.orElse(Map.of("type", "string"));
}
private void putOptionalString(Map<String, Object> target, String key, String value) {
if (value != null && !value.isBlank()) {
target.put(key, value);
}
}
private String resolveRuntimeToolName(Tool tool, AgentToolBinding binding) {
String bindingName = binding == null ? null : binding.getToolName();
if (ChatToolNameHelper.isSafeToolName(bindingName)) {
return bindingName;
}
String toolName = tool == null ? null : tool.getName();
if (ChatToolNameHelper.isSafeToolName(toolName)) {
return toolName;
}
BigInteger targetId = binding == null ? null : binding.getTargetId();
return ChatToolNameHelper.buildFallbackName("tool", targetId);
}
private String asyncToolName(Tool tool, AgentToolBinding binding, String fallbackPrefix) {
String base = resolveRuntimeToolName(tool, binding).toLowerCase(Locale.ROOT)
.replaceAll("[^a-z0-9_]", "_")
.replaceAll("_+", "_");
if (!base.isBlank() && Character.isDigit(base.charAt(0))) {
base = fallbackPrefix + "_" + base;
}
if (ASYNC_SAFE_NAME.matcher(base).matches()) {
return base;
}
return fallbackPrefix + "_" + (binding == null || binding.getTargetId() == null ? "unknown" : binding.getTargetId());
}
private String displayName(Tool tool, String fallback) {
String value = tool == null ? null : tool.getName();
return firstNonBlank(firstNonBlank(fallback, value), "工具调用");
}
private String safeDescription(String description) {
return description == null || description.isBlank() ? "EasyFlow Agent 工具" : description;
}
private Map<String, Object> sanitizedHitlMetadata(Map<String, Object> config) {
Map<String, Object> metadata = new LinkedHashMap<>();
if (config != null) {
config.forEach((key, value) -> {
if (!isHitlPromptKey(key)) {
metadata.put(key, value);
}
});
}
return metadata;
}
private boolean isHitlPromptKey(String key) {
if (key == null) {
return false;
}
String normalized = key.trim();
return "prompt".equalsIgnoreCase(normalized)
|| "question".equalsIgnoreCase(normalized)
|| "approvalPrompt".equalsIgnoreCase(normalized);
}
private Map.Entry<String, Map<String, Object>> firstMcpServer(Mcp mcp) {
Map<String, Object> config = parseMcpConfig(mcp);
Map<String, Object> servers = mapValue(config, "mcpServers");
if (servers.isEmpty()) {
throw new BusinessException("MCP 配置 JSON 中没有找到任何 MCP 服务名称");
}
Map.Entry<String, Object> first = servers.entrySet().iterator().next();
if (!(first.getValue() instanceof Map<?, ?> rawServer)) {
throw new BusinessException("MCP 服务配置必须是对象:" + first.getKey());
}
Map<String, Object> serverConfig = new LinkedHashMap<>();
rawServer.forEach((key, value) -> serverConfig.put(String.valueOf(key), value));
return Map.entry(first.getKey(), serverConfig);
}
private Map<String, Object> parseMcpConfig(Mcp mcp) {
String configJson = mcp == null ? null : mcp.getConfigJson();
if (configJson == null || configJson.isBlank()) {
throw new BusinessException("MCP 配置 JSON 不能为空");
}
try {
return objectMapper.readValue(configJson, new com.fasterxml.jackson.core.type.TypeReference<>() {});
} catch (Exception e) {
throw new BusinessException("MCP 配置 JSON 格式错误");
}
}
private McpTransportType parseMcpTransportType(Mcp mcp, Map<String, Object> serverConfig) {
String transport = firstNonBlank(mcp == null ? null : mcp.getTransportType(), stringValue(serverConfig, "transport", null));
return McpTransportType.from(transport);
}
private String mcpRuntimeName(Mcp mcp) {
BigInteger id = mcp == null ? null : mcp.getId();
return "mcp_" + safeToolNameSegment(id == null ? "unknown" : String.valueOf(id));
}
private String mcpRuntimeToolPrefix(BigInteger mcpId) {
return "mcp_" + safeToolNameSegment(String.valueOf(mcpId)) + "_";
}
private String safeToolNameSegment(String value) {
String normalized = String.valueOf(value == null ? "" : value).trim()
.replaceAll("[^A-Za-z0-9_-]", "_")
.replaceAll("_+", "_");
return normalized.isBlank() ? "tool" : normalized;
}
private List<String> stringListValue(Map<String, Object> map, String key) {
Object value = map == null ? null : map.get(key);
if (value == null) {
return new ArrayList<>();
}
if (value instanceof Collection<?> collection) {
List<String> result = new ArrayList<>();
for (Object item : collection) {
if (item != null) {
result.add(String.valueOf(item));
}
}
return result;
}
throw new BusinessException("Agent 配置字段必须是数组:" + key);
}
private Duration durationValue(Map<String, Object> map, String key) {
Object value = map == null ? null : map.get(key);
if (value == null) {
return null;
}
if (value instanceof Number number) {
return Duration.ofSeconds(number.longValue());
}
String text = String.valueOf(value).trim();
if (text.isEmpty()) {
return null;
}
try {
return Duration.parse(text);
} catch (Exception ignored) {
try {
return Duration.ofSeconds(Long.parseLong(text));
} catch (NumberFormatException e) {
throw new BusinessException("Agent 配置字段必须是秒数或 Duration" + key);
}
}
}
private List<String> resolveMcpInputs(List<String> values) {
if (values == null || values.isEmpty()) {
return new ArrayList<>();
}
List<String> result = new ArrayList<>(values.size());
for (String value : values) {
result.add(resolveMcpInput(value));
}
return result;
}
private Map<String, String> resolveMcpInputMap(Map<String, String> values) {
if (values == null || values.isEmpty()) {
return new LinkedHashMap<>();
}
Map<String, String> result = new LinkedHashMap<>();
values.forEach((key, value) -> result.put(key, resolveMcpInput(value)));
return result;
}
private String resolveMcpInput(String value) {
if (value == null || value.isBlank()) {
return value;
}
Matcher matcher = MCP_INPUT_PATTERN.matcher(value);
StringBuffer resolved = new StringBuffer();
while (matcher.find()) {
String inputKey = matcher.group(1);
String resolvedValue = System.getProperty("mcp.input." + inputKey);
if (resolvedValue == null || resolvedValue.isBlank()) {
throw new BusinessException("MCP 输入变量未解析:" + inputKey);
}
matcher.appendReplacement(resolved, Matcher.quoteReplacement(resolvedValue));
}
matcher.appendTail(resolved);
return resolved.toString();
}
private Map<String, Object> mapValue(Map<String, Object> map, String key) {
Object value = map == null ? null : map.get(key);
if (value == null) {
return new LinkedHashMap<>();
}
if (value instanceof Map<?, ?> raw) {
Map<String, Object> result = new LinkedHashMap<>();
raw.forEach((rawKey, rawValue) -> result.put(String.valueOf(rawKey), rawValue));
return result;
}
throw new BusinessException("Agent 配置字段必须是对象:" + key);
}
private Map<String, String> stringMapValue(Map<String, Object> map, String key) {
Map<String, Object> raw = mapValue(map, key);
Map<String, String> result = new LinkedHashMap<>();
raw.forEach((rawKey, rawValue) -> {
if (rawValue != null) {
result.put(rawKey, String.valueOf(rawValue));
}
});
return result;
}
private String stringValue(Map<String, Object> map, String key, String defaultValue) {
Object value = map == null ? null : map.get(key);
if (value == null) {
return defaultValue;
}
String text = String.valueOf(value);
return text.isBlank() ? defaultValue : text;
}
private String firstNonBlank(String first, String second) {
return first == null || first.isBlank() ? second : first;
}
private BigInteger firstNonNull(BigInteger first, BigInteger second) {
return first == null ? second : first;
}
private record CompiledSyncTool(AgentToolSpec spec, AgentToolInvoker invoker) {
}
private interface ToolCall {
/**
* 调用工具。
*
* @return 工具结果
*/
Object invoke();
}
}

View File

@@ -0,0 +1,36 @@
package tech.easyflow.agent.runtime.tool;
import com.easyagents.core.model.chat.tool.Tool;
import org.springframework.stereotype.Service;
import tech.easyflow.ai.entity.PluginItem;
import java.util.Map;
/**
* Agent Plugin 工具执行器。
*/
@Service
public class PluginToolExecutor {
/**
* 构建 Plugin 工具声明来源。
*
* @param pluginItem 插件工具
* @return 工具声明来源
*/
public Tool buildTool(PluginItem pluginItem) {
return pluginItem.toFunction();
}
/**
* 执行 Plugin 工具。
*
* @param pluginItem 插件工具
* @param arguments 执行参数
* @return 执行结果
*/
public AgentToolExecutionResult execute(PluginItem pluginItem, Map<String, Object> arguments) {
Object result = buildTool(pluginItem).invoke(arguments == null ? Map.of() : arguments);
return new AgentToolExecutionResult(result, null);
}
}

View File

@@ -0,0 +1,71 @@
package tech.easyflow.agent.runtime.tool;
import com.easyagents.flow.core.chain.runtime.ChainExecutor;
import com.easyagents.core.model.chat.tool.Tool;
import org.springframework.stereotype.Service;
import tech.easyflow.ai.easyagents.tool.WorkflowTool;
import tech.easyflow.ai.easyagentsflow.support.PublishedWorkflowDefinitionIds;
import tech.easyflow.ai.entity.Workflow;
import java.util.Map;
/**
* Agent Workflow 工具执行器。
*/
@Service
public class WorkflowToolExecutor {
private final ChainExecutor chainExecutor;
/**
* 创建 Workflow 工具执行器。
*
* @param chainExecutor 工作流执行器
*/
public WorkflowToolExecutor(ChainExecutor chainExecutor) {
this.chainExecutor = chainExecutor;
}
/**
* 构建 Workflow 工具声明来源。
*
* @param workflow 工作流
* @return 工具声明来源
*/
public Tool buildTool(Workflow workflow) {
return new WorkflowTool(workflow, true, definitionId(workflow));
}
/**
* 执行 Workflow 工具。
*
* @param workflow 工作流
* @param arguments 执行参数
* @return 执行结果
*/
public AgentToolExecutionResult execute(Workflow workflow, Map<String, Object> arguments) {
Object result = chainExecutor.execute(definitionId(workflow), arguments == null ? Map.of() : arguments);
return new AgentToolExecutionResult(result, resolveBusinessExecutionId(result));
}
private String definitionId(Workflow workflow) {
return PublishedWorkflowDefinitionIds.published(String.valueOf(workflow == null ? null : workflow.getId()));
}
private String resolveBusinessExecutionId(Object result) {
if (result instanceof Map<?, ?> map) {
Object value = firstValue(map, "executionId", "executeId", "chainId", "runId");
return value == null ? null : String.valueOf(value);
}
return null;
}
private Object firstValue(Map<?, ?> map, String... keys) {
for (String key : keys) {
if (map.containsKey(key)) {
return map.get(key);
}
}
return null;
}
}

View File

@@ -8,6 +8,7 @@ import org.junit.Test;
import tech.easyflow.agent.entity.Agent; import tech.easyflow.agent.entity.Agent;
import tech.easyflow.agent.entity.AgentToolBinding; import tech.easyflow.agent.entity.AgentToolBinding;
import tech.easyflow.agent.enums.AgentToolType; import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompiler;
import tech.easyflow.ai.entity.Mcp; import tech.easyflow.ai.entity.Mcp;
import tech.easyflow.ai.entity.Model; import tech.easyflow.ai.entity.Model;
import tech.easyflow.ai.entity.ModelProvider; import tech.easyflow.ai.entity.ModelProvider;
@@ -36,10 +37,13 @@ public class AgentDefinitionCompilerMcpTest {
BigInteger mcpId = BigInteger.valueOf(20L); BigInteger mcpId = BigInteger.valueOf(20L);
Model model = model(modelId); Model model = model(modelId);
Mcp mcp = mcp(mcpId); Mcp mcp = mcp(mcpId);
AgentDefinitionCompiler compiler = new AgentDefinitionCompiler(); AgentRuntimeCompiler compiler = new AgentRuntimeCompiler();
AgentToolRuntimeCompiler toolCompiler = new AgentToolRuntimeCompiler();
setField(compiler, "objectMapper", new com.fasterxml.jackson.databind.ObjectMapper()); setField(compiler, "objectMapper", new com.fasterxml.jackson.databind.ObjectMapper());
setField(compiler, "modelService", modelService(model)); setField(compiler, "modelService", modelService(model));
setField(compiler, "mcpService", mcpService(mcp)); setField(toolCompiler, "objectMapper", new com.fasterxml.jackson.databind.ObjectMapper());
setField(toolCompiler, "mcpService", mcpService(mcp));
setField(compiler, "agentToolRuntimeCompiler", toolCompiler);
Agent agent = agent(modelId, mcpId); Agent agent = agent(modelId, mcpId);

View File

@@ -429,11 +429,11 @@ public class AgentRunServiceDraftAndHitlTest {
@Test @Test
public void startRuntimeShouldUseDraftSessionStoreWithoutBindingMysqlSession() throws Exception { public void startRuntimeShouldUseDraftSessionStoreWithoutBindingMysqlSession() throws Exception {
AgentRunService service = new AgentRunService(); AgentRunService service = new AgentRunService();
RecordingAgentDefinitionCompiler compiler = new RecordingAgentDefinitionCompiler(); RecordingAgentRuntimeCompiler compiler = new RecordingAgentRuntimeCompiler();
RecordingAgentRuntime runtime = new RecordingAgentRuntime(); RecordingAgentRuntime runtime = new RecordingAgentRuntime();
RecordingAgentRuntimeFactory runtimeFactory = new RecordingAgentRuntimeFactory(runtime); RecordingAgentRuntimeFactory runtimeFactory = new RecordingAgentRuntimeFactory(runtime);
AgentSessionStore draftStore = new InMemoryAgentSessionStore(); AgentSessionStore draftStore = new InMemoryAgentSessionStore();
setField(service, "agentDefinitionCompiler", compiler); setField(service, "agentRuntimeCompiler", compiler);
setField(service, "agentRuntimeFactory", runtimeFactory); setField(service, "agentRuntimeFactory", runtimeFactory);
setField(service, "agentRunRegistry", new AgentRunRegistry()); setField(service, "agentRunRegistry", new AgentRunRegistry());
@@ -1001,7 +1001,7 @@ public class AgentRunServiceDraftAndHitlTest {
} }
} }
private static class RecordingAgentDefinitionCompiler extends AgentDefinitionCompiler { private static class RecordingAgentRuntimeCompiler extends AgentRuntimeCompiler {
@Override @Override
public AgentRuntimeBundle compile(Agent agent) { public AgentRuntimeBundle compile(Agent agent) {

View File

@@ -0,0 +1,195 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.AgentToolContext;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolCancelRequest;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolObserveRequest;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolResultRequest;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSubmitResult;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskView;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;
/**
* EasyFlow 异步业务工具基类测试。
*/
public class AbstractAgentAsyncSubToolsTest {
/**
* 验证 submit、observe、result 与 list 的基础任务生命周期。
*
* @throws Exception 等待后台执行超时时抛出
*/
@Test
public void asyncSubToolsShouldSubmitObserveResultAndListCurrentSessionTasks() throws Exception {
ThreadPoolTaskExecutor executor = executor();
try {
InMemoryTaskStore store = new InMemoryTaskStore();
TestAsyncSubTools subTools = new TestAsyncSubTools(store, executor);
AgentToolContext context = context("session-a");
AsyncToolSubmitResult submitted = subTools.submit(Map.of("keyword", "hello"), context);
Assert.assertEquals(AsyncToolTaskStatus.PENDING, submitted.getStatus());
Assert.assertTrue(submitted.getTaskId().startsWith("async_"));
AsyncToolTaskView completed = waitTerminal(subTools, submitted.getTaskId(), context);
Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, completed.getStatus());
Assert.assertEquals(Map.of("echo", "hello"), completed.getResult());
Assert.assertTrue(completed.getNextCursor() >= 2L);
AsyncToolResultRequest resultRequest = new AsyncToolResultRequest();
resultRequest.setTaskId(submitted.getTaskId());
resultRequest.setCursor(1L);
AsyncToolTaskView result = subTools.result(resultRequest, context);
Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, result.getStatus());
Assert.assertEquals(Map.of("echo", "hello"), result.getResult());
Assert.assertFalse(result.getEvents().isEmpty());
Assert.assertEquals(1, subTools.list(null, context).getTasks().size());
Assert.assertTrue(subTools.list(null, context("session-b")).getTasks().isEmpty());
AsyncToolTaskView crossedSessionView = observe(subTools, submitted.getTaskId(), context("session-b"));
Assert.assertEquals(AsyncToolTaskStatus.FAILED, crossedSessionView.getStatus());
Assert.assertEquals("TASK_NOT_FOUND", crossedSessionView.getErrorType());
} finally {
executor.shutdown();
}
}
/**
* 验证首版取消语义返回明确失败结果。
*/
@Test
public void cancelShouldReturnUnsupportedFailure() {
TestAsyncSubTools subTools = new TestAsyncSubTools(new InMemoryTaskStore(), executor());
AsyncToolCancelRequest request = new AsyncToolCancelRequest();
request.setTaskId("task-1");
var result = subTools.cancel(request, context("session-a"));
Assert.assertEquals(AsyncToolTaskStatus.FAILED, result.getStatus());
Assert.assertEquals("不支持取消", result.getMessage());
}
private AsyncToolTaskView waitTerminal(TestAsyncSubTools subTools, String taskId, AgentToolContext context) throws Exception {
long deadline = System.currentTimeMillis() + 3000L;
AsyncToolTaskView view = observe(subTools, taskId, context);
while (!Boolean.TRUE.equals(view.getTerminal()) && System.currentTimeMillis() < deadline) {
Thread.sleep(20L);
view = observe(subTools, taskId, context);
}
Assert.assertTrue("异步任务应在测试超时前完成", Boolean.TRUE.equals(view.getTerminal()));
return view;
}
private AsyncToolTaskView observe(TestAsyncSubTools subTools, String taskId, AgentToolContext context) {
AsyncToolObserveRequest request = new AsyncToolObserveRequest();
request.setTaskId(taskId);
request.setCursor(0L);
return subTools.observe(request, context);
}
private AgentToolContext context(String sessionId) {
AgentToolContext context = new AgentToolContext();
context.setRequestId("request-1");
context.setTraceId("trace-1");
context.setSessionId(sessionId);
context.setAgentId("agent-1");
context.setToolCallId("tool-call-1");
return context;
}
private ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(4);
executor.setThreadNamePrefix("async-sub-tools-test-");
executor.initialize();
return executor;
}
private static final class TestAsyncSubTools extends AbstractAgentAsyncSubTools {
private TestAsyncSubTools(AgentAsyncToolTaskStore taskStore, ThreadPoolTaskExecutor taskExecutor) {
super(taskStore, taskExecutor);
}
@Override
protected String toolType() {
return "PLUGIN";
}
@Override
protected String toolName() {
return "test_tool";
}
@Override
protected String displayName() {
return "测试工具";
}
@Override
protected String businessId() {
return "business-1";
}
@Override
protected AgentToolExecutionResult executeBusiness(Map<String, Object> arguments) {
return new AgentToolExecutionResult(Map.of("echo", arguments.get("keyword")), "business-run-1");
}
}
private static final class InMemoryTaskStore implements AgentAsyncToolTaskStore {
private final Map<String, AgentAsyncToolTaskRecord> records = new ConcurrentHashMap<>();
@Override
public void create(AgentAsyncToolTaskRecord record) {
record.setSessionScopedKey(key(record.getSessionId(), record.getTaskId()));
records.put(record.getSessionScopedKey(), record);
}
@Override
public Optional<AgentAsyncToolTaskRecord> get(String sessionId, String taskId) {
return Optional.ofNullable(records.get(key(sessionId, taskId)));
}
@Override
public Optional<AgentAsyncToolTaskRecord> update(String sessionId,
String taskId,
UnaryOperator<AgentAsyncToolTaskRecord> updater) {
String key = key(sessionId, taskId);
AgentAsyncToolTaskRecord updated = records.computeIfPresent(key,
(ignored, existing) -> updater == null ? existing : updater.apply(existing));
return Optional.ofNullable(updated);
}
@Override
public List<AgentAsyncToolTaskRecord> list(String sessionId, AsyncToolTaskStatus status) {
List<AgentAsyncToolTaskRecord> result = new ArrayList<>();
for (AgentAsyncToolTaskRecord record : records.values()) {
if (sessionId.equals(record.getSessionId()) && (status == null || status == record.getStatus())) {
result.add(record);
}
}
result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt).reversed());
return result;
}
private String key(String sessionId, String taskId) {
return sessionId + ":" + taskId;
}
}
}

View File

@@ -0,0 +1,213 @@
package tech.easyflow.agent.runtime.asynctool;
import com.easyagents.agent.runtime.tool.AgentToolContext;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolObserveRequest;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolResultRequest;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSubmitResult;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus;
import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskView;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult;
import tech.easyflow.agent.runtime.tool.PluginToolExecutor;
import tech.easyflow.agent.runtime.tool.WorkflowToolExecutor;
import tech.easyflow.ai.entity.PluginItem;
import tech.easyflow.ai.entity.Workflow;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;
/**
* Workflow 与 Plugin 异步子工具测试。
*/
public class WorkflowPluginAsyncSubToolsTest {
/**
* 验证 Workflow 异步子工具会把业务执行结果保留到任务视图。
*
* @throws Exception 等待后台执行超时时抛出
*/
@Test
public void workflowAsyncSubToolsShouldKeepBusinessResultInTaskView() throws Exception {
ThreadPoolTaskExecutor executor = executor();
try {
Map<String, Object> businessResult = Map.of("workflowOutput", "ok");
WorkflowAsyncSubTools subTools = new WorkflowAsyncSubTools(workflow(),
"workflow_demo",
"测试工作流",
new StubWorkflowToolExecutor(businessResult),
new InMemoryTaskStore(),
executor);
AsyncToolTaskView view = submitAndResult(subTools);
Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, view.getStatus());
Assert.assertEquals(businessResult, view.getResult());
Assert.assertEquals("workflow-run-1", view.getPayload().get("businessExecutionId"));
} finally {
executor.shutdown();
}
}
/**
* 验证 Plugin 异步子工具会把业务执行结果保留到任务视图。
*
* @throws Exception 等待后台执行超时时抛出
*/
@Test
public void pluginAsyncSubToolsShouldKeepBusinessResultInTaskView() throws Exception {
ThreadPoolTaskExecutor executor = executor();
try {
Map<String, Object> businessResult = Map.of("pluginOutput", List.of("a", "b"));
PluginAsyncSubTools subTools = new PluginAsyncSubTools(pluginItem(),
"plugin_demo",
"测试插件",
new StubPluginToolExecutor(businessResult),
new InMemoryTaskStore(),
executor);
AsyncToolTaskView view = submitAndResult(subTools);
Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, view.getStatus());
Assert.assertEquals(businessResult, view.getResult());
} finally {
executor.shutdown();
}
}
private AsyncToolTaskView submitAndResult(AbstractAgentAsyncSubTools subTools) throws Exception {
AgentToolContext context = context();
AsyncToolSubmitResult submitted = subTools.submit(Map.of("keyword", "hello"), context);
waitTerminal(subTools, submitted.getTaskId(), context);
AsyncToolResultRequest request = new AsyncToolResultRequest();
request.setTaskId(submitted.getTaskId());
return subTools.result(request, context);
}
private void waitTerminal(AbstractAgentAsyncSubTools subTools, String taskId, AgentToolContext context) throws Exception {
long deadline = System.currentTimeMillis() + 3000L;
AsyncToolTaskView view = observe(subTools, taskId, context);
while (!Boolean.TRUE.equals(view.getTerminal()) && System.currentTimeMillis() < deadline) {
Thread.sleep(20L);
view = observe(subTools, taskId, context);
}
Assert.assertTrue("异步任务应在测试超时前完成", Boolean.TRUE.equals(view.getTerminal()));
}
private AsyncToolTaskView observe(AbstractAgentAsyncSubTools subTools, String taskId, AgentToolContext context) {
AsyncToolObserveRequest request = new AsyncToolObserveRequest();
request.setTaskId(taskId);
return subTools.observe(request, context);
}
private AgentToolContext context() {
AgentToolContext context = new AgentToolContext();
context.setRequestId("request-1");
context.setTraceId("trace-1");
context.setSessionId("session-1");
context.setAgentId("agent-1");
context.setToolCallId("tool-call-1");
return context;
}
private Workflow workflow() {
Workflow workflow = new Workflow();
workflow.setId(BigInteger.valueOf(101L));
workflow.setTitle("测试工作流");
return workflow;
}
private PluginItem pluginItem() {
PluginItem pluginItem = new PluginItem();
pluginItem.setId(BigInteger.valueOf(102L));
pluginItem.setName("测试插件");
return pluginItem;
}
private ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(4);
executor.setThreadNamePrefix("workflow-plugin-async-test-");
executor.initialize();
return executor;
}
private static final class StubWorkflowToolExecutor extends WorkflowToolExecutor {
private final Map<String, Object> businessResult;
private StubWorkflowToolExecutor(Map<String, Object> businessResult) {
super(null);
this.businessResult = businessResult;
}
@Override
public AgentToolExecutionResult execute(Workflow workflow, Map<String, Object> arguments) {
return new AgentToolExecutionResult(businessResult, "workflow-run-1");
}
}
private static final class StubPluginToolExecutor extends PluginToolExecutor {
private final Map<String, Object> businessResult;
private StubPluginToolExecutor(Map<String, Object> businessResult) {
this.businessResult = businessResult;
}
@Override
public AgentToolExecutionResult execute(PluginItem pluginItem, Map<String, Object> arguments) {
return new AgentToolExecutionResult(businessResult, null);
}
}
private static final class InMemoryTaskStore implements AgentAsyncToolTaskStore {
private final Map<String, AgentAsyncToolTaskRecord> records = new ConcurrentHashMap<>();
@Override
public void create(AgentAsyncToolTaskRecord record) {
record.setSessionScopedKey(key(record.getSessionId(), record.getTaskId()));
records.put(record.getSessionScopedKey(), record);
}
@Override
public Optional<AgentAsyncToolTaskRecord> get(String sessionId, String taskId) {
return Optional.ofNullable(records.get(key(sessionId, taskId)));
}
@Override
public Optional<AgentAsyncToolTaskRecord> update(String sessionId,
String taskId,
UnaryOperator<AgentAsyncToolTaskRecord> updater) {
AgentAsyncToolTaskRecord updated = records.computeIfPresent(key(sessionId, taskId),
(ignored, existing) -> updater == null ? existing : updater.apply(existing));
return Optional.ofNullable(updated);
}
@Override
public List<AgentAsyncToolTaskRecord> list(String sessionId, AsyncToolTaskStatus status) {
List<AgentAsyncToolTaskRecord> result = new ArrayList<>();
for (AgentAsyncToolTaskRecord record : records.values()) {
if (sessionId.equals(record.getSessionId()) && (status == null || status == record.getStatus())) {
result.add(record);
}
}
result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt).reversed());
return result;
}
private String key(String sessionId, String taskId) {
return sessionId + ":" + taskId;
}
}
}

View File

@@ -0,0 +1,239 @@
package tech.easyflow.agent.runtime.tool;
import com.easyagents.agent.runtime.tool.AgentToolSpec;
import com.easyagents.core.model.chat.tool.Parameter;
import com.easyagents.core.model.chat.tool.Tool;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import tech.easyflow.agent.entity.Agent;
import tech.easyflow.agent.entity.AgentToolBinding;
import tech.easyflow.agent.enums.AgentToolType;
import tech.easyflow.ai.entity.PluginItem;
import tech.easyflow.ai.entity.Workflow;
import tech.easyflow.common.web.exceptions.BusinessException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Agent 工具运行时编译测试。
*/
public class AgentToolRuntimeCompilerTest {
/**
* 验证 Workflow 默认按同步工具编译。
*
* @throws Exception 反射注入依赖失败时抛出
*/
@Test
public void compileShouldUseSyncModeByDefault() throws Exception {
AgentToolRuntimeCompiler compiler = compiler();
AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding(null, false, "flow-sync")));
Assert.assertEquals(List.of("flow-sync"), toolNames(compilation));
Assert.assertEquals(1, compilation.getToolInvokers().size());
Assert.assertFalse(compilation.getToolSpecs().get(0).isApprovalRequired());
}
/**
* 验证非法执行模式会回退为同步工具。
*
* @throws Exception 反射注入依赖失败时抛出
*/
@Test
public void compileShouldFallbackToSyncWhenExecutionModeInvalid() throws Exception {
AgentToolRuntimeCompiler compiler = compiler();
AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding("BAD", false, "flow-sync")));
Assert.assertEquals(List.of("flow-sync"), toolNames(compilation));
Assert.assertEquals(1, compilation.getToolInvokers().size());
}
/**
* 验证 Workflow 异步模式会展开为五个固定子工具。
*
* @throws Exception 反射注入依赖失败时抛出
*/
@Test
public void compileShouldExpandWorkflowAsyncSubToolsAndNormalizeName() throws Exception {
AgentToolRuntimeCompiler compiler = compiler();
AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding("ASYNC", true, "flow-alpha")));
Assert.assertEquals(List.of(
"flow_alpha_submit",
"flow_alpha_observe",
"flow_alpha_result",
"flow_alpha_cancel",
"flow_alpha_list"
), toolNames(compilation));
Assert.assertEquals(5, compilation.getToolInvokers().size());
Assert.assertEquals(List.of("keyword"), compilation.getToolSpecs().get(0).getParametersSchema().get("required"));
Assert.assertTrue(compilation.getToolSpecs().get(0).isApprovalRequired());
Assert.assertEquals("确认执行?", compilation.getToolSpecs().get(0).getApprovalRequest().getApprovalPrompt());
Assert.assertFalse(compilation.getToolSpecs().get(1).isApprovalRequired());
Assert.assertEquals("flow_alpha", compilation.getToolSpecs().get(0).getMetadata().get("asyncToolName"));
Assert.assertEquals("submit", compilation.getToolSpecs().get(0).getMetadata().get("asyncToolPhase"));
}
/**
* 验证 Plugin 异步模式同样展开为五个固定子工具。
*
* @throws Exception 反射注入依赖失败时抛出
*/
@Test
public void compileShouldExpandPluginAsyncSubTools() throws Exception {
AgentToolRuntimeCompiler compiler = compiler();
AgentToolRuntimeCompilation compilation = compiler.compile(agent(pluginBinding("ASYNC", "plugin-tool")));
Assert.assertEquals(List.of(
"plugin_tool_submit",
"plugin_tool_observe",
"plugin_tool_result",
"plugin_tool_cancel",
"plugin_tool_list"
), toolNames(compilation));
Assert.assertEquals(5, compilation.getToolInvokers().size());
for (AgentToolSpec spec : compilation.getToolSpecs()) {
Assert.assertEquals(Boolean.TRUE, spec.getMetadata().get("asyncTool"));
Assert.assertEquals("插件工具", spec.getMetadata().get("toolDisplayName"));
}
}
/**
* 验证异步工具名归一化后发生冲突时会在编译阶段失败。
*
* @throws Exception 反射注入依赖失败时抛出
*/
@Test
public void compileShouldRejectNormalizedAsyncToolNameCollision() throws Exception {
AgentToolRuntimeCompiler compiler = compiler();
AgentToolBinding first = workflowBinding("ASYNC", false, "flow-alpha");
AgentToolBinding second = workflowBinding("ASYNC", false, "flow_alpha");
second.setId(BigInteger.valueOf(13L));
second.setTargetId(BigInteger.valueOf(103L));
try {
compiler.compile(agent(List.of(first, second)));
Assert.fail("异步工具名冲突时应编译失败");
} catch (BusinessException e) {
Assert.assertTrue(e.getMessage().contains("flow_alpha_submit"));
}
}
private AgentToolRuntimeCompiler compiler() throws Exception {
AgentToolRuntimeCompiler compiler = new AgentToolRuntimeCompiler();
setField(compiler, "objectMapper", new ObjectMapper());
setField(compiler, "workflowToolExecutor", new StubWorkflowToolExecutor());
setField(compiler, "pluginToolExecutor", new StubPluginToolExecutor());
return compiler;
}
private Agent agent(AgentToolBinding binding) {
return agent(List.of(binding));
}
private Agent agent(List<AgentToolBinding> bindings) {
Agent agent = new Agent();
agent.setId(BigInteger.ONE);
agent.setToolBindings(bindings);
return agent;
}
private AgentToolBinding workflowBinding(String executionMode, boolean hitlEnabled, String toolName) {
AgentToolBinding binding = new AgentToolBinding();
binding.setId(BigInteger.valueOf(11L));
binding.setToolType(AgentToolType.WORKFLOW.name());
binding.setTargetId(BigInteger.valueOf(101L));
binding.setToolName(toolName);
binding.setEnabled(true);
binding.setHitlEnabled(hitlEnabled);
binding.setHitlConfigJson(Map.of("prompt", "确认执行?"));
binding.setOptionsJson(executionMode == null ? Map.of() : Map.of("executionMode", executionMode));
binding.setResourceSnapshot(Map.of(
"id", BigInteger.valueOf(101L),
"title", "客户检索工作流",
"description", "按关键词检索客户",
"englishName", "flow-alpha"
));
return binding;
}
private AgentToolBinding pluginBinding(String executionMode, String toolName) {
AgentToolBinding binding = new AgentToolBinding();
binding.setId(BigInteger.valueOf(12L));
binding.setToolType(AgentToolType.PLUGIN.name());
binding.setTargetId(BigInteger.valueOf(102L));
binding.setToolName(toolName);
binding.setEnabled(true);
binding.setOptionsJson(Map.of("executionMode", executionMode));
binding.setResourceSnapshot(Map.of(
"id", BigInteger.valueOf(102L),
"name", "插件工具",
"description", "调用插件",
"englishName", "plugin-tool"
));
return binding;
}
private List<String> toolNames(AgentToolRuntimeCompilation compilation) {
return compilation.getToolSpecs().stream().map(AgentToolSpec::getName).collect(Collectors.toList());
}
private void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}
private Tool testTool(String name, String description) {
Parameter parameter = new Parameter();
parameter.setName("keyword");
parameter.setDescription("关键词");
parameter.setType("string");
parameter.setRequired(true);
return Tool.builder()
.name(name)
.description(description)
.addParameter(parameter)
.function(arguments -> Map.of("ok", true))
.build();
}
private final class StubWorkflowToolExecutor extends WorkflowToolExecutor {
private StubWorkflowToolExecutor() {
super(null);
}
@Override
public Tool buildTool(Workflow workflow) {
return testTool(workflow.getEnglishName(), workflow.getDescription());
}
@Override
public AgentToolExecutionResult execute(Workflow workflow, Map<String, Object> arguments) {
return new AgentToolExecutionResult(Map.of("ok", true), "wf-run-1");
}
}
private final class StubPluginToolExecutor extends PluginToolExecutor {
@Override
public Tool buildTool(PluginItem pluginItem) {
return testTool(pluginItem.getEnglishName(), pluginItem.getDescription());
}
@Override
public AgentToolExecutionResult execute(PluginItem pluginItem, Map<String, Object> arguments) {
return new AgentToolExecutionResult(Map.of("ok", true), null);
}
}
}

View File

@@ -10,6 +10,7 @@ public class EasyFlowThreadPoolProperties {
private Pool sse = new Pool(4, 16, 2000, 30, true); private Pool sse = new Pool(4, 16, 2000, 30, true);
private Pool documentImport = new Pool(2, 4, 200, 60, true); private Pool documentImport = new Pool(2, 4, 200, 60, true);
private Pool agentAsyncTool = new Pool(2, 8, 200, 60, true);
/** /**
* 获取 SSE 线程池配置。 * 获取 SSE 线程池配置。
@@ -47,6 +48,24 @@ public class EasyFlowThreadPoolProperties {
this.documentImport = documentImport; this.documentImport = documentImport;
} }
/**
* 获取 Agent 异步工具后台执行线程池配置。
*
* @return Agent 异步工具线程池配置
*/
public Pool getAgentAsyncTool() {
return agentAsyncTool;
}
/**
* 设置 Agent 异步工具后台执行线程池配置。
*
* @param agentAsyncTool Agent 异步工具线程池配置
*/
public void setAgentAsyncTool(Pool agentAsyncTool) {
this.agentAsyncTool = agentAsyncTool;
}
/** /**
* 线程池配置项。 * 线程池配置项。
*/ */

View File

@@ -78,4 +78,30 @@ public class ThreadPoolConfig {
executor.initialize(); executor.initialize();
return executor; return executor;
} }
/**
* 创建 Agent 异步工具后台执行线程池。
*
* @return Agent 异步工具执行线程池
*/
@Bean(name = "agentAsyncToolExecutor")
public ThreadPoolTaskExecutor agentAsyncToolExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
EasyFlowThreadPoolProperties.Pool pool = properties.getAgentAsyncTool();
executor.setCorePoolSize(pool.getCoreSize());
executor.setMaxPoolSize(pool.getMaxSize());
executor.setQueueCapacity(pool.getQueueCapacity());
executor.setKeepAliveSeconds(pool.getKeepAliveSeconds());
executor.setAllowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
executor.setThreadNamePrefix("agent-async-tool-");
executor.setRejectedExecutionHandler((runnable, executorService) -> {
log.error("Agent异步工具线程池过载核心线程数{},最大线程数:{},队列任务数:{}",
executorService.getCorePoolSize(),
executorService.getMaximumPoolSize(),
executorService.getQueue().size());
throw new BusinessException("Agent 异步工具任务繁忙,请稍后重试");
});
executor.initialize();
return executor;
}
} }

View File

@@ -5,6 +5,7 @@ import type {
ChatTimelineKnowledgeHit, ChatTimelineKnowledgeHit,
ChatTimelineMessageItem, ChatTimelineMessageItem,
ChatTimelineToolApprovalPayload, ChatTimelineToolApprovalPayload,
ChatTimelineToolStatus,
} from '@easyflow/common-ui'; } from '@easyflow/common-ui';
import {ChatTimelineBuilder} from '@easyflow/common-ui'; import {ChatTimelineBuilder} from '@easyflow/common-ui';
@@ -30,6 +31,17 @@ function asArray(value: unknown): any[] {
return Array.isArray(value) ? value : []; return Array.isArray(value) ? value : [];
} }
function asyncToolTimelineStatus(
payload: Record<string, any>,
): ChatTimelineToolStatus {
const status = asText(payload.status).toUpperCase();
if (status === 'SUCCEEDED') return 'success';
if (status === 'FAILED' || status === 'TIMEOUT' || status === 'CANCELLED') {
return 'error';
}
return 'running';
}
function asTimestamp(value: unknown) { function asTimestamp(value: unknown) {
if (!value) { if (!value) {
return Date.now(); return Date.now();
@@ -427,19 +439,29 @@ export function applyAgentSseEnvelope(
return; return;
} }
if (domain === 'TOOL' && (type === 'TOOL_CALL' || type === 'TOOL_RESULT')) { if (domain === 'TOOL' && (type === 'TOOL_CALL' || type === 'TOOL_RESULT')) {
const asyncTool = payload.asyncTool === true;
const toolName = normalizeToolName(
payload.toolDisplayName ?? payload.toolName ?? payload.name,
);
ChatTimelineBuilder.upsertToolCall(items, { ChatTimelineBuilder.upsertToolCall(items, {
input: payload.input ?? payload.toolInput, input: payload.input ?? payload.toolInput,
output: payload.output ?? payload.result ?? payload.text, output: asyncTool
status: type === 'TOOL_RESULT' ? 'success' : 'running', ? payload.summary ?? payload.label ?? payload.output ?? payload.result ?? payload.text
: payload.output ?? payload.result ?? payload.text,
status: asyncTool
? asyncToolTimelineStatus(payload)
: type === 'TOOL_RESULT'
? 'success'
: 'running',
statusKey: statusKeyForProjection( statusKey: statusKeyForProjection(
payload, payload,
metadata, metadata,
'knowledge-retrieval', 'knowledge-retrieval',
), ),
toolCallId: normalizeToolCallId(payload), toolCallId: asyncTool
toolName: normalizeToolName( ? asText(payload.toolCallId ?? payload.taskId ?? payload.id)
payload.toolDisplayName ?? payload.toolName ?? payload.name, : normalizeToolCallId(payload),
), toolName,
}); });
return; return;
} }

View File

@@ -79,6 +79,19 @@ const selectedMcpTools = computed(() => {
const selectedMcpToolCount = computed(() => selectedMcpTools.value.length); const selectedMcpToolCount = computed(() => selectedMcpTools.value.length);
const asyncExecutionEnabled = computed({
get() {
return String(props.binding.optionsJson?.executionMode || '').toUpperCase() === 'ASYNC';
},
set(value: boolean) {
props.binding.optionsJson = {
...(props.binding.optionsJson || {}),
executionMode: value ? 'ASYNC' : 'SYNC',
};
emit('change');
},
});
function handleTargetChange(value: string) { function handleTargetChange(value: string) {
const option = props.options.find( const option = props.options.find(
(item) => String(item.value) === String(value), (item) => String(item.value) === String(value),
@@ -147,6 +160,9 @@ function handleTargetChange(value: string) {
<ElFormItem label="执行前确认"> <ElFormItem label="执行前确认">
<ElSwitch v-model="binding.hitlEnabled" @change="emit('change')" /> <ElSwitch v-model="binding.hitlEnabled" @change="emit('change')" />
</ElFormItem> </ElFormItem>
<ElFormItem v-if="kind !== 'mcp'" label="异步执行">
<ElSwitch v-model="asyncExecutionEnabled" />
</ElFormItem>
<ElButton <ElButton
class="agent-form__danger" class="agent-form__danger"
type="danger" type="danger"

View File

@@ -145,6 +145,12 @@ function normalizeToolBinding(
binding: AgentToolBinding, binding: AgentToolBinding,
index: number, index: number,
): AgentToolBinding { ): AgentToolBinding {
const optionsJson = {
...(binding.optionsJson || {}),
};
if (String(optionsJson.executionMode || '').toUpperCase() !== 'ASYNC') {
optionsJson.executionMode = 'SYNC';
}
return { return {
...binding, ...binding,
enabled: binding.enabled !== false, enabled: binding.enabled !== false,
@@ -154,9 +160,9 @@ function normalizeToolBinding(
String( String(
binding.id || binding.id ||
createLocalId(String(binding.toolType || 'tool').toLowerCase()), createLocalId(String(binding.toolType || 'tool').toLowerCase()),
), ),
toolName: normalizeBindingToolName(binding), toolName: normalizeBindingToolName(binding),
optionsJson: binding.optionsJson || {}, optionsJson,
sortNo: binding.sortNo ?? index + 1, sortNo: binding.sortNo ?? index + 1,
}; };
} }

View File

@@ -2,6 +2,7 @@ import type {
ChatTimelineItem, ChatTimelineItem,
ChatTimelineKnowledgeHit, ChatTimelineKnowledgeHit,
ChatTimelineMessageItem, ChatTimelineMessageItem,
ChatTimelineToolStatus,
} from '@easyflow/common-ui'; } from '@easyflow/common-ui';
import {ChatTimelineBuilder} from '@easyflow/common-ui'; import {ChatTimelineBuilder} from '@easyflow/common-ui';
@@ -503,18 +504,25 @@ function projectEventToTimeline(
const displayToolName = asText( const displayToolName = asText(
payload.toolDisplayName ?? rawToolName ?? '工具', payload.toolDisplayName ?? rawToolName ?? '工具',
); );
const asyncTool = payload.asyncTool === true;
ChatTimelineBuilder.upsertToolCall(items, { ChatTimelineBuilder.upsertToolCall(items, {
input: payload.input ?? payload.toolInput, input: payload.input ?? payload.toolInput,
output: payload.output ?? payload.result ?? payload.text, output: asyncTool
status: type === 'TOOL_RESULT' ? 'success' : 'running', ? payload.summary ?? payload.label ?? payload.output ?? payload.result ?? payload.text
: payload.output ?? payload.result ?? payload.text,
status: asyncTool
? asyncToolTimelineStatus(payload)
: type === 'TOOL_RESULT'
? 'success'
: 'running',
statusKey: statusKeyForProjection( statusKey: statusKeyForProjection(
payload, payload,
roundId, roundId,
variantIndex, variantIndex,
'knowledge-retrieval', 'knowledge-retrieval',
), ),
toolCallId: asText(payload.toolCallId ?? payload.tool_call_id ?? payload.id), toolCallId: asText(payload.toolCallId ?? payload.taskId ?? payload.tool_call_id ?? payload.id),
toolName: isHiddenToolName(rawToolName) ? rawToolName : displayToolName, toolName: asyncTool ? displayToolName : isHiddenToolName(rawToolName) ? rawToolName : displayToolName,
}); });
return; return;
} }
@@ -560,6 +568,15 @@ function projectEventToTimeline(
} }
} }
function asyncToolTimelineStatus(payload: Record<string, unknown>): ChatTimelineToolStatus {
const status = asText(payload.status).toUpperCase();
if (status === 'SUCCEEDED') return 'success';
if (status === 'FAILED' || status === 'TIMEOUT' || status === 'CANCELLED') {
return 'error';
}
return 'running';
}
function sortedRounds(rounds: Map<string, AgentTryoutRawRound>) { function sortedRounds(rounds: Map<string, AgentTryoutRawRound>) {
return [...rounds.values()].sort( return [...rounds.values()].sort(
(first, second) => (first, second) =>