diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentRuntimeProperties.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentRuntimeProperties.java index c68190c..e414968 100644 --- a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentRuntimeProperties.java +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/config/AgentRuntimeProperties.java @@ -67,6 +67,11 @@ public class AgentRuntimeProperties { */ private Duration lockRenewInterval = Duration.ofMinutes(1); + /** + * Agent 异步工具任务 Redis 运行态 TTL。 + */ + private Duration asyncToolTaskTtl = Duration.ofHours(24); + /** * 获取 Redis 热态 session 缓存 TTL。 * @@ -258,6 +263,24 @@ public class AgentRuntimeProperties { this.lockRenewInterval = lockRenewInterval == null ? Duration.ofMinutes(1) : lockRenewInterval; } + /** + * 获取 Agent 异步工具任务 Redis 运行态 TTL。 + * + * @return 任务 TTL + */ + public Duration getAsyncToolTaskTtl() { + return asyncToolTaskTtl; + } + + /** + * 设置 Agent 异步工具任务 Redis 运行态 TTL。 + * + * @param asyncToolTaskTtl 任务 TTL + */ + public void setAsyncToolTaskTtl(Duration asyncToolTaskTtl) { + this.asyncToolTaskTtl = asyncToolTaskTtl == null ? Duration.ofHours(24) : asyncToolTaskTtl; + } + private static String defaultInstanceId() { String envInstanceId = System.getenv("EASYFLOW_INSTANCE_ID"); if (StringUtils.hasText(envInstanceId)) { diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRunService.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRunService.java index ece253f..ee98012 100644 --- a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRunService.java +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRunService.java @@ -70,7 +70,7 @@ public class AgentRunService { @Resource private AgentService agentService; @Resource - private AgentDefinitionCompiler agentDefinitionCompiler; + private AgentRuntimeCompiler agentRuntimeCompiler; @Resource private AgentRuntimeFactory agentRuntimeFactory; @Resource @@ -363,7 +363,7 @@ public class AgentRunService { if (persistChatlog) { bindAgentSession(agent, runtimeSessionId, chatContext); } - AgentRuntimeBundle bundle = agentDefinitionCompiler.compile(agent); + AgentRuntimeBundle bundle = agentRuntimeCompiler.compile(agent); AgentRuntime runtime = agentRuntimeFactory.create(); // 会话初始化请求 AgentInitRequest request = new AgentInitRequest(); @@ -554,16 +554,23 @@ public class AgentRunService { } return; } + if (isAsyncToolEvent(event.getEventType())) { + if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, asyncToolChatType(event), buildAsyncToolEventPayload(event))) { + cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog); + } + return; + } if (event.getEventType() == AgentRuntimeEventType.TOOL_CALL) { LOG.info("Agent runtime tool call, requestId={}, toolCallId={}, payload={}, metadata={}", requestId, event.getToolCallId(), event.getPayload(), event.getMetadata()); + Map toolPayload = buildToolEventPayload(event); assistantAccumulator.appendToolCall( - firstText(event.getToolCallId(), stringPayload(event, "toolCallId")), - firstText(stringPayload(event, "toolName"), stringPayload(event, "name")), - stringPayload(event, "toolDisplayName"), - firstNonNull(event.getPayload().get("input"), event.getPayload().get("toolInput")) + firstText(stringValue(toolPayload, "toolCallId"), event.getToolCallId()), + firstText(stringValue(toolPayload, "toolName"), stringValue(toolPayload, "name")), + stringValue(toolPayload, "toolDisplayName"), + firstNonNull(toolPayload.get("input"), toolPayload.get("toolInput")) ); - if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_CALL, buildToolEventPayload(event))) { + if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_CALL, toolPayload)) { cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog); } return; @@ -571,14 +578,15 @@ public class AgentRunService { if (event.getEventType() == AgentRuntimeEventType.TOOL_RESULT) { LOG.info("Agent runtime tool result, requestId={}, toolCallId={}, payload={}, metadata={}", requestId, event.getToolCallId(), event.getPayload(), event.getMetadata()); + Map toolPayload = buildToolEventPayload(event); assistantAccumulator.appendToolResult( - firstText(event.getToolCallId(), stringPayload(event, "toolCallId")), - firstText(stringPayload(event, "toolName"), stringPayload(event, "name")), - stringPayload(event, "toolDisplayName"), - firstNonNull(firstNonNull(event.getPayload().get("output"), event.getPayload().get("result")), - event.getPayload().get("text")) + firstText(stringValue(toolPayload, "toolCallId"), event.getToolCallId()), + firstText(stringValue(toolPayload, "toolName"), stringValue(toolPayload, "name")), + stringValue(toolPayload, "toolDisplayName"), + firstNonNull(firstNonNull(toolPayload.get("output"), toolPayload.get("result")), + toolPayload.get("text")) ); - if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_RESULT, buildToolEventPayload(event))) { + if (!sendEnvelope(chatSseEmitter, ChatDomain.TOOL, ChatType.TOOL_RESULT, toolPayload)) { cancelDisconnectedRun(requestId, chatContext, answer, assistantAccumulator, finished, persistChatlog); } return; @@ -1181,9 +1189,81 @@ public class AgentRunService { if (toolCallId != null && !toolCallId.isBlank()) { payload.put("toolCallId", toolCallId); } + if (Boolean.TRUE.equals(event.getMetadata().get("asyncTool"))) { + enrichAsyncToolPayload(payload, event.getMetadata(), toolCallId); + String taskId = stringValue(payload, "taskId"); + if (taskId != null && !taskId.isBlank()) { + payload.put("toolCallId", taskId); + } + } return payload; } + private boolean isAsyncToolEvent(AgentRuntimeEventType type) { + return type == AgentRuntimeEventType.ASYNC_TOOL_SUBMITTED + || type == AgentRuntimeEventType.ASYNC_TOOL_OBSERVED + || type == AgentRuntimeEventType.ASYNC_TOOL_RESULT + || type == AgentRuntimeEventType.ASYNC_TOOL_CANCELLED + || type == AgentRuntimeEventType.ASYNC_TOOL_LISTED + || type == AgentRuntimeEventType.ASYNC_TOOL_FAILED; + } + + private ChatType asyncToolChatType(AgentRuntimeEvent event) { + String status = stringPayload(event, "status"); + if ("SUCCEEDED".equalsIgnoreCase(status) + || "FAILED".equalsIgnoreCase(status) + || "CANCELLED".equalsIgnoreCase(status) + || "TIMEOUT".equalsIgnoreCase(status) + || event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_RESULT + || event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_FAILED + || event.getEventType() == AgentRuntimeEventType.ASYNC_TOOL_CANCELLED) { + return ChatType.TOOL_RESULT; + } + return ChatType.TOOL_CALL; + } + + private Map buildAsyncToolEventPayload(AgentRuntimeEvent event) { + Map payload = new LinkedHashMap<>(event.getPayload() == null ? Map.of() : event.getPayload()); + String taskId = stringValue(payload, "taskId"); + String toolCallId = firstText(taskId, event.getToolCallId()); + if (toolCallId != null && !toolCallId.isBlank()) { + payload.put("toolCallId", toolCallId); + } + enrichAsyncToolPayload(payload, event.getMetadata(), toolCallId); + return payload; + } + + private void enrichAsyncToolPayload(Map payload, Map metadata, String fallbackId) { + Map safeMetadata = metadata == null ? Map.of() : metadata; + payload.put("asyncTool", true); + putIfPresent(payload, "asyncToolName", firstText(stringValue(payload, "asyncToolName"), stringValue(safeMetadata, "asyncToolName"))); + putIfPresent(payload, "phase", firstText(stringValue(payload, "phase"), stringValue(safeMetadata, "asyncToolPhase"))); + putIfPresent(payload, "taskId", firstText(stringValue(payload, "taskId"), stringValue(safeMetadata, "taskId"))); + putIfPresent(payload, "status", firstText(stringValue(payload, "status"), stringValue(safeMetadata, "status"))); + String displayName = firstText(stringValue(payload, "toolDisplayName"), + firstText(stringValue(safeMetadata, "toolDisplayName"), stringValue(payload, "asyncToolName"))); + putIfPresent(payload, "toolDisplayName", displayName); + putIfPresent(payload, "toolName", displayName); + putIfPresent(payload, "name", displayName); + String statusKey = "async-tool:" + firstText(stringValue(payload, "taskId"), fallbackId); + payload.put("statusKey", statusKey); + payload.put("label", asyncToolLabel(stringValue(payload, "status"), stringValue(payload, "phase"), displayName)); + } + + private String asyncToolLabel(String status, String phase, String displayName) { + String name = displayName == null || displayName.isBlank() ? "异步工具" : displayName; + if ("SUCCEEDED".equalsIgnoreCase(status)) { + return name + "已完成"; + } + if ("FAILED".equalsIgnoreCase(status) || "TIMEOUT".equalsIgnoreCase(status)) { + return name + "执行失败"; + } + if ("PENDING".equalsIgnoreCase(status) || "submit".equalsIgnoreCase(phase)) { + return name + "已提交"; + } + return name + "执行中"; + } + /** * 构建知识库检索状态载荷,确保前端可按稳定 key 合并同一轮状态行。 * diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentDefinitionCompiler.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRuntimeCompiler.java similarity index 96% rename from easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentDefinitionCompiler.java rename to easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRuntimeCompiler.java index 4b0ce4a..453bd4b 100644 --- a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentDefinitionCompiler.java +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/AgentRuntimeCompiler.java @@ -29,6 +29,8 @@ import tech.easyflow.agent.entity.Agent; import tech.easyflow.agent.entity.AgentKnowledgeBinding; import tech.easyflow.agent.entity.AgentToolBinding; import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompilation; +import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompiler; import tech.easyflow.ai.easyagents.tool.ChatToolNameHelper; import tech.easyflow.ai.easyagents.tool.WorkflowTool; import tech.easyflow.ai.easyagentsflow.support.PublishedWorkflowDefinitionIds; @@ -46,12 +48,12 @@ import java.util.regex.Pattern; import java.util.*; /** - * 将 Agent 发布快照编译为 easy-agents-agent-runtime 可执行定义。 + * 将 Agent 发布快照编译为可执行定义。 */ @Component -public class AgentDefinitionCompiler { +public class AgentRuntimeCompiler { - private static final Logger LOG = LoggerFactory.getLogger(AgentDefinitionCompiler.class); + private static final Logger LOG = LoggerFactory.getLogger(AgentRuntimeCompiler.class); private static final int LOG_TEXT_MAX_LENGTH = 500; private static final Pattern MCP_INPUT_PATTERN = Pattern.compile("\\$\\{input:([A-Za-z0-9_.-]+)}"); @@ -67,6 +69,8 @@ public class AgentDefinitionCompiler { private DocumentCollectionService documentCollectionService; @Resource private ObjectMapper objectMapper; + @Resource + private AgentToolRuntimeCompiler agentToolRuntimeCompiler; /** * 编译 Agent 运行时定义和调用器。 @@ -209,35 +213,10 @@ public class AgentDefinitionCompiler { } private void compileTools(Agent agent, AgentDefinition definition, AgentRuntimeBundle bundle) { - if (agent.getToolBindings() == null) { - return; - } - List specs = new ArrayList<>(); - Map invokers = new LinkedHashMap<>(); - List mcpSpecs = new ArrayList<>(); - Map mcpSpecMap = new LinkedHashMap<>(); - for (AgentToolBinding binding : agent.getToolBindings()) { - if (!Boolean.TRUE.equals(binding.getEnabled())) { - continue; - } - AgentToolType type = AgentToolType.from(binding.getToolType()); - if (type == AgentToolType.MCP) { - McpSpec mcpSpec = mcpSpecMap.computeIfAbsent(binding.getTargetId(), - ignored -> buildMcpSpec(binding)); - applyMcpToolBinding(mcpSpec, binding); - if (!mcpSpecs.contains(mcpSpec)) { - mcpSpecs.add(mcpSpec); - } - continue; - } - Tool tool = buildTool(binding); - AgentToolSpec spec = toToolSpec(tool, binding); - specs.add(spec); - invokers.put(spec.getName(), (arguments, context) -> invokeTool(tool, arguments)); - } - definition.setToolSpecs(specs); - definition.setMcpSpecs(mcpSpecs); - bundle.setToolInvokers(invokers); + AgentToolRuntimeCompilation compilation = agentToolRuntimeCompiler.compile(agent); + definition.setToolSpecs(compilation.getToolSpecs()); + definition.setMcpSpecs(compilation.getMcpSpecs()); + bundle.setToolInvokers(compilation.getToolInvokers()); } private Tool buildTool(AgentToolBinding binding) { diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubTools.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubTools.java new file mode 100644 index 0000000..ef571d1 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubTools.java @@ -0,0 +1,310 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.AgentToolContext; +import com.easyagents.agent.runtime.tool.asynctool.*; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.StringUtils; +import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult; +import tech.easyflow.common.web.exceptions.BusinessException; + +import java.time.Instant; +import java.util.*; + +/** + * EasyFlow Agent 异步业务工具基类。 + */ +public abstract class AbstractAgentAsyncSubTools implements AsyncSubTools { + + private static final String ERROR_TYPE_NOT_FOUND = "TASK_NOT_FOUND"; + private static final String ERROR_TYPE_EXCEPTION = "EXCEPTION"; + + private final AgentAsyncToolTaskStore taskStore; + private final ThreadPoolTaskExecutor taskExecutor; + + /** + * 创建异步业务工具基类。 + * + * @param taskStore 任务存储 + * @param taskExecutor 后台执行器 + */ + protected AbstractAgentAsyncSubTools(AgentAsyncToolTaskStore taskStore, + ThreadPoolTaskExecutor taskExecutor) { + this.taskStore = taskStore; + this.taskExecutor = taskExecutor; + } + + /** + * 获取工具类型。 + * + * @return 工具类型 + */ + protected abstract String toolType(); + + /** + * 获取运行时工具名。 + * + * @return 运行时工具名 + */ + protected abstract String toolName(); + + /** + * 获取用户可见工具名称。 + * + * @return 用户可见工具名称 + */ + protected abstract String displayName(); + + /** + * 获取业务资源 ID。 + * + * @return 业务资源 ID + */ + protected abstract String businessId(); + + /** + * 执行业务工具。 + * + * @param arguments 调用参数 + * @return 执行结果 + */ + protected abstract AgentToolExecutionResult executeBusiness(Map arguments); + + /** + * {@inheritDoc} + */ + @Override + public AsyncToolSubmitResult submit(Map arguments, AgentToolContext context) { + String sessionId = requireSessionId(context); + String taskId = newTaskId(); + AgentAsyncToolTaskRecord record = new AgentAsyncToolTaskRecord(); + record.setTaskId(taskId); + record.setToolType(toolType()); + record.setToolName(toolName()); + record.setBusinessId(businessId()); + record.setStatus(AsyncToolTaskStatus.PENDING); + record.setArguments(arguments == null ? Map.of() : new LinkedHashMap<>(arguments)); + record.setSummary(displayName() + "任务已提交"); + record.setRequestId(context == null ? null : context.getRequestId()); + record.setTraceId(context == null ? null : context.getTraceId()); + record.setSessionId(sessionId); + record.setAgentId(context == null ? null : context.getAgentId()); + record.setToolCallId(context == null ? null : context.getToolCallId()); + record.getMetadata().put("toolDisplayName", displayName()); + appendEvent(record, "SUBMITTED", displayName() + "任务已提交"); + taskStore.create(record); + dispatch(sessionId, record.getTaskId(), record.getArguments()); + + AsyncToolSubmitResult result = new AsyncToolSubmitResult(); + result.setTaskId(taskId); + result.setStatus(AsyncToolTaskStatus.PENDING); + result.setCursor(0L); + result.setSummary(record.getSummary()); + result.setNextAction(toolName() + "_observe 查看任务进度。"); + result.getMetadata().put("toolDisplayName", displayName()); + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public AsyncToolTaskView observe(AsyncToolObserveRequest request, AgentToolContext context) { + return taskView(request == null ? null : request.getTaskId(), + request == null ? null : request.getCursor(), + request == null ? null : request.getLimit(), + context); + } + + /** + * {@inheritDoc} + */ + @Override + public AsyncToolTaskView result(AsyncToolResultRequest request, AgentToolContext context) { + return taskView(request == null ? null : request.getTaskId(), + request == null ? null : request.getCursor(), + request == null ? null : request.getLimit(), + context); + } + + /** + * {@inheritDoc} + */ + @Override + public AsyncToolCancelResult cancel(AsyncToolCancelRequest request, AgentToolContext context) { + AsyncToolCancelResult result = new AsyncToolCancelResult(); + result.setTaskId(request == null ? null : request.getTaskId()); + result.setStatus(AsyncToolTaskStatus.FAILED); + result.setErrorMessage("当前异步工具不支持取消正在执行的任务"); + result.setMessage("不支持取消"); + result.getMetadata().put("toolDisplayName", displayName()); + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public AsyncToolTaskListResult list(AsyncToolListRequest request, AgentToolContext context) { + String sessionId = requireSessionId(context); + AsyncToolTaskStatus status = request == null ? null : request.getStatus(); + List tasks = new ArrayList<>(); + for (AgentAsyncToolTaskRecord record : taskStore.list(sessionId, status)) { + tasks.add(summary(record)); + } + AsyncToolTaskListResult result = new AsyncToolTaskListResult(); + result.setTasks(tasks); + result.getMetadata().put("toolDisplayName", displayName()); + return result; + } + + private void dispatch(String sessionId, String taskId, Map arguments) { + try { + taskExecutor.execute(() -> executeTask(sessionId, taskId, arguments)); + } catch (Exception e) { + taskStore.update(sessionId, taskId, record -> fail(record, e)); + throw new BusinessException("提交异步工具任务失败:" + safeMessage(e)); + } + } + + private void executeTask(String sessionId, String taskId, Map arguments) { + try { + taskStore.update(sessionId, taskId, record -> { + record.setStatus(AsyncToolTaskStatus.RUNNING); + record.setSummary(displayName() + "任务执行中"); + appendEvent(record, "RUNNING", displayName() + "任务执行中"); + return record; + }); + AgentToolExecutionResult executionResult = executeBusiness(arguments); + taskStore.update(sessionId, taskId, record -> { + record.setStatus(AsyncToolTaskStatus.SUCCEEDED); + record.setSummary(displayName() + "任务已完成"); + record.setResult(executionResult == null ? null : executionResult.getResult()); + record.setBusinessExecutionId(executionResult == null ? null : executionResult.getBusinessExecutionId()); + appendEvent(record, "SUCCEEDED", displayName() + "任务已完成"); + return record; + }); + } catch (Exception e) { + taskStore.update(sessionId, taskId, record -> fail(record, e)); + } + } + + private AgentAsyncToolTaskRecord fail(AgentAsyncToolTaskRecord record, Exception error) { + record.setStatus(AsyncToolTaskStatus.FAILED); + record.setSummary(displayName() + "任务执行失败"); + record.setErrorType(ERROR_TYPE_EXCEPTION); + record.setErrorMessage(safeMessage(error)); + appendEvent(record, "FAILED", record.getErrorMessage()); + return record; + } + + private AsyncToolTaskView taskView(String taskId, Long cursor, Integer limit, AgentToolContext context) { + String sessionId = requireSessionId(context); + if (!StringUtils.hasText(taskId)) { + return notFoundView(taskId, cursor, "任务 ID 不能为空"); + } + return taskStore.get(sessionId, taskId) + .map(record -> toView(record, cursor, limit)) + .orElseGet(() -> notFoundView(taskId, cursor, "异步工具任务不存在或已过期")); + } + + private AsyncToolTaskView toView(AgentAsyncToolTaskRecord record, Long cursor, Integer limit) { + long safeCursor = cursor == null ? 0L : Math.max(0L, cursor); + int safeLimit = limit == null || limit <= 0 ? 20 : Math.min(limit, 100); + List events = new ArrayList<>(); + for (AsyncToolTaskEvent event : record.getEvents()) { + if (event.getSequence() != null && event.getSequence() > safeCursor) { + events.add(event); + } + if (events.size() >= safeLimit) { + break; + } + } + Long nextCursor = events.isEmpty() + ? safeCursor + : events.get(events.size() - 1).getSequence(); + AsyncToolTaskView view = new AsyncToolTaskView(); + view.setTaskId(record.getTaskId()); + view.setStatus(record.getStatus()); + view.setCursor(safeCursor); + view.setNextCursor(nextCursor); + view.setSummary(record.getSummary()); + view.setNextAction(nextAction(record.getStatus())); + view.setEvents(events); + view.setResult(record.getResult()); + view.setErrorMessage(record.getErrorMessage()); + view.setErrorType(record.getErrorType()); + view.setTerminal(record.getStatus() != null && record.getStatus().isTerminal()); + view.setResultAvailable(record.getStatus() == AsyncToolTaskStatus.SUCCEEDED && record.getResult() != null); + view.getMetadata().put("toolDisplayName", displayName()); + putIfNotNull(view.getPayload(), "businessId", record.getBusinessId()); + putIfNotNull(view.getPayload(), "businessExecutionId", record.getBusinessExecutionId()); + return view; + } + + private AsyncToolTaskView notFoundView(String taskId, Long cursor, String message) { + AsyncToolTaskView view = new AsyncToolTaskView(); + view.setTaskId(taskId); + view.setStatus(AsyncToolTaskStatus.FAILED); + view.setCursor(cursor == null ? 0L : cursor); + view.setNextCursor(cursor == null ? 0L : cursor); + view.setSummary(message); + view.setErrorType(ERROR_TYPE_NOT_FOUND); + view.setErrorMessage(message); + view.setTerminal(true); + view.setResultAvailable(false); + view.getMetadata().put("toolDisplayName", displayName()); + return view; + } + + private AsyncToolTaskSummary summary(AgentAsyncToolTaskRecord record) { + AsyncToolTaskSummary summary = new AsyncToolTaskSummary(); + summary.setTaskId(record.getTaskId()); + summary.setStatus(record.getStatus()); + summary.setSummary(record.getSummary()); + summary.setCreatedAt(record.getCreatedAt()); + summary.setUpdatedAt(record.getUpdatedAt()); + summary.getPayload().put("toolName", record.getToolName()); + summary.getPayload().put("toolDisplayName", displayName()); + return summary; + } + + private void appendEvent(AgentAsyncToolTaskRecord record, String type, String text) { + AsyncToolTaskEvent event = new AsyncToolTaskEvent(); + event.setSequence((long) record.getEvents().size() + 1L); + event.setType(type); + event.setText(text); + event.setCreatedAt(Instant.now()); + record.getEvents().add(event); + } + + private String nextAction(AsyncToolTaskStatus status) { + if (status != null && status.isTerminal()) { + return "任务已结束。"; + } + return toolName() + "_observe 继续查看任务进度。"; + } + + private String requireSessionId(AgentToolContext context) { + if (context == null || !StringUtils.hasText(context.getSessionId())) { + throw new BusinessException("异步工具任务缺少 Agent session 上下文"); + } + return context.getSessionId(); + } + + private String newTaskId() { + String idPart = UUID.randomUUID().toString().replace("-", ""); + return "async_" + idPart; + } + + private void putIfNotNull(Map target, String key, Object value) { + if (value != null) { + target.put(key, value); + } + } + + private String safeMessage(Exception e) { + return e == null || e.getMessage() == null || e.getMessage().isBlank() + ? "异步工具任务执行失败" + : e.getMessage(); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskRecord.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskRecord.java new file mode 100644 index 0000000..b213eed --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskRecord.java @@ -0,0 +1,362 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskEvent; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Agent 异步工具任务 Redis 运行态记录。 + */ +public class AgentAsyncToolTaskRecord { + + private String taskId; + private String toolType; + private String toolName; + private String businessId; + private String businessExecutionId; + private String sessionScopedKey; + private Long ttlSeconds; + private AsyncToolTaskStatus status = AsyncToolTaskStatus.PENDING; + private Map arguments = new LinkedHashMap<>(); + private String summary; + private Object result; + private String errorMessage; + private String errorType; + private List events = new ArrayList<>(); + private String requestId; + private String traceId; + private String sessionId; + private String agentId; + private String toolCallId; + private Instant createdAt = Instant.now(); + private Instant updatedAt = Instant.now(); + private Map payload = new LinkedHashMap<>(); + private Map metadata = new LinkedHashMap<>(); + + /** + * 获取任务 ID。 + * + * @return 任务 ID + */ + public String getTaskId() { return taskId; } + + /** + * 设置任务 ID。 + * + * @param taskId 任务 ID + */ + public void setTaskId(String taskId) { this.taskId = taskId; } + + /** + * 获取工具类型。 + * + * @return 工具类型 + */ + public String getToolType() { return toolType; } + + /** + * 设置工具类型。 + * + * @param toolType 工具类型 + */ + public void setToolType(String toolType) { this.toolType = toolType; } + + /** + * 获取工具名称。 + * + * @return 工具名称 + */ + public String getToolName() { return toolName; } + + /** + * 设置工具名称。 + * + * @param toolName 工具名称 + */ + public void setToolName(String toolName) { this.toolName = toolName; } + + /** + * 获取业务资源 ID。 + * + * @return 业务资源 ID + */ + public String getBusinessId() { return businessId; } + + /** + * 设置业务资源 ID。 + * + * @param businessId 业务资源 ID + */ + public void setBusinessId(String businessId) { this.businessId = businessId; } + + /** + * 获取业务执行记录 ID。 + * + * @return 业务执行记录 ID + */ + public String getBusinessExecutionId() { return businessExecutionId; } + + /** + * 设置业务执行记录 ID。 + * + * @param businessExecutionId 业务执行记录 ID + */ + public void setBusinessExecutionId(String businessExecutionId) { this.businessExecutionId = businessExecutionId; } + + /** + * 获取会话内任务存储 key。 + * + * @return 会话内任务存储 key + */ + public String getSessionScopedKey() { return sessionScopedKey; } + + /** + * 设置会话内任务存储 key。 + * + * @param sessionScopedKey 会话内任务存储 key + */ + public void setSessionScopedKey(String sessionScopedKey) { this.sessionScopedKey = sessionScopedKey; } + + /** + * 获取任务 TTL 秒数。 + * + * @return TTL 秒数 + */ + public Long getTtlSeconds() { return ttlSeconds; } + + /** + * 设置任务 TTL 秒数。 + * + * @param ttlSeconds TTL 秒数 + */ + public void setTtlSeconds(Long ttlSeconds) { this.ttlSeconds = ttlSeconds; } + + /** + * 获取任务状态。 + * + * @return 任务状态 + */ + public AsyncToolTaskStatus getStatus() { return status; } + + /** + * 设置任务状态。 + * + * @param status 任务状态 + */ + public void setStatus(AsyncToolTaskStatus status) { this.status = status == null ? AsyncToolTaskStatus.PENDING : status; } + + /** + * 获取任务参数。 + * + * @return 任务参数 + */ + public Map getArguments() { return arguments; } + + /** + * 设置任务参数。 + * + * @param arguments 任务参数 + */ + public void setArguments(Map arguments) { this.arguments = arguments == null ? new LinkedHashMap<>() : arguments; } + + /** + * 获取任务摘要。 + * + * @return 任务摘要 + */ + public String getSummary() { return summary; } + + /** + * 设置任务摘要。 + * + * @param summary 任务摘要 + */ + public void setSummary(String summary) { this.summary = summary; } + + /** + * 获取任务结果。 + * + * @return 任务结果 + */ + public Object getResult() { return result; } + + /** + * 设置任务结果。 + * + * @param result 任务结果 + */ + public void setResult(Object result) { this.result = result; } + + /** + * 获取错误消息。 + * + * @return 错误消息 + */ + public String getErrorMessage() { return errorMessage; } + + /** + * 设置错误消息。 + * + * @param errorMessage 错误消息 + */ + public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; } + + /** + * 获取错误类型。 + * + * @return 错误类型 + */ + public String getErrorType() { return errorType; } + + /** + * 设置错误类型。 + * + * @param errorType 错误类型 + */ + public void setErrorType(String errorType) { this.errorType = errorType; } + + /** + * 获取任务事件列表。 + * + * @return 任务事件列表 + */ + public List getEvents() { return events; } + + /** + * 设置任务事件列表。 + * + * @param events 任务事件列表 + */ + public void setEvents(List events) { this.events = events == null ? new ArrayList<>() : events; } + + /** + * 获取请求 ID。 + * + * @return 请求 ID + */ + public String getRequestId() { return requestId; } + + /** + * 设置请求 ID。 + * + * @param requestId 请求 ID + */ + public void setRequestId(String requestId) { this.requestId = requestId; } + + /** + * 获取链路 ID。 + * + * @return 链路 ID + */ + public String getTraceId() { return traceId; } + + /** + * 设置链路 ID。 + * + * @param traceId 链路 ID + */ + public void setTraceId(String traceId) { this.traceId = traceId; } + + /** + * 获取 Agent Runtime session ID。 + * + * @return session ID + */ + public String getSessionId() { return sessionId; } + + /** + * 设置 Agent Runtime session ID。 + * + * @param sessionId session ID + */ + public void setSessionId(String sessionId) { this.sessionId = sessionId; } + + /** + * 获取 Agent ID。 + * + * @return Agent ID + */ + public String getAgentId() { return agentId; } + + /** + * 设置 Agent ID。 + * + * @param agentId Agent ID + */ + public void setAgentId(String agentId) { this.agentId = agentId; } + + /** + * 获取工具调用 ID。 + * + * @return 工具调用 ID + */ + public String getToolCallId() { return toolCallId; } + + /** + * 设置工具调用 ID。 + * + * @param toolCallId 工具调用 ID + */ + public void setToolCallId(String toolCallId) { this.toolCallId = toolCallId; } + + /** + * 获取创建时间。 + * + * @return 创建时间 + */ + public Instant getCreatedAt() { return createdAt; } + + /** + * 设置创建时间。 + * + * @param createdAt 创建时间 + */ + public void setCreatedAt(Instant createdAt) { this.createdAt = createdAt == null ? Instant.now() : createdAt; } + + /** + * 获取更新时间。 + * + * @return 更新时间 + */ + public Instant getUpdatedAt() { return updatedAt; } + + /** + * 设置更新时间。 + * + * @param updatedAt 更新时间 + */ + public void setUpdatedAt(Instant updatedAt) { this.updatedAt = updatedAt == null ? Instant.now() : updatedAt; } + + /** + * 获取业务载荷。 + * + * @return 业务载荷 + */ + public Map getPayload() { return payload; } + + /** + * 设置业务载荷。 + * + * @param payload 业务载荷 + */ + public void setPayload(Map payload) { this.payload = payload == null ? new LinkedHashMap<>() : payload; } + + /** + * 获取元数据。 + * + * @return 元数据 + */ + public Map getMetadata() { return metadata; } + + /** + * 设置元数据。 + * + * @param metadata 元数据 + */ + public void setMetadata(Map metadata) { this.metadata = metadata == null ? new LinkedHashMap<>() : metadata; } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskStore.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskStore.java new file mode 100644 index 0000000..987351b --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/AgentAsyncToolTaskStore.java @@ -0,0 +1,48 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus; + +import java.util.List; +import java.util.Optional; +import java.util.function.UnaryOperator; + +/** + * Agent 异步工具任务运行态存储。 + */ +public interface AgentAsyncToolTaskStore { + + /** + * 创建任务记录。 + * + * @param record 任务记录 + */ + void create(AgentAsyncToolTaskRecord record); + + /** + * 获取当前 session 下的任务记录。 + * + * @param sessionId Agent Runtime session ID + * @param taskId 任务 ID + * @return 任务记录 + */ + Optional get(String sessionId, String taskId); + + /** + * 更新当前 session 下的任务记录。 + * + * @param sessionId Agent Runtime session ID + * @param taskId 任务 ID + * @param updater 更新函数 + * @return 更新后的任务记录 + */ + Optional update(String sessionId, String taskId, UnaryOperator updater); + + /** + * 查询当前 session 下可见任务。 + * + * @param sessionId Agent Runtime session ID + * @param status 状态过滤;为空时返回全部未过期任务 + * @return 任务列表 + */ + List list(String sessionId, AsyncToolTaskStatus status); +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/PluginAsyncSubTools.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/PluginAsyncSubTools.java new file mode 100644 index 0000000..378dece --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/PluginAsyncSubTools.java @@ -0,0 +1,83 @@ +package tech.easyflow.agent.runtime.asynctool; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult; +import tech.easyflow.agent.runtime.tool.PluginToolExecutor; +import tech.easyflow.ai.entity.PluginItem; + +import java.util.Map; + +/** + * Plugin 异步工具子能力实现。 + */ +public class PluginAsyncSubTools extends AbstractAgentAsyncSubTools { + + private final PluginItem pluginItem; + private final String toolName; + private final String displayName; + private final PluginToolExecutor pluginToolExecutor; + + /** + * 创建 Plugin 异步工具子能力。 + * + * @param pluginItem 插件工具快照 + * @param toolName runtime 工具名 + * @param displayName 用户可见名称 + * @param pluginToolExecutor Plugin 执行器 + * @param taskStore 任务存储 + * @param taskExecutor 后台执行器 + */ + public PluginAsyncSubTools(PluginItem pluginItem, + String toolName, + String displayName, + PluginToolExecutor pluginToolExecutor, + AgentAsyncToolTaskStore taskStore, + ThreadPoolTaskExecutor taskExecutor) { + super(taskStore, taskExecutor); + this.pluginItem = pluginItem; + this.toolName = toolName; + this.displayName = displayName; + this.pluginToolExecutor = pluginToolExecutor; + } + + /** + * {@inheritDoc} + */ + @Override + protected String toolType() { + return AgentToolType.PLUGIN.name(); + } + + /** + * {@inheritDoc} + */ + @Override + protected String toolName() { + return toolName; + } + + /** + * {@inheritDoc} + */ + @Override + protected String displayName() { + return displayName; + } + + /** + * {@inheritDoc} + */ + @Override + protected String businessId() { + return pluginItem == null || pluginItem.getId() == null ? null : String.valueOf(pluginItem.getId()); + } + + /** + * {@inheritDoc} + */ + @Override + protected AgentToolExecutionResult executeBusiness(Map arguments) { + return pluginToolExecutor.execute(pluginItem, arguments); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/RedisAgentAsyncToolTaskStore.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/RedisAgentAsyncToolTaskStore.java new file mode 100644 index 0000000..4080649 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/RedisAgentAsyncToolTaskStore.java @@ -0,0 +1,172 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import tech.easyflow.agent.config.AgentRuntimeProperties; +import tech.easyflow.common.web.exceptions.BusinessException; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; + +/** + * 基于 Redis 单 key 的 Agent 异步工具任务存储。 + */ +@Service +public class RedisAgentAsyncToolTaskStore implements AgentAsyncToolTaskStore { + + private static final String KEY_PREFIX = "easyflow:agent:async-tool:"; + + private final StringRedisTemplate stringRedisTemplate; + private final ObjectMapper objectMapper; + private final AgentRuntimeProperties properties; + + /** + * 创建 Redis 任务存储。 + * + * @param stringRedisTemplate Redis 字符串模板 + * @param objectMapper JSON mapper + * @param properties Agent runtime 配置 + */ + public RedisAgentAsyncToolTaskStore(StringRedisTemplate stringRedisTemplate, + ObjectMapper objectMapper, + AgentRuntimeProperties properties) { + this.stringRedisTemplate = stringRedisTemplate; + this.objectMapper = objectMapper; + this.properties = properties; + } + + /** + * {@inheritDoc} + */ + @Override + public void create(AgentAsyncToolTaskRecord record) { + if (record == null) { + throw new BusinessException("异步工具任务不能为空"); + } + String sessionId = requireText(record.getSessionId(), "异步工具任务 sessionId 不能为空"); + String taskId = requireText(record.getTaskId(), "异步工具任务 taskId 不能为空"); + record.setSessionScopedKey(key(sessionId, taskId)); + Duration ttl = taskTtl(); + record.setTtlSeconds(ttl.toSeconds()); + write(record, ttl); + } + + /** + * {@inheritDoc} + */ + @Override + public Optional get(String sessionId, String taskId) { + String value = stringRedisTemplate.opsForValue().get(key(sessionId, taskId)); + if (!StringUtils.hasText(value)) { + return Optional.empty(); + } + return Optional.of(read(value)); + } + + /** + * {@inheritDoc} + */ + @Override + public Optional update(String sessionId, + String taskId, + UnaryOperator updater) { + Optional existing = get(sessionId, taskId); + if (existing.isEmpty()) { + return Optional.empty(); + } + AgentAsyncToolTaskRecord updated = updater == null ? existing.get() : updater.apply(existing.get()); + if (updated == null) { + return Optional.empty(); + } + updated.setUpdatedAt(Instant.now()); + write(updated, remainingTtl(sessionId, taskId)); + return Optional.of(updated); + } + + /** + * {@inheritDoc} + */ + @Override + public List list(String sessionId, AsyncToolTaskStatus status) { + String safeSessionId = requireText(sessionId, "异步工具任务 sessionId 不能为空"); + List result = new ArrayList<>(); + ScanOptions options = ScanOptions.scanOptions().match(KEY_PREFIX + safeSessionId + ":*").count(100).build(); + try (Cursor cursor = stringRedisTemplate.scan(options)) { + while (cursor.hasNext()) { + String key = cursor.next(); + String value = stringRedisTemplate.opsForValue().get(key); + if (!StringUtils.hasText(value)) { + continue; + } + AgentAsyncToolTaskRecord record = read(value); + if (status == null || status == record.getStatus()) { + result.add(record); + } + } + } + result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt, + Comparator.nullsLast(Comparator.reverseOrder()))); + return result; + } + + private void write(AgentAsyncToolTaskRecord record, Duration ttl) { + try { + stringRedisTemplate.opsForValue().set(record.getSessionScopedKey(), + objectMapper.writeValueAsString(record), Math.max(1L, ttl.toSeconds()), TimeUnit.SECONDS); + } catch (Exception e) { + throw new BusinessException("写入异步工具任务状态失败:" + safeMessage(e)); + } + } + + private AgentAsyncToolTaskRecord read(String value) { + try { + return objectMapper.readValue(value, AgentAsyncToolTaskRecord.class); + } catch (Exception e) { + throw new BusinessException("读取异步工具任务状态失败:" + safeMessage(e)); + } + } + + private Duration remainingTtl(String sessionId, String taskId) { + Long seconds = stringRedisTemplate.getExpire(key(sessionId, taskId), TimeUnit.SECONDS); + if (seconds == null || seconds <= 0L) { + return taskTtl(); + } + return Duration.ofSeconds(seconds); + } + + private Duration taskTtl() { + Duration ttl = properties == null ? Duration.ofHours(24) : properties.getAsyncToolTaskTtl(); + return ttl == null || ttl.isZero() || ttl.isNegative() ? Duration.ofHours(24) : ttl; + } + + private String key(String sessionId, String taskId) { + return KEY_PREFIX + + requireText(sessionId, "异步工具任务 sessionId 不能为空") + + ":" + + requireText(taskId, "异步工具任务 taskId 不能为空"); + } + + private String requireText(String value, String message) { + if (!StringUtils.hasText(value)) { + throw new BusinessException(message); + } + return value.trim(); + } + + private String safeMessage(Exception e) { + return e == null || e.getMessage() == null || e.getMessage().isBlank() + ? "未知错误" + : e.getMessage(); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/WorkflowAsyncSubTools.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/WorkflowAsyncSubTools.java new file mode 100644 index 0000000..19aa50c --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/asynctool/WorkflowAsyncSubTools.java @@ -0,0 +1,83 @@ +package tech.easyflow.agent.runtime.asynctool; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult; +import tech.easyflow.agent.runtime.tool.WorkflowToolExecutor; +import tech.easyflow.ai.entity.Workflow; + +import java.util.Map; + +/** + * Workflow 异步工具子能力实现。 + */ +public class WorkflowAsyncSubTools extends AbstractAgentAsyncSubTools { + + private final Workflow workflow; + private final String toolName; + private final String displayName; + private final WorkflowToolExecutor workflowToolExecutor; + + /** + * 创建 Workflow 异步工具子能力。 + * + * @param workflow 工作流快照 + * @param toolName runtime 工具名 + * @param displayName 用户可见名称 + * @param workflowToolExecutor Workflow 执行器 + * @param taskStore 任务存储 + * @param taskExecutor 后台执行器 + */ + public WorkflowAsyncSubTools(Workflow workflow, + String toolName, + String displayName, + WorkflowToolExecutor workflowToolExecutor, + AgentAsyncToolTaskStore taskStore, + ThreadPoolTaskExecutor taskExecutor) { + super(taskStore, taskExecutor); + this.workflow = workflow; + this.toolName = toolName; + this.displayName = displayName; + this.workflowToolExecutor = workflowToolExecutor; + } + + /** + * {@inheritDoc} + */ + @Override + protected String toolType() { + return AgentToolType.WORKFLOW.name(); + } + + /** + * {@inheritDoc} + */ + @Override + protected String toolName() { + return toolName; + } + + /** + * {@inheritDoc} + */ + @Override + protected String displayName() { + return displayName; + } + + /** + * {@inheritDoc} + */ + @Override + protected String businessId() { + return workflow == null || workflow.getId() == null ? null : String.valueOf(workflow.getId()); + } + + /** + * {@inheritDoc} + */ + @Override + protected AgentToolExecutionResult executeBusiness(Map arguments) { + return workflowToolExecutor.execute(workflow, arguments); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionMode.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionMode.java new file mode 100644 index 0000000..816be6b --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionMode.java @@ -0,0 +1,35 @@ +package tech.easyflow.agent.runtime.tool; + +/** + * Agent 工具执行模式。 + */ +public enum AgentToolExecutionMode { + + /** + * 同步执行。 + */ + SYNC, + + /** + * 异步执行。 + */ + ASYNC; + + /** + * 解析执行模式。 + * + * @param value 原始配置值 + * @return 执行模式;非法或为空时返回 SYNC + */ + public static AgentToolExecutionMode from(String value) { + if (value == null || value.isBlank()) { + return SYNC; + } + for (AgentToolExecutionMode mode : values()) { + if (mode.name().equalsIgnoreCase(value.trim())) { + return mode; + } + } + return SYNC; + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionResult.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionResult.java new file mode 100644 index 0000000..c670b52 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolExecutionResult.java @@ -0,0 +1,57 @@ +package tech.easyflow.agent.runtime.tool; + +/** + * Agent 业务工具执行结果。 + */ +public class AgentToolExecutionResult { + + private Object result; + private String businessExecutionId; + + /** + * 创建执行结果。 + * + * @param result 业务结果 + * @param businessExecutionId 业务执行记录 ID + */ + public AgentToolExecutionResult(Object result, String businessExecutionId) { + this.result = result; + this.businessExecutionId = businessExecutionId; + } + + /** + * 获取业务结果。 + * + * @return 业务结果 + */ + public Object getResult() { + return result; + } + + /** + * 设置业务结果。 + * + * @param result 业务结果 + */ + public void setResult(Object result) { + this.result = result; + } + + /** + * 获取业务执行记录 ID。 + * + * @return 业务执行记录 ID + */ + public String getBusinessExecutionId() { + return businessExecutionId; + } + + /** + * 设置业务执行记录 ID。 + * + * @param businessExecutionId 业务执行记录 ID + */ + public void setBusinessExecutionId(String businessExecutionId) { + this.businessExecutionId = businessExecutionId; + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilation.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilation.java new file mode 100644 index 0000000..76b4191 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilation.java @@ -0,0 +1,74 @@ +package tech.easyflow.agent.runtime.tool; + +import com.easyagents.agent.runtime.mcp.McpSpec; +import com.easyagents.agent.runtime.tool.AgentToolInvoker; +import com.easyagents.agent.runtime.tool.AgentToolSpec; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Agent 工具运行时编译结果。 + */ +public class AgentToolRuntimeCompilation { + + private List toolSpecs = new ArrayList<>(); + private List mcpSpecs = new ArrayList<>(); + private Map toolInvokers = new LinkedHashMap<>(); + + /** + * 获取普通工具声明。 + * + * @return 普通工具声明 + */ + public List getToolSpecs() { + return toolSpecs; + } + + /** + * 设置普通工具声明。 + * + * @param toolSpecs 普通工具声明 + */ + public void setToolSpecs(List toolSpecs) { + this.toolSpecs = toolSpecs == null ? new ArrayList<>() : toolSpecs; + } + + /** + * 获取 MCP 声明。 + * + * @return MCP 声明 + */ + public List getMcpSpecs() { + return mcpSpecs; + } + + /** + * 设置 MCP 声明。 + * + * @param mcpSpecs MCP 声明 + */ + public void setMcpSpecs(List mcpSpecs) { + this.mcpSpecs = mcpSpecs == null ? new ArrayList<>() : mcpSpecs; + } + + /** + * 获取工具调用器。 + * + * @return 工具调用器 + */ + public Map getToolInvokers() { + return toolInvokers; + } + + /** + * 设置工具调用器。 + * + * @param toolInvokers 工具调用器 + */ + public void setToolInvokers(Map toolInvokers) { + this.toolInvokers = toolInvokers == null ? new LinkedHashMap<>() : toolInvokers; + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompiler.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompiler.java new file mode 100644 index 0000000..4d392b4 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompiler.java @@ -0,0 +1,627 @@ +package tech.easyflow.agent.runtime.tool; + +import com.easyagents.agent.runtime.hitl.AgentToolApprovalRequest; +import com.easyagents.agent.runtime.mcp.McpSpec; +import com.easyagents.agent.runtime.mcp.McpTransportType; +import com.easyagents.agent.runtime.tool.*; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSpec; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSpecExpander; +import com.easyagents.core.model.chat.tool.Parameter; +import com.easyagents.core.model.chat.tool.Tool; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import tech.easyflow.agent.entity.Agent; +import tech.easyflow.agent.entity.AgentToolBinding; +import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.agent.runtime.asynctool.AgentAsyncToolTaskStore; +import tech.easyflow.agent.runtime.asynctool.PluginAsyncSubTools; +import tech.easyflow.agent.runtime.asynctool.WorkflowAsyncSubTools; +import tech.easyflow.ai.easyagents.tool.ChatToolNameHelper; +import tech.easyflow.ai.entity.Mcp; +import tech.easyflow.ai.entity.PluginItem; +import tech.easyflow.ai.entity.Workflow; +import tech.easyflow.ai.service.McpService; +import tech.easyflow.ai.service.PluginItemService; +import tech.easyflow.ai.service.WorkflowService; +import tech.easyflow.common.web.exceptions.BusinessException; + +import javax.annotation.Resource; +import java.math.BigInteger; +import java.time.Duration; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Agent 工具运行时编译器。 + */ +@Component +public class AgentToolRuntimeCompiler { + + private static final Pattern MCP_INPUT_PATTERN = Pattern.compile("\\$\\{input:([A-Za-z0-9_.-]+)}"); + private static final Pattern ASYNC_SAFE_NAME = Pattern.compile("^[a-z][a-z0-9_]*$"); + + @Resource + private WorkflowService workflowService; + @Resource + private PluginItemService pluginItemService; + @Resource + private McpService mcpService; + @Resource + private ObjectMapper objectMapper; + @Resource + private WorkflowToolExecutor workflowToolExecutor; + @Resource + private PluginToolExecutor pluginToolExecutor; + @Resource + private AgentAsyncToolTaskStore asyncToolTaskStore; + @Resource(name = "agentAsyncToolExecutor") + private ThreadPoolTaskExecutor agentAsyncToolExecutor; + + /** + * 编译 Agent 工具配置。 + * + * @param agent Agent 业务定义 + * @return 工具编译结果 + */ + public AgentToolRuntimeCompilation compile(Agent agent) { + AgentToolRuntimeCompilation compilation = new AgentToolRuntimeCompilation(); + if (agent == null || agent.getToolBindings() == null) { + return compilation; + } + List specs = new ArrayList<>(); + Map invokers = new LinkedHashMap<>(); + List mcpSpecs = new ArrayList<>(); + Map mcpSpecMap = new LinkedHashMap<>(); + Set compiledToolNames = new LinkedHashSet<>(); + AsyncToolSpecExpander asyncExpander = new AsyncToolSpecExpander(); + for (AgentToolBinding binding : agent.getToolBindings()) { + if (!Boolean.TRUE.equals(binding.getEnabled())) { + continue; + } + AgentToolType type = AgentToolType.from(binding.getToolType()); + if (type == AgentToolType.MCP) { + McpSpec mcpSpec = mcpSpecMap.computeIfAbsent(binding.getTargetId(), ignored -> buildMcpSpec(binding)); + applyMcpToolBinding(mcpSpec, binding); + if (!mcpSpecs.contains(mcpSpec)) { + mcpSpecs.add(mcpSpec); + } + continue; + } + if (executionMode(binding) == AgentToolExecutionMode.ASYNC) { + AsyncToolSpec asyncSpec = buildAsyncToolSpec(type, binding); + addExpandedTools(specs, invokers, compiledToolNames, + asyncExpander.expandSpecs(asyncSpec), + asyncExpander.expandInvokers(asyncSpec)); + continue; + } + CompiledSyncTool syncTool = buildSyncTool(type, binding); + addCompiledTool(specs, invokers, compiledToolNames, syncTool.spec(), syncTool.invoker()); + } + compilation.setToolSpecs(specs); + compilation.setMcpSpecs(mcpSpecs); + compilation.setToolInvokers(invokers); + return compilation; + } + + private void addExpandedTools(List specs, + Map invokers, + Set compiledToolNames, + List expandedSpecs, + Map expandedInvokers) { + for (AgentToolSpec spec : expandedSpecs) { + addCompiledTool(specs, invokers, compiledToolNames, spec, + expandedInvokers == null ? null : expandedInvokers.get(spec.getName())); + } + } + + private void addCompiledTool(List specs, + Map invokers, + Set compiledToolNames, + AgentToolSpec spec, + AgentToolInvoker invoker) { + String name = spec == null ? null : spec.getName(); + if (name == null || name.isBlank()) { + throw new BusinessException("Agent 工具运行名不能为空"); + } + if (!compiledToolNames.add(name)) { + throw new BusinessException("Agent 工具运行名冲突:" + name + ",请调整工具名称"); + } + specs.add(spec); + if (invoker != null) { + invokers.put(name, invoker); + } + } + + private CompiledSyncTool buildSyncTool(AgentToolType type, AgentToolBinding binding) { + if (type == AgentToolType.WORKFLOW) { + Workflow workflow = requireWorkflow(binding); + Tool tool = workflowToolExecutor.buildTool(workflow); + AgentToolSpec spec = toToolSpec(tool, binding); + AgentToolInvoker invoker = (arguments, context) -> invokeSafely(spec.getName(), + () -> workflowToolExecutor.execute(workflow, arguments).getResult()); + return new CompiledSyncTool(spec, invoker); + } + if (type == AgentToolType.PLUGIN) { + PluginItem pluginItem = requirePlugin(binding); + Tool tool = pluginToolExecutor.buildTool(pluginItem); + AgentToolSpec spec = toToolSpec(tool, binding); + AgentToolInvoker invoker = (arguments, context) -> invokeSafely(spec.getName(), + () -> pluginToolExecutor.execute(pluginItem, arguments).getResult()); + return new CompiledSyncTool(spec, invoker); + } + throw new BusinessException("不支持的 Agent 工具类型:" + type.name()); + } + + private AsyncToolSpec buildAsyncToolSpec(AgentToolType type, AgentToolBinding binding) { + if (type == AgentToolType.WORKFLOW) { + Workflow workflow = requireWorkflow(binding); + Tool tool = workflowToolExecutor.buildTool(workflow); + String asyncName = asyncToolName(tool, binding, "workflow"); + String toolDisplayName = displayName(tool, workflow.getTitle()); + AsyncToolSpec spec = baseAsyncSpec(asyncName, tool, binding, toolDisplayName); + spec.setSubTools(new WorkflowAsyncSubTools(workflow, asyncName, toolDisplayName, + workflowToolExecutor, asyncToolTaskStore, agentAsyncToolExecutor)); + return spec; + } + if (type == AgentToolType.PLUGIN) { + PluginItem pluginItem = requirePlugin(binding); + Tool tool = pluginToolExecutor.buildTool(pluginItem); + String asyncName = asyncToolName(tool, binding, "plugin"); + String toolDisplayName = displayName(tool, pluginItem.getName()); + AsyncToolSpec spec = baseAsyncSpec(asyncName, tool, binding, toolDisplayName); + spec.setSubTools(new PluginAsyncSubTools(pluginItem, asyncName, toolDisplayName, + pluginToolExecutor, asyncToolTaskStore, agentAsyncToolExecutor)); + return spec; + } + throw new BusinessException("不支持的 Agent 异步工具类型:" + type.name()); + } + + private AsyncToolSpec baseAsyncSpec(String asyncName, Tool tool, AgentToolBinding binding, String toolDisplayName) { + AsyncToolSpec spec = new AsyncToolSpec(); + spec.setName(asyncName); + spec.setDescription(safeDescription(tool == null ? null : tool.getDescription())); + spec.setSubmitParametersSchema(toSchema(tool == null ? null : tool.getParameters())); + spec.setApprovalRequired(Boolean.TRUE.equals(binding.getHitlEnabled())); + if (Boolean.TRUE.equals(binding.getHitlEnabled())) { + spec.setApprovalRequest(buildBindingApprovalRequest(binding)); + } + spec.getMetadata().put("bindingId", binding.getId()); + spec.getMetadata().put("targetId", binding.getTargetId()); + spec.getMetadata().put("toolType", binding.getToolType()); + // 异步子工具名服务 runtime 调用,事件和聊天展示必须保留业务名称。 + spec.getMetadata().put("toolDisplayName", toolDisplayName); + return spec; + } + + private AgentToolResult invokeSafely(String toolName, ToolCall call) { + try { + Object result = call.invoke(); + return AgentToolResult.success(result == null ? "" : String.valueOf(result)); + } catch (Exception e) { + return AgentToolResult.failure(e.getMessage() == null ? "工具执行失败" : e.getMessage()); + } + } + + private AgentToolExecutionMode executionMode(AgentToolBinding binding) { + Object value = binding == null || binding.getOptionsJson() == null ? null : binding.getOptionsJson().get("executionMode"); + return AgentToolExecutionMode.from(value == null ? null : String.valueOf(value)); + } + + private Workflow requireWorkflow(AgentToolBinding binding) { + Workflow workflow = snapshotOrPublishedWorkflow(binding); + if (workflow == null) { + throw new BusinessException("绑定工作流不存在"); + } + return workflow; + } + + private PluginItem requirePlugin(AgentToolBinding binding) { + PluginItem pluginItem = snapshotOrCurrentPlugin(binding); + if (pluginItem == null) { + throw new BusinessException("绑定插件不存在"); + } + return pluginItem; + } + + private AgentToolSpec toToolSpec(Tool tool, AgentToolBinding binding) { + AgentToolSpec spec = new AgentToolSpec(); + String name = resolveRuntimeToolName(tool, binding); + spec.setName(name); + spec.setDescription(safeDescription(tool == null ? null : tool.getDescription())); + spec.setCategory(AgentToolCategory.valueOf(AgentToolType.from(binding.getToolType()).name())); + spec.setParametersSchema(toSchema(tool == null ? null : tool.getParameters())); + spec.setApprovalRequired(Boolean.TRUE.equals(binding.getHitlEnabled())); + if (Boolean.TRUE.equals(binding.getHitlEnabled())) { + spec.setApprovalRequest(buildBindingApprovalRequest(binding)); + } + spec.getMetadata().put("bindingId", binding.getId()); + spec.getMetadata().put("targetId", binding.getTargetId()); + spec.getMetadata().put("toolType", binding.getToolType()); + spec.getMetadata().put("toolDisplayName", displayName(tool, binding.getToolName())); + return spec; + } + + private AgentToolApprovalRequest buildBindingApprovalRequest(AgentToolBinding binding) { + AgentToolApprovalRequest request = new AgentToolApprovalRequest(); + String name = binding == null ? "工具" : binding.getToolName(); + request.setApprovalPrompt(stringValue(binding == null ? null : binding.getHitlConfigJson(), "prompt", "是否批准执行工具:" + name)); + Map metadata = sanitizedHitlMetadata(binding == null ? null : binding.getHitlConfigJson()); + if (binding != null) { + metadata.put("toolType", binding.getToolType()); + metadata.put("bindingId", binding.getId()); + metadata.put("targetId", binding.getTargetId()); + } + request.setMetadata(metadata); + return request; + } + + private McpSpec buildMcpSpec(AgentToolBinding binding) { + Mcp mcp = snapshotOrCurrentMcp(binding); + if (mcp == null) { + throw new BusinessException("绑定 MCP 不存在"); + } + Map.Entry> server = firstMcpServer(mcp); + Map serverConfig = server.getValue(); + McpSpec spec = new McpSpec(); + spec.setName(mcpRuntimeName(mcp)); + spec.setDescription(firstNonBlank(mcp.getDescription(), mcp.getTitle())); + spec.setTransportType(parseMcpTransportType(mcp, serverConfig)); + spec.setCommand(resolveMcpInput(stringValue(serverConfig, "command", null))); + spec.setArgs(resolveMcpInputs(stringListValue(serverConfig, "args"))); + spec.setEnv(resolveMcpInputMap(stringMapValue(serverConfig, "env"))); + spec.setUrl(resolveMcpInput(stringValue(serverConfig, "url", null))); + spec.setHeaders(resolveMcpInputMap(stringMapValue(serverConfig, "headers"))); + spec.setQueryParams(resolveMcpInputMap(stringMapValue(serverConfig, "queryParams"))); + Duration timeout = durationValue(serverConfig, "timeout"); + if (timeout != null) { + spec.setTimeout(timeout); + } + Duration initializationTimeout = durationValue(serverConfig, "initializationTimeout"); + if (initializationTimeout != null) { + spec.setInitializationTimeout(initializationTimeout); + } + spec.setGroupName(mcpRuntimeName(mcp)); + spec.setApprovalRequired(Boolean.TRUE.equals(mcp.getApprovalRequired())); + spec.setApprovalRequest(buildMcpApprovalRequest(mcp)); + spec.setToolNamePrefix(mcpRuntimeToolPrefix(mcp.getId())); + spec.getMetadata().put("toolType", AgentToolType.MCP.name()); + spec.getMetadata().put("mcpId", String.valueOf(mcp.getId())); + spec.getMetadata().put("mcpTitle", mcp.getTitle()); + spec.getMetadata().put("serverName", server.getKey()); + return spec; + } + + private void applyMcpToolBinding(McpSpec spec, AgentToolBinding binding) { + if (Boolean.TRUE.equals(binding.getHitlEnabled())) { + spec.setApprovalRequired(true); + spec.setApprovalRequest(buildBindingApprovalRequest(binding)); + } + } + + private AgentToolApprovalRequest buildMcpApprovalRequest(Mcp mcp) { + AgentToolApprovalRequest request = new AgentToolApprovalRequest(); + request.setApprovalPrompt("是否批准执行 MCP 工具:" + firstNonBlank(mcp.getTitle(), mcpRuntimeName(mcp))); + Map metadata = new LinkedHashMap<>(); + metadata.put("toolType", AgentToolType.MCP.name()); + metadata.put("mcpId", String.valueOf(mcp.getId())); + metadata.put("mcpTitle", mcp.getTitle()); + request.setMetadata(metadata); + return request; + } + + private Workflow snapshotOrPublishedWorkflow(AgentToolBinding binding) { + if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) { + Workflow workflow = objectMapper.convertValue(binding.getResourceSnapshot(), Workflow.class); + workflow.setId(firstNonNull(workflow.getId(), binding.getTargetId())); + return workflow; + } + return workflowService.getPublishedById(binding.getTargetId()); + } + + private PluginItem snapshotOrCurrentPlugin(AgentToolBinding binding) { + if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) { + PluginItem pluginItem = objectMapper.convertValue(binding.getResourceSnapshot(), PluginItem.class); + pluginItem.setId(firstNonNull(pluginItem.getId(), binding.getTargetId())); + return pluginItem; + } + return pluginItemService.getById(binding.getTargetId()); + } + + private Mcp snapshotOrCurrentMcp(AgentToolBinding binding) { + if (binding.getResourceSnapshot() != null && !binding.getResourceSnapshot().isEmpty()) { + Mcp mcp = objectMapper.convertValue(binding.getResourceSnapshot(), Mcp.class); + mcp.setId(firstNonNull(mcp.getId(), binding.getTargetId())); + return mcp; + } + return mcpService.getById(binding.getTargetId()); + } + + private Map toSchema(Parameter[] parameters) { + Map schema = new LinkedHashMap<>(); + Map properties = new LinkedHashMap<>(); + List required = new ArrayList<>(); + if (parameters != null) { + for (Parameter parameter : parameters) { + properties.put(parameter.getName(), parameterSchema(parameter)); + if (parameter.isRequired()) { + required.add(parameter.getName()); + } + } + } + schema.put("type", "object"); + schema.put("properties", properties); + schema.put("required", required); + return schema; + } + + private Map parameterSchema(Parameter parameter) { + Map schema = new LinkedHashMap<>(); + schema.put("type", parameter.getType() == null ? "string" : parameter.getType()); + putOptionalString(schema, "description", parameter.getDescription()); + if (parameter.getChildren() != null && !parameter.getChildren().isEmpty()) { + Map children = new LinkedHashMap<>(); + for (Parameter child : parameter.getChildren()) { + if (child != null && child.getName() != null && !child.getName().isBlank()) { + children.put(child.getName(), parameterSchema(child)); + } + } + if ("array".equalsIgnoreCase(parameter.getType())) { + schema.put("items", firstArrayItemSchema(parameter.getChildren())); + } else { + schema.put("properties", children); + } + } + return schema; + } + + private Map firstArrayItemSchema(List children) { + return children.stream().filter(Objects::nonNull).findFirst() + .map(this::parameterSchema) + .orElse(Map.of("type", "string")); + } + + private void putOptionalString(Map target, String key, String value) { + if (value != null && !value.isBlank()) { + target.put(key, value); + } + } + + private String resolveRuntimeToolName(Tool tool, AgentToolBinding binding) { + String bindingName = binding == null ? null : binding.getToolName(); + if (ChatToolNameHelper.isSafeToolName(bindingName)) { + return bindingName; + } + String toolName = tool == null ? null : tool.getName(); + if (ChatToolNameHelper.isSafeToolName(toolName)) { + return toolName; + } + BigInteger targetId = binding == null ? null : binding.getTargetId(); + return ChatToolNameHelper.buildFallbackName("tool", targetId); + } + + private String asyncToolName(Tool tool, AgentToolBinding binding, String fallbackPrefix) { + String base = resolveRuntimeToolName(tool, binding).toLowerCase(Locale.ROOT) + .replaceAll("[^a-z0-9_]", "_") + .replaceAll("_+", "_"); + if (!base.isBlank() && Character.isDigit(base.charAt(0))) { + base = fallbackPrefix + "_" + base; + } + if (ASYNC_SAFE_NAME.matcher(base).matches()) { + return base; + } + return fallbackPrefix + "_" + (binding == null || binding.getTargetId() == null ? "unknown" : binding.getTargetId()); + } + + private String displayName(Tool tool, String fallback) { + String value = tool == null ? null : tool.getName(); + return firstNonBlank(firstNonBlank(fallback, value), "工具调用"); + } + + private String safeDescription(String description) { + return description == null || description.isBlank() ? "EasyFlow Agent 工具" : description; + } + + private Map sanitizedHitlMetadata(Map config) { + Map metadata = new LinkedHashMap<>(); + if (config != null) { + config.forEach((key, value) -> { + if (!isHitlPromptKey(key)) { + metadata.put(key, value); + } + }); + } + return metadata; + } + + private boolean isHitlPromptKey(String key) { + if (key == null) { + return false; + } + String normalized = key.trim(); + return "prompt".equalsIgnoreCase(normalized) + || "question".equalsIgnoreCase(normalized) + || "approvalPrompt".equalsIgnoreCase(normalized); + } + + private Map.Entry> firstMcpServer(Mcp mcp) { + Map config = parseMcpConfig(mcp); + Map servers = mapValue(config, "mcpServers"); + if (servers.isEmpty()) { + throw new BusinessException("MCP 配置 JSON 中没有找到任何 MCP 服务名称"); + } + Map.Entry first = servers.entrySet().iterator().next(); + if (!(first.getValue() instanceof Map rawServer)) { + throw new BusinessException("MCP 服务配置必须是对象:" + first.getKey()); + } + Map serverConfig = new LinkedHashMap<>(); + rawServer.forEach((key, value) -> serverConfig.put(String.valueOf(key), value)); + return Map.entry(first.getKey(), serverConfig); + } + + private Map parseMcpConfig(Mcp mcp) { + String configJson = mcp == null ? null : mcp.getConfigJson(); + if (configJson == null || configJson.isBlank()) { + throw new BusinessException("MCP 配置 JSON 不能为空"); + } + try { + return objectMapper.readValue(configJson, new com.fasterxml.jackson.core.type.TypeReference<>() {}); + } catch (Exception e) { + throw new BusinessException("MCP 配置 JSON 格式错误"); + } + } + + private McpTransportType parseMcpTransportType(Mcp mcp, Map serverConfig) { + String transport = firstNonBlank(mcp == null ? null : mcp.getTransportType(), stringValue(serverConfig, "transport", null)); + return McpTransportType.from(transport); + } + + private String mcpRuntimeName(Mcp mcp) { + BigInteger id = mcp == null ? null : mcp.getId(); + return "mcp_" + safeToolNameSegment(id == null ? "unknown" : String.valueOf(id)); + } + + private String mcpRuntimeToolPrefix(BigInteger mcpId) { + return "mcp_" + safeToolNameSegment(String.valueOf(mcpId)) + "_"; + } + + private String safeToolNameSegment(String value) { + String normalized = String.valueOf(value == null ? "" : value).trim() + .replaceAll("[^A-Za-z0-9_-]", "_") + .replaceAll("_+", "_"); + return normalized.isBlank() ? "tool" : normalized; + } + + private List stringListValue(Map map, String key) { + Object value = map == null ? null : map.get(key); + if (value == null) { + return new ArrayList<>(); + } + if (value instanceof Collection collection) { + List result = new ArrayList<>(); + for (Object item : collection) { + if (item != null) { + result.add(String.valueOf(item)); + } + } + return result; + } + throw new BusinessException("Agent 配置字段必须是数组:" + key); + } + + private Duration durationValue(Map map, String key) { + Object value = map == null ? null : map.get(key); + if (value == null) { + return null; + } + if (value instanceof Number number) { + return Duration.ofSeconds(number.longValue()); + } + String text = String.valueOf(value).trim(); + if (text.isEmpty()) { + return null; + } + try { + return Duration.parse(text); + } catch (Exception ignored) { + try { + return Duration.ofSeconds(Long.parseLong(text)); + } catch (NumberFormatException e) { + throw new BusinessException("Agent 配置字段必须是秒数或 Duration:" + key); + } + } + } + + private List resolveMcpInputs(List values) { + if (values == null || values.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(values.size()); + for (String value : values) { + result.add(resolveMcpInput(value)); + } + return result; + } + + private Map resolveMcpInputMap(Map values) { + if (values == null || values.isEmpty()) { + return new LinkedHashMap<>(); + } + Map result = new LinkedHashMap<>(); + values.forEach((key, value) -> result.put(key, resolveMcpInput(value))); + return result; + } + + private String resolveMcpInput(String value) { + if (value == null || value.isBlank()) { + return value; + } + Matcher matcher = MCP_INPUT_PATTERN.matcher(value); + StringBuffer resolved = new StringBuffer(); + while (matcher.find()) { + String inputKey = matcher.group(1); + String resolvedValue = System.getProperty("mcp.input." + inputKey); + if (resolvedValue == null || resolvedValue.isBlank()) { + throw new BusinessException("MCP 输入变量未解析:" + inputKey); + } + matcher.appendReplacement(resolved, Matcher.quoteReplacement(resolvedValue)); + } + matcher.appendTail(resolved); + return resolved.toString(); + } + + private Map mapValue(Map map, String key) { + Object value = map == null ? null : map.get(key); + if (value == null) { + return new LinkedHashMap<>(); + } + if (value instanceof Map raw) { + Map result = new LinkedHashMap<>(); + raw.forEach((rawKey, rawValue) -> result.put(String.valueOf(rawKey), rawValue)); + return result; + } + throw new BusinessException("Agent 配置字段必须是对象:" + key); + } + + private Map stringMapValue(Map map, String key) { + Map raw = mapValue(map, key); + Map result = new LinkedHashMap<>(); + raw.forEach((rawKey, rawValue) -> { + if (rawValue != null) { + result.put(rawKey, String.valueOf(rawValue)); + } + }); + return result; + } + + private String stringValue(Map map, String key, String defaultValue) { + Object value = map == null ? null : map.get(key); + if (value == null) { + return defaultValue; + } + String text = String.valueOf(value); + return text.isBlank() ? defaultValue : text; + } + + private String firstNonBlank(String first, String second) { + return first == null || first.isBlank() ? second : first; + } + + private BigInteger firstNonNull(BigInteger first, BigInteger second) { + return first == null ? second : first; + } + + private record CompiledSyncTool(AgentToolSpec spec, AgentToolInvoker invoker) { + } + + private interface ToolCall { + + /** + * 调用工具。 + * + * @return 工具结果 + */ + Object invoke(); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/PluginToolExecutor.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/PluginToolExecutor.java new file mode 100644 index 0000000..34ec06f --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/PluginToolExecutor.java @@ -0,0 +1,36 @@ +package tech.easyflow.agent.runtime.tool; + +import com.easyagents.core.model.chat.tool.Tool; +import org.springframework.stereotype.Service; +import tech.easyflow.ai.entity.PluginItem; + +import java.util.Map; + +/** + * Agent Plugin 工具执行器。 + */ +@Service +public class PluginToolExecutor { + + /** + * 构建 Plugin 工具声明来源。 + * + * @param pluginItem 插件工具 + * @return 工具声明来源 + */ + public Tool buildTool(PluginItem pluginItem) { + return pluginItem.toFunction(); + } + + /** + * 执行 Plugin 工具。 + * + * @param pluginItem 插件工具 + * @param arguments 执行参数 + * @return 执行结果 + */ + public AgentToolExecutionResult execute(PluginItem pluginItem, Map arguments) { + Object result = buildTool(pluginItem).invoke(arguments == null ? Map.of() : arguments); + return new AgentToolExecutionResult(result, null); + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/WorkflowToolExecutor.java b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/WorkflowToolExecutor.java new file mode 100644 index 0000000..40af284 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/main/java/tech/easyflow/agent/runtime/tool/WorkflowToolExecutor.java @@ -0,0 +1,71 @@ +package tech.easyflow.agent.runtime.tool; + +import com.easyagents.flow.core.chain.runtime.ChainExecutor; +import com.easyagents.core.model.chat.tool.Tool; +import org.springframework.stereotype.Service; +import tech.easyflow.ai.easyagents.tool.WorkflowTool; +import tech.easyflow.ai.easyagentsflow.support.PublishedWorkflowDefinitionIds; +import tech.easyflow.ai.entity.Workflow; + +import java.util.Map; + +/** + * Agent Workflow 工具执行器。 + */ +@Service +public class WorkflowToolExecutor { + + private final ChainExecutor chainExecutor; + + /** + * 创建 Workflow 工具执行器。 + * + * @param chainExecutor 工作流执行器 + */ + public WorkflowToolExecutor(ChainExecutor chainExecutor) { + this.chainExecutor = chainExecutor; + } + + /** + * 构建 Workflow 工具声明来源。 + * + * @param workflow 工作流 + * @return 工具声明来源 + */ + public Tool buildTool(Workflow workflow) { + return new WorkflowTool(workflow, true, definitionId(workflow)); + } + + /** + * 执行 Workflow 工具。 + * + * @param workflow 工作流 + * @param arguments 执行参数 + * @return 执行结果 + */ + public AgentToolExecutionResult execute(Workflow workflow, Map arguments) { + Object result = chainExecutor.execute(definitionId(workflow), arguments == null ? Map.of() : arguments); + return new AgentToolExecutionResult(result, resolveBusinessExecutionId(result)); + } + + private String definitionId(Workflow workflow) { + return PublishedWorkflowDefinitionIds.published(String.valueOf(workflow == null ? null : workflow.getId())); + } + + private String resolveBusinessExecutionId(Object result) { + if (result instanceof Map map) { + Object value = firstValue(map, "executionId", "executeId", "chainId", "runId"); + return value == null ? null : String.valueOf(value); + } + return null; + } + + private Object firstValue(Map map, String... keys) { + for (String key : keys) { + if (map.containsKey(key)) { + return map.get(key); + } + } + return null; + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentDefinitionCompilerMcpTest.java b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentDefinitionCompilerMcpTest.java index 6dca542..4c37ee3 100644 --- a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentDefinitionCompilerMcpTest.java +++ b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentDefinitionCompilerMcpTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import tech.easyflow.agent.entity.Agent; import tech.easyflow.agent.entity.AgentToolBinding; import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.agent.runtime.tool.AgentToolRuntimeCompiler; import tech.easyflow.ai.entity.Mcp; import tech.easyflow.ai.entity.Model; import tech.easyflow.ai.entity.ModelProvider; @@ -36,10 +37,13 @@ public class AgentDefinitionCompilerMcpTest { BigInteger mcpId = BigInteger.valueOf(20L); Model model = model(modelId); Mcp mcp = mcp(mcpId); - AgentDefinitionCompiler compiler = new AgentDefinitionCompiler(); + AgentRuntimeCompiler compiler = new AgentRuntimeCompiler(); + AgentToolRuntimeCompiler toolCompiler = new AgentToolRuntimeCompiler(); setField(compiler, "objectMapper", new com.fasterxml.jackson.databind.ObjectMapper()); setField(compiler, "modelService", modelService(model)); - setField(compiler, "mcpService", mcpService(mcp)); + setField(toolCompiler, "objectMapper", new com.fasterxml.jackson.databind.ObjectMapper()); + setField(toolCompiler, "mcpService", mcpService(mcp)); + setField(compiler, "agentToolRuntimeCompiler", toolCompiler); Agent agent = agent(modelId, mcpId); diff --git a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentRunServiceDraftAndHitlTest.java b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentRunServiceDraftAndHitlTest.java index 3e13130..77d33e0 100644 --- a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentRunServiceDraftAndHitlTest.java +++ b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/AgentRunServiceDraftAndHitlTest.java @@ -429,11 +429,11 @@ public class AgentRunServiceDraftAndHitlTest { @Test public void startRuntimeShouldUseDraftSessionStoreWithoutBindingMysqlSession() throws Exception { AgentRunService service = new AgentRunService(); - RecordingAgentDefinitionCompiler compiler = new RecordingAgentDefinitionCompiler(); + RecordingAgentRuntimeCompiler compiler = new RecordingAgentRuntimeCompiler(); RecordingAgentRuntime runtime = new RecordingAgentRuntime(); RecordingAgentRuntimeFactory runtimeFactory = new RecordingAgentRuntimeFactory(runtime); AgentSessionStore draftStore = new InMemoryAgentSessionStore(); - setField(service, "agentDefinitionCompiler", compiler); + setField(service, "agentRuntimeCompiler", compiler); setField(service, "agentRuntimeFactory", runtimeFactory); setField(service, "agentRunRegistry", new AgentRunRegistry()); @@ -1001,7 +1001,7 @@ public class AgentRunServiceDraftAndHitlTest { } } - private static class RecordingAgentDefinitionCompiler extends AgentDefinitionCompiler { + private static class RecordingAgentRuntimeCompiler extends AgentRuntimeCompiler { @Override public AgentRuntimeBundle compile(Agent agent) { diff --git a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubToolsTest.java b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubToolsTest.java new file mode 100644 index 0000000..53bc9d5 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/AbstractAgentAsyncSubToolsTest.java @@ -0,0 +1,195 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.AgentToolContext; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolCancelRequest; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolObserveRequest; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolResultRequest; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSubmitResult; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskView; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.UnaryOperator; + +/** + * EasyFlow 异步业务工具基类测试。 + */ +public class AbstractAgentAsyncSubToolsTest { + + /** + * 验证 submit、observe、result 与 list 的基础任务生命周期。 + * + * @throws Exception 等待后台执行超时时抛出 + */ + @Test + public void asyncSubToolsShouldSubmitObserveResultAndListCurrentSessionTasks() throws Exception { + ThreadPoolTaskExecutor executor = executor(); + try { + InMemoryTaskStore store = new InMemoryTaskStore(); + TestAsyncSubTools subTools = new TestAsyncSubTools(store, executor); + AgentToolContext context = context("session-a"); + + AsyncToolSubmitResult submitted = subTools.submit(Map.of("keyword", "hello"), context); + Assert.assertEquals(AsyncToolTaskStatus.PENDING, submitted.getStatus()); + Assert.assertTrue(submitted.getTaskId().startsWith("async_")); + + AsyncToolTaskView completed = waitTerminal(subTools, submitted.getTaskId(), context); + Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, completed.getStatus()); + Assert.assertEquals(Map.of("echo", "hello"), completed.getResult()); + Assert.assertTrue(completed.getNextCursor() >= 2L); + + AsyncToolResultRequest resultRequest = new AsyncToolResultRequest(); + resultRequest.setTaskId(submitted.getTaskId()); + resultRequest.setCursor(1L); + AsyncToolTaskView result = subTools.result(resultRequest, context); + Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, result.getStatus()); + Assert.assertEquals(Map.of("echo", "hello"), result.getResult()); + Assert.assertFalse(result.getEvents().isEmpty()); + + Assert.assertEquals(1, subTools.list(null, context).getTasks().size()); + Assert.assertTrue(subTools.list(null, context("session-b")).getTasks().isEmpty()); + + AsyncToolTaskView crossedSessionView = observe(subTools, submitted.getTaskId(), context("session-b")); + Assert.assertEquals(AsyncToolTaskStatus.FAILED, crossedSessionView.getStatus()); + Assert.assertEquals("TASK_NOT_FOUND", crossedSessionView.getErrorType()); + } finally { + executor.shutdown(); + } + } + + /** + * 验证首版取消语义返回明确失败结果。 + */ + @Test + public void cancelShouldReturnUnsupportedFailure() { + TestAsyncSubTools subTools = new TestAsyncSubTools(new InMemoryTaskStore(), executor()); + AsyncToolCancelRequest request = new AsyncToolCancelRequest(); + request.setTaskId("task-1"); + + var result = subTools.cancel(request, context("session-a")); + + Assert.assertEquals(AsyncToolTaskStatus.FAILED, result.getStatus()); + Assert.assertEquals("不支持取消", result.getMessage()); + } + + private AsyncToolTaskView waitTerminal(TestAsyncSubTools subTools, String taskId, AgentToolContext context) throws Exception { + long deadline = System.currentTimeMillis() + 3000L; + AsyncToolTaskView view = observe(subTools, taskId, context); + while (!Boolean.TRUE.equals(view.getTerminal()) && System.currentTimeMillis() < deadline) { + Thread.sleep(20L); + view = observe(subTools, taskId, context); + } + Assert.assertTrue("异步任务应在测试超时前完成", Boolean.TRUE.equals(view.getTerminal())); + return view; + } + + private AsyncToolTaskView observe(TestAsyncSubTools subTools, String taskId, AgentToolContext context) { + AsyncToolObserveRequest request = new AsyncToolObserveRequest(); + request.setTaskId(taskId); + request.setCursor(0L); + return subTools.observe(request, context); + } + + private AgentToolContext context(String sessionId) { + AgentToolContext context = new AgentToolContext(); + context.setRequestId("request-1"); + context.setTraceId("trace-1"); + context.setSessionId(sessionId); + context.setAgentId("agent-1"); + context.setToolCallId("tool-call-1"); + return context; + } + + private ThreadPoolTaskExecutor executor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(1); + executor.setQueueCapacity(4); + executor.setThreadNamePrefix("async-sub-tools-test-"); + executor.initialize(); + return executor; + } + + private static final class TestAsyncSubTools extends AbstractAgentAsyncSubTools { + + private TestAsyncSubTools(AgentAsyncToolTaskStore taskStore, ThreadPoolTaskExecutor taskExecutor) { + super(taskStore, taskExecutor); + } + + @Override + protected String toolType() { + return "PLUGIN"; + } + + @Override + protected String toolName() { + return "test_tool"; + } + + @Override + protected String displayName() { + return "测试工具"; + } + + @Override + protected String businessId() { + return "business-1"; + } + + @Override + protected AgentToolExecutionResult executeBusiness(Map arguments) { + return new AgentToolExecutionResult(Map.of("echo", arguments.get("keyword")), "business-run-1"); + } + } + + private static final class InMemoryTaskStore implements AgentAsyncToolTaskStore { + + private final Map records = new ConcurrentHashMap<>(); + + @Override + public void create(AgentAsyncToolTaskRecord record) { + record.setSessionScopedKey(key(record.getSessionId(), record.getTaskId())); + records.put(record.getSessionScopedKey(), record); + } + + @Override + public Optional get(String sessionId, String taskId) { + return Optional.ofNullable(records.get(key(sessionId, taskId))); + } + + @Override + public Optional update(String sessionId, + String taskId, + UnaryOperator updater) { + String key = key(sessionId, taskId); + AgentAsyncToolTaskRecord updated = records.computeIfPresent(key, + (ignored, existing) -> updater == null ? existing : updater.apply(existing)); + return Optional.ofNullable(updated); + } + + @Override + public List list(String sessionId, AsyncToolTaskStatus status) { + List result = new ArrayList<>(); + for (AgentAsyncToolTaskRecord record : records.values()) { + if (sessionId.equals(record.getSessionId()) && (status == null || status == record.getStatus())) { + result.add(record); + } + } + result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt).reversed()); + return result; + } + + private String key(String sessionId, String taskId) { + return sessionId + ":" + taskId; + } + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/WorkflowPluginAsyncSubToolsTest.java b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/WorkflowPluginAsyncSubToolsTest.java new file mode 100644 index 0000000..d5771d8 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/asynctool/WorkflowPluginAsyncSubToolsTest.java @@ -0,0 +1,213 @@ +package tech.easyflow.agent.runtime.asynctool; + +import com.easyagents.agent.runtime.tool.AgentToolContext; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolObserveRequest; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolResultRequest; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolSubmitResult; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskStatus; +import com.easyagents.agent.runtime.tool.asynctool.AsyncToolTaskView; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import tech.easyflow.agent.runtime.tool.AgentToolExecutionResult; +import tech.easyflow.agent.runtime.tool.PluginToolExecutor; +import tech.easyflow.agent.runtime.tool.WorkflowToolExecutor; +import tech.easyflow.ai.entity.PluginItem; +import tech.easyflow.ai.entity.Workflow; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.UnaryOperator; + +/** + * Workflow 与 Plugin 异步子工具测试。 + */ +public class WorkflowPluginAsyncSubToolsTest { + + /** + * 验证 Workflow 异步子工具会把业务执行结果保留到任务视图。 + * + * @throws Exception 等待后台执行超时时抛出 + */ + @Test + public void workflowAsyncSubToolsShouldKeepBusinessResultInTaskView() throws Exception { + ThreadPoolTaskExecutor executor = executor(); + try { + Map businessResult = Map.of("workflowOutput", "ok"); + WorkflowAsyncSubTools subTools = new WorkflowAsyncSubTools(workflow(), + "workflow_demo", + "测试工作流", + new StubWorkflowToolExecutor(businessResult), + new InMemoryTaskStore(), + executor); + + AsyncToolTaskView view = submitAndResult(subTools); + + Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, view.getStatus()); + Assert.assertEquals(businessResult, view.getResult()); + Assert.assertEquals("workflow-run-1", view.getPayload().get("businessExecutionId")); + } finally { + executor.shutdown(); + } + } + + /** + * 验证 Plugin 异步子工具会把业务执行结果保留到任务视图。 + * + * @throws Exception 等待后台执行超时时抛出 + */ + @Test + public void pluginAsyncSubToolsShouldKeepBusinessResultInTaskView() throws Exception { + ThreadPoolTaskExecutor executor = executor(); + try { + Map businessResult = Map.of("pluginOutput", List.of("a", "b")); + PluginAsyncSubTools subTools = new PluginAsyncSubTools(pluginItem(), + "plugin_demo", + "测试插件", + new StubPluginToolExecutor(businessResult), + new InMemoryTaskStore(), + executor); + + AsyncToolTaskView view = submitAndResult(subTools); + + Assert.assertEquals(AsyncToolTaskStatus.SUCCEEDED, view.getStatus()); + Assert.assertEquals(businessResult, view.getResult()); + } finally { + executor.shutdown(); + } + } + + private AsyncToolTaskView submitAndResult(AbstractAgentAsyncSubTools subTools) throws Exception { + AgentToolContext context = context(); + AsyncToolSubmitResult submitted = subTools.submit(Map.of("keyword", "hello"), context); + waitTerminal(subTools, submitted.getTaskId(), context); + AsyncToolResultRequest request = new AsyncToolResultRequest(); + request.setTaskId(submitted.getTaskId()); + return subTools.result(request, context); + } + + private void waitTerminal(AbstractAgentAsyncSubTools subTools, String taskId, AgentToolContext context) throws Exception { + long deadline = System.currentTimeMillis() + 3000L; + AsyncToolTaskView view = observe(subTools, taskId, context); + while (!Boolean.TRUE.equals(view.getTerminal()) && System.currentTimeMillis() < deadline) { + Thread.sleep(20L); + view = observe(subTools, taskId, context); + } + Assert.assertTrue("异步任务应在测试超时前完成", Boolean.TRUE.equals(view.getTerminal())); + } + + private AsyncToolTaskView observe(AbstractAgentAsyncSubTools subTools, String taskId, AgentToolContext context) { + AsyncToolObserveRequest request = new AsyncToolObserveRequest(); + request.setTaskId(taskId); + return subTools.observe(request, context); + } + + private AgentToolContext context() { + AgentToolContext context = new AgentToolContext(); + context.setRequestId("request-1"); + context.setTraceId("trace-1"); + context.setSessionId("session-1"); + context.setAgentId("agent-1"); + context.setToolCallId("tool-call-1"); + return context; + } + + private Workflow workflow() { + Workflow workflow = new Workflow(); + workflow.setId(BigInteger.valueOf(101L)); + workflow.setTitle("测试工作流"); + return workflow; + } + + private PluginItem pluginItem() { + PluginItem pluginItem = new PluginItem(); + pluginItem.setId(BigInteger.valueOf(102L)); + pluginItem.setName("测试插件"); + return pluginItem; + } + + private ThreadPoolTaskExecutor executor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(1); + executor.setQueueCapacity(4); + executor.setThreadNamePrefix("workflow-plugin-async-test-"); + executor.initialize(); + return executor; + } + + private static final class StubWorkflowToolExecutor extends WorkflowToolExecutor { + + private final Map businessResult; + + private StubWorkflowToolExecutor(Map businessResult) { + super(null); + this.businessResult = businessResult; + } + + @Override + public AgentToolExecutionResult execute(Workflow workflow, Map arguments) { + return new AgentToolExecutionResult(businessResult, "workflow-run-1"); + } + } + + private static final class StubPluginToolExecutor extends PluginToolExecutor { + + private final Map businessResult; + + private StubPluginToolExecutor(Map businessResult) { + this.businessResult = businessResult; + } + + @Override + public AgentToolExecutionResult execute(PluginItem pluginItem, Map arguments) { + return new AgentToolExecutionResult(businessResult, null); + } + } + + private static final class InMemoryTaskStore implements AgentAsyncToolTaskStore { + + private final Map records = new ConcurrentHashMap<>(); + + @Override + public void create(AgentAsyncToolTaskRecord record) { + record.setSessionScopedKey(key(record.getSessionId(), record.getTaskId())); + records.put(record.getSessionScopedKey(), record); + } + + @Override + public Optional get(String sessionId, String taskId) { + return Optional.ofNullable(records.get(key(sessionId, taskId))); + } + + @Override + public Optional update(String sessionId, + String taskId, + UnaryOperator updater) { + AgentAsyncToolTaskRecord updated = records.computeIfPresent(key(sessionId, taskId), + (ignored, existing) -> updater == null ? existing : updater.apply(existing)); + return Optional.ofNullable(updated); + } + + @Override + public List list(String sessionId, AsyncToolTaskStatus status) { + List result = new ArrayList<>(); + for (AgentAsyncToolTaskRecord record : records.values()) { + if (sessionId.equals(record.getSessionId()) && (status == null || status == record.getStatus())) { + result.add(record); + } + } + result.sort(Comparator.comparing(AgentAsyncToolTaskRecord::getCreatedAt).reversed()); + return result; + } + + private String key(String sessionId, String taskId) { + return sessionId + ":" + taskId; + } + } +} diff --git a/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilerTest.java b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilerTest.java new file mode 100644 index 0000000..b5a9cd2 --- /dev/null +++ b/easyflow-modules/easyflow-module-agent/src/test/java/tech/easyflow/agent/runtime/tool/AgentToolRuntimeCompilerTest.java @@ -0,0 +1,239 @@ +package tech.easyflow.agent.runtime.tool; + +import com.easyagents.agent.runtime.tool.AgentToolSpec; +import com.easyagents.core.model.chat.tool.Parameter; +import com.easyagents.core.model.chat.tool.Tool; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; +import tech.easyflow.agent.entity.Agent; +import tech.easyflow.agent.entity.AgentToolBinding; +import tech.easyflow.agent.enums.AgentToolType; +import tech.easyflow.ai.entity.PluginItem; +import tech.easyflow.ai.entity.Workflow; +import tech.easyflow.common.web.exceptions.BusinessException; + +import java.lang.reflect.Field; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Agent 工具运行时编译测试。 + */ +public class AgentToolRuntimeCompilerTest { + + /** + * 验证 Workflow 默认按同步工具编译。 + * + * @throws Exception 反射注入依赖失败时抛出 + */ + @Test + public void compileShouldUseSyncModeByDefault() throws Exception { + AgentToolRuntimeCompiler compiler = compiler(); + + AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding(null, false, "flow-sync"))); + + Assert.assertEquals(List.of("flow-sync"), toolNames(compilation)); + Assert.assertEquals(1, compilation.getToolInvokers().size()); + Assert.assertFalse(compilation.getToolSpecs().get(0).isApprovalRequired()); + } + + /** + * 验证非法执行模式会回退为同步工具。 + * + * @throws Exception 反射注入依赖失败时抛出 + */ + @Test + public void compileShouldFallbackToSyncWhenExecutionModeInvalid() throws Exception { + AgentToolRuntimeCompiler compiler = compiler(); + + AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding("BAD", false, "flow-sync"))); + + Assert.assertEquals(List.of("flow-sync"), toolNames(compilation)); + Assert.assertEquals(1, compilation.getToolInvokers().size()); + } + + /** + * 验证 Workflow 异步模式会展开为五个固定子工具。 + * + * @throws Exception 反射注入依赖失败时抛出 + */ + @Test + public void compileShouldExpandWorkflowAsyncSubToolsAndNormalizeName() throws Exception { + AgentToolRuntimeCompiler compiler = compiler(); + + AgentToolRuntimeCompilation compilation = compiler.compile(agent(workflowBinding("ASYNC", true, "flow-alpha"))); + + Assert.assertEquals(List.of( + "flow_alpha_submit", + "flow_alpha_observe", + "flow_alpha_result", + "flow_alpha_cancel", + "flow_alpha_list" + ), toolNames(compilation)); + Assert.assertEquals(5, compilation.getToolInvokers().size()); + Assert.assertEquals(List.of("keyword"), compilation.getToolSpecs().get(0).getParametersSchema().get("required")); + Assert.assertTrue(compilation.getToolSpecs().get(0).isApprovalRequired()); + Assert.assertEquals("确认执行?", compilation.getToolSpecs().get(0).getApprovalRequest().getApprovalPrompt()); + Assert.assertFalse(compilation.getToolSpecs().get(1).isApprovalRequired()); + Assert.assertEquals("flow_alpha", compilation.getToolSpecs().get(0).getMetadata().get("asyncToolName")); + Assert.assertEquals("submit", compilation.getToolSpecs().get(0).getMetadata().get("asyncToolPhase")); + } + + /** + * 验证 Plugin 异步模式同样展开为五个固定子工具。 + * + * @throws Exception 反射注入依赖失败时抛出 + */ + @Test + public void compileShouldExpandPluginAsyncSubTools() throws Exception { + AgentToolRuntimeCompiler compiler = compiler(); + + AgentToolRuntimeCompilation compilation = compiler.compile(agent(pluginBinding("ASYNC", "plugin-tool"))); + + Assert.assertEquals(List.of( + "plugin_tool_submit", + "plugin_tool_observe", + "plugin_tool_result", + "plugin_tool_cancel", + "plugin_tool_list" + ), toolNames(compilation)); + Assert.assertEquals(5, compilation.getToolInvokers().size()); + for (AgentToolSpec spec : compilation.getToolSpecs()) { + Assert.assertEquals(Boolean.TRUE, spec.getMetadata().get("asyncTool")); + Assert.assertEquals("插件工具", spec.getMetadata().get("toolDisplayName")); + } + } + + /** + * 验证异步工具名归一化后发生冲突时会在编译阶段失败。 + * + * @throws Exception 反射注入依赖失败时抛出 + */ + @Test + public void compileShouldRejectNormalizedAsyncToolNameCollision() throws Exception { + AgentToolRuntimeCompiler compiler = compiler(); + AgentToolBinding first = workflowBinding("ASYNC", false, "flow-alpha"); + AgentToolBinding second = workflowBinding("ASYNC", false, "flow_alpha"); + second.setId(BigInteger.valueOf(13L)); + second.setTargetId(BigInteger.valueOf(103L)); + + try { + compiler.compile(agent(List.of(first, second))); + Assert.fail("异步工具名冲突时应编译失败"); + } catch (BusinessException e) { + Assert.assertTrue(e.getMessage().contains("flow_alpha_submit")); + } + } + + private AgentToolRuntimeCompiler compiler() throws Exception { + AgentToolRuntimeCompiler compiler = new AgentToolRuntimeCompiler(); + setField(compiler, "objectMapper", new ObjectMapper()); + setField(compiler, "workflowToolExecutor", new StubWorkflowToolExecutor()); + setField(compiler, "pluginToolExecutor", new StubPluginToolExecutor()); + return compiler; + } + + private Agent agent(AgentToolBinding binding) { + return agent(List.of(binding)); + } + + private Agent agent(List bindings) { + Agent agent = new Agent(); + agent.setId(BigInteger.ONE); + agent.setToolBindings(bindings); + return agent; + } + + private AgentToolBinding workflowBinding(String executionMode, boolean hitlEnabled, String toolName) { + AgentToolBinding binding = new AgentToolBinding(); + binding.setId(BigInteger.valueOf(11L)); + binding.setToolType(AgentToolType.WORKFLOW.name()); + binding.setTargetId(BigInteger.valueOf(101L)); + binding.setToolName(toolName); + binding.setEnabled(true); + binding.setHitlEnabled(hitlEnabled); + binding.setHitlConfigJson(Map.of("prompt", "确认执行?")); + binding.setOptionsJson(executionMode == null ? Map.of() : Map.of("executionMode", executionMode)); + binding.setResourceSnapshot(Map.of( + "id", BigInteger.valueOf(101L), + "title", "客户检索工作流", + "description", "按关键词检索客户", + "englishName", "flow-alpha" + )); + return binding; + } + + private AgentToolBinding pluginBinding(String executionMode, String toolName) { + AgentToolBinding binding = new AgentToolBinding(); + binding.setId(BigInteger.valueOf(12L)); + binding.setToolType(AgentToolType.PLUGIN.name()); + binding.setTargetId(BigInteger.valueOf(102L)); + binding.setToolName(toolName); + binding.setEnabled(true); + binding.setOptionsJson(Map.of("executionMode", executionMode)); + binding.setResourceSnapshot(Map.of( + "id", BigInteger.valueOf(102L), + "name", "插件工具", + "description", "调用插件", + "englishName", "plugin-tool" + )); + return binding; + } + + private List toolNames(AgentToolRuntimeCompilation compilation) { + return compilation.getToolSpecs().stream().map(AgentToolSpec::getName).collect(Collectors.toList()); + } + + private void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private Tool testTool(String name, String description) { + Parameter parameter = new Parameter(); + parameter.setName("keyword"); + parameter.setDescription("关键词"); + parameter.setType("string"); + parameter.setRequired(true); + return Tool.builder() + .name(name) + .description(description) + .addParameter(parameter) + .function(arguments -> Map.of("ok", true)) + .build(); + } + + private final class StubWorkflowToolExecutor extends WorkflowToolExecutor { + + private StubWorkflowToolExecutor() { + super(null); + } + + @Override + public Tool buildTool(Workflow workflow) { + return testTool(workflow.getEnglishName(), workflow.getDescription()); + } + + @Override + public AgentToolExecutionResult execute(Workflow workflow, Map arguments) { + return new AgentToolExecutionResult(Map.of("ok", true), "wf-run-1"); + } + } + + private final class StubPluginToolExecutor extends PluginToolExecutor { + + @Override + public Tool buildTool(PluginItem pluginItem) { + return testTool(pluginItem.getEnglishName(), pluginItem.getDescription()); + } + + @Override + public AgentToolExecutionResult execute(PluginItem pluginItem, Map arguments) { + return new AgentToolExecutionResult(Map.of("ok", true), null); + } + } +} diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java index cc699a4..adb9384 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/EasyFlowThreadPoolProperties.java @@ -10,6 +10,7 @@ public class EasyFlowThreadPoolProperties { private Pool sse = new Pool(4, 16, 2000, 30, true); private Pool documentImport = new Pool(2, 4, 200, 60, true); + private Pool agentAsyncTool = new Pool(2, 8, 200, 60, true); /** * 获取 SSE 线程池配置。 @@ -47,6 +48,24 @@ public class EasyFlowThreadPoolProperties { this.documentImport = documentImport; } + /** + * 获取 Agent 异步工具后台执行线程池配置。 + * + * @return Agent 异步工具线程池配置 + */ + public Pool getAgentAsyncTool() { + return agentAsyncTool; + } + + /** + * 设置 Agent 异步工具后台执行线程池配置。 + * + * @param agentAsyncTool Agent 异步工具线程池配置 + */ + public void setAgentAsyncTool(Pool agentAsyncTool) { + this.agentAsyncTool = agentAsyncTool; + } + /** * 线程池配置项。 */ diff --git a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java index daed5af..8896b4e 100644 --- a/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java +++ b/easyflow-modules/easyflow-module-ai/src/main/java/tech/easyflow/ai/config/ThreadPoolConfig.java @@ -78,4 +78,30 @@ public class ThreadPoolConfig { executor.initialize(); return executor; } + + /** + * 创建 Agent 异步工具后台执行线程池。 + * + * @return Agent 异步工具执行线程池 + */ + @Bean(name = "agentAsyncToolExecutor") + public ThreadPoolTaskExecutor agentAsyncToolExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + EasyFlowThreadPoolProperties.Pool pool = properties.getAgentAsyncTool(); + executor.setCorePoolSize(pool.getCoreSize()); + executor.setMaxPoolSize(pool.getMaxSize()); + executor.setQueueCapacity(pool.getQueueCapacity()); + executor.setKeepAliveSeconds(pool.getKeepAliveSeconds()); + executor.setAllowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); + executor.setThreadNamePrefix("agent-async-tool-"); + executor.setRejectedExecutionHandler((runnable, executorService) -> { + log.error("Agent异步工具线程池过载!核心线程数:{},最大线程数:{},队列任务数:{}", + executorService.getCorePoolSize(), + executorService.getMaximumPoolSize(), + executorService.getQueue().size()); + throw new BusinessException("Agent 异步工具任务繁忙,请稍后重试"); + }); + executor.initialize(); + return executor; + } } diff --git a/easyflow-ui-admin/app/src/views/ai/agent-chat/adapters/agentTimelineAdapter.ts b/easyflow-ui-admin/app/src/views/ai/agent-chat/adapters/agentTimelineAdapter.ts index d155074..41fbcdc 100644 --- a/easyflow-ui-admin/app/src/views/ai/agent-chat/adapters/agentTimelineAdapter.ts +++ b/easyflow-ui-admin/app/src/views/ai/agent-chat/adapters/agentTimelineAdapter.ts @@ -5,6 +5,7 @@ import type { ChatTimelineKnowledgeHit, ChatTimelineMessageItem, ChatTimelineToolApprovalPayload, + ChatTimelineToolStatus, } from '@easyflow/common-ui'; import {ChatTimelineBuilder} from '@easyflow/common-ui'; @@ -30,6 +31,17 @@ function asArray(value: unknown): any[] { return Array.isArray(value) ? value : []; } +function asyncToolTimelineStatus( + payload: Record, +): ChatTimelineToolStatus { + const status = asText(payload.status).toUpperCase(); + if (status === 'SUCCEEDED') return 'success'; + if (status === 'FAILED' || status === 'TIMEOUT' || status === 'CANCELLED') { + return 'error'; + } + return 'running'; +} + function asTimestamp(value: unknown) { if (!value) { return Date.now(); @@ -427,19 +439,29 @@ export function applyAgentSseEnvelope( return; } if (domain === 'TOOL' && (type === 'TOOL_CALL' || type === 'TOOL_RESULT')) { + const asyncTool = payload.asyncTool === true; + const toolName = normalizeToolName( + payload.toolDisplayName ?? payload.toolName ?? payload.name, + ); ChatTimelineBuilder.upsertToolCall(items, { input: payload.input ?? payload.toolInput, - output: payload.output ?? payload.result ?? payload.text, - status: type === 'TOOL_RESULT' ? 'success' : 'running', + output: asyncTool + ? payload.summary ?? payload.label ?? payload.output ?? payload.result ?? payload.text + : payload.output ?? payload.result ?? payload.text, + status: asyncTool + ? asyncToolTimelineStatus(payload) + : type === 'TOOL_RESULT' + ? 'success' + : 'running', statusKey: statusKeyForProjection( payload, metadata, 'knowledge-retrieval', ), - toolCallId: normalizeToolCallId(payload), - toolName: normalizeToolName( - payload.toolDisplayName ?? payload.toolName ?? payload.name, - ), + toolCallId: asyncTool + ? asText(payload.toolCallId ?? payload.taskId ?? payload.id) + : normalizeToolCallId(payload), + toolName, }); return; } diff --git a/easyflow-ui-admin/app/src/views/ai/agents/components/AgentToolForm.vue b/easyflow-ui-admin/app/src/views/ai/agents/components/AgentToolForm.vue index 508c117..4cad935 100644 --- a/easyflow-ui-admin/app/src/views/ai/agents/components/AgentToolForm.vue +++ b/easyflow-ui-admin/app/src/views/ai/agents/components/AgentToolForm.vue @@ -79,6 +79,19 @@ const selectedMcpTools = computed(() => { const selectedMcpToolCount = computed(() => selectedMcpTools.value.length); +const asyncExecutionEnabled = computed({ + get() { + return String(props.binding.optionsJson?.executionMode || '').toUpperCase() === 'ASYNC'; + }, + set(value: boolean) { + props.binding.optionsJson = { + ...(props.binding.optionsJson || {}), + executionMode: value ? 'ASYNC' : 'SYNC', + }; + emit('change'); + }, +}); + function handleTargetChange(value: string) { const option = props.options.find( (item) => String(item.value) === String(value), @@ -147,6 +160,9 @@ function handleTargetChange(value: string) { + + + ): ChatTimelineToolStatus { + const status = asText(payload.status).toUpperCase(); + if (status === 'SUCCEEDED') return 'success'; + if (status === 'FAILED' || status === 'TIMEOUT' || status === 'CANCELLED') { + return 'error'; + } + return 'running'; +} + function sortedRounds(rounds: Map) { return [...rounds.values()].sort( (first, second) =>