refactor: 重构升级为有状态 Agent

- 完善 hook 对接机制
- 提供更加明确的调用方式
This commit is contained in:
2026-05-23 21:17:50 +08:00
parent 8356560c26
commit 2b5e701ade
53 changed files with 5108 additions and 2523 deletions

View File

@@ -0,0 +1,130 @@
package com.easyagents.agent.runtime.event;
import com.easyagents.agent.runtime.AgentRuntimeException;
import com.easyagents.agent.runtime.AgentRuntimeExecutionContext;
import reactor.core.publisher.Sinks;
import java.util.Map;
import java.util.Optional;
/**
* AgentScope 运行时的旁路事件桥。
*
* <p>该类只负责 Easy-Agents 自己的旁路监察事件,不负责 AgentScope 主线路事件。
* 主线路事件来自 {@code agent.stream(...)},会进入模型会话、工具结果和最终输出的正常顺序;
* 旁路事件只给调用方观察运行过程,例如知识库检索、自动上下文压缩、工具审批和 Skill 步骤。</p>
*
* <p>旁路事件不会写入 AgentScope memory/session也不会修改 AgentScope {@code Msg}。仅作观察与展示使用</p>
*/
public class AgentRuntimeEventBridge {
private final AgentRuntimeExecutionContext fallbackContext;
private final AgentRuntimeTurnContextHolder turnContextHolder;
/**
* 创建旁路事件桥。
*
* @param fallbackContext 运行时级上下文,当前轮次未设置时作为身份信息来源
* @param turnContextHolder 当前运行轮次上下文持有器
*/
public AgentRuntimeEventBridge(AgentRuntimeExecutionContext fallbackContext,
AgentRuntimeTurnContextHolder turnContextHolder) {
this.fallbackContext = fallbackContext;
this.turnContextHolder = turnContextHolder;
}
/**
* 创建固定 sink 的旁路事件桥,主要用于旧测试和旧辅助构造路径。
*
* @param fallbackContext 运行时级上下文
* @param eventSink 旁路事件 sink
* @return 旁路事件桥
*/
public static AgentRuntimeEventBridge fixed(AgentRuntimeExecutionContext fallbackContext,
Sinks.Many<AgentRuntimeEvent> eventSink) {
AgentRuntimeTurnContextHolder holder = new AgentRuntimeTurnContextHolder();
AgentRuntimeEventBridge bridge = new AgentRuntimeEventBridge(fallbackContext, holder);
holder.set(new AgentRuntimeTurnContext(null, eventSink, bridge));
return bridge;
}
/**
* 创建指定类型的旁路事件,并自动补齐公共运行身份信息。
*
* @param type 旁路事件类型
* @return 已补齐公共字段的运行时事件
*/
public AgentRuntimeEvent event(AgentRuntimeEventType type) {
AgentRuntimeEvent event = AgentRuntimeEvent.of(type);
enrich(event);
return event;
}
/**
* 发射一条旁路事件。
*
* <p>当前没有运行轮次或没有旁路 sink 时直接忽略。这允许 adapter 在非流式测试、
* 初始化阶段或主线路未建立旁路订阅时保持无副作用。</p>
*
* @param event 旁路事件
*/
public void emit(AgentRuntimeEvent event) {
if (event == null) {
return;
}
Optional<Sinks.Many<AgentRuntimeEvent>> sink = eventSink();
if (sink.isEmpty()) {
return;
}
enrich(event);
Sinks.EmitResult result = sink.get().tryEmitNext(event);
if (result.isFailure()) {
throw new AgentRuntimeException("Failed to emit agent runtime side event: " + result);
}
}
/**
* 获取当前轮次合并后的执行上下文。
*
* @return 当前执行上下文
*/
public AgentRuntimeExecutionContext executionContext() {
return turnContextHolder == null ? fallbackContext : turnContextHolder.executionContext(fallbackContext);
}
/**
* 补齐旁路事件的公共身份信息。
*
* @param event 旁路事件
*/
public void enrich(AgentRuntimeEvent event) {
if (event == null) {
return;
}
AgentRuntimeExecutionContext context = executionContext();
if (context == null) {
return;
}
if (event.getTraceId() == null || event.getTraceId().isBlank()) {
event.setTraceId(context.getTraceId());
}
if (event.getSessionId() == null || event.getSessionId().isBlank()) {
event.setSessionId(context.getSessionId());
}
if ((event.getAgentId() == null || event.getAgentId().isBlank())
&& context.getAgentDefinition() != null) {
event.setAgentId(context.getAgentDefinition().getAgentId());
}
if (context.getRequestId() != null && !context.getRequestId().isBlank()) {
event.getMetadata().putIfAbsent("requestId", context.getRequestId());
}
Map<String, Object> metadata = context.getMetadata();
if (metadata != null && !metadata.isEmpty()) {
metadata.forEach(event.getMetadata()::putIfAbsent);
}
}
private Optional<Sinks.Many<AgentRuntimeEvent>> eventSink() {
return turnContextHolder == null ? Optional.empty() : turnContextHolder.eventSink();
}
}

View File

@@ -14,6 +14,16 @@ public enum AgentRuntimeEventType {
*/
REASONING_DELTA,
/**
* 智能体开始一次模型推理。
*/
REASONING_STARTED,
/**
* 智能体完成一次模型推理。
*/
REASONING_COMPLETED,
/**
* 流式输出内容,用于流式展示聊天内容。
*/
@@ -34,6 +44,16 @@ public enum AgentRuntimeEventType {
*/
KNOWLEDGE_RETRIEVAL,
/**
* 自动上下文压缩已开始。
*/
MEMORY_COMPRESSION_STARTED,
/**
* 自动上下文压缩已完成。
*/
MEMORY_COMPRESSION_COMPLETED,
/**
* 工具执行前需要人工审批。
*/
@@ -64,6 +84,11 @@ public enum AgentRuntimeEventType {
*/
COMPLETED,
/**
* 智能体运行已暂停,等待外部输入后继续。
*/
SUSPENDED,
/**
* 智能体运行失败。
*/

View File

@@ -0,0 +1,34 @@
package com.easyagents.agent.runtime.event;
import io.agentscope.core.hook.HookEvent;
import reactor.core.publisher.Mono;
/**
* AgentScope Hook 事件的主线路干预器。对接 AgentScope 的原生生命周期hook
*
* <p>干预器允许修改 AgentScope HookEvent因此会影响主线路执行。
* 典型场景包括 AutoContext 在推理前改写输入消息,或后续 HITL 在推理后调用
* {@code stopAgent()} 暂停工具执行。普通运行状态通知应使用 {@link AgentRuntimeObserver}。</p>
* <p>警告本干预器会影响到主线路agent 交互,谨慎使用
* </p>
*/
public interface AgentRuntimeInterceptor {
/**
* 处理并返回可能被修改的 AgentScope Hook 事件。
*
* @param event AgentScope Hook 事件
* @param <T> Hook 事件类型
* @return 处理后的 Hook 事件
*/
<T extends HookEvent> Mono<T> intercept(T event);
/**
* 获取干预器执行优先级。
*
* @return 优先级,数值越小越先执行
*/
default int priority() {
return 100;
}
}

View File

@@ -0,0 +1,72 @@
package com.easyagents.agent.runtime.event;
import io.agentscope.core.hook.HookEvent;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
/**
* AgentScope Hook 事件的统一观察和干预调度器。
*
* <p>处理顺序固定为:先执行干预器,再执行观察器。干预器属于主线路能力,
* 可以修改 AgentScope HookEvent观察器属于旁线路能力只能查看事件并通过
* {@link AgentRuntimeEventBridge} 发射对外监察事件。</p>
*/
public class AgentRuntimeObservationManager {
private final List<AgentRuntimeInterceptor> interceptors;
private final List<AgentRuntimeObserver> observers;
/**
* 创建观察调度器。
*
* @param interceptors 主线路干预器
* @param observers 旁路观察器
*/
public AgentRuntimeObservationManager(List<AgentRuntimeInterceptor> interceptors,
List<AgentRuntimeObserver> observers) {
this.interceptors = sortInterceptors(interceptors);
this.observers = sortObservers(observers);
}
/**
* 创建空观察调度器。
*
* @return 空调度器
*/
public static AgentRuntimeObservationManager empty() {
return new AgentRuntimeObservationManager(List.of(), List.of());
}
/**
* 处理 AgentScope Hook 事件。
*
* @param event Hook 事件
* @param <T> Hook 事件类型
* @return 处理后的 Hook 事件
*/
public <T extends HookEvent> Mono<T> handle(T event) {
Mono<T> chain = Mono.just(event);
for (AgentRuntimeInterceptor interceptor : interceptors) {
chain = chain.flatMap(interceptor::intercept);
}
for (AgentRuntimeObserver observer : observers) {
chain = chain.flatMap(current -> observer.observe(current).thenReturn(current));
}
return chain;
}
private List<AgentRuntimeInterceptor> sortInterceptors(List<AgentRuntimeInterceptor> source) {
List<AgentRuntimeInterceptor> sorted = new ArrayList<>(source == null ? List.of() : source);
sorted.sort(Comparator.comparingInt(AgentRuntimeInterceptor::priority));
return List.copyOf(sorted);
}
private List<AgentRuntimeObserver> sortObservers(List<AgentRuntimeObserver> source) {
List<AgentRuntimeObserver> sorted = new ArrayList<>(source == null ? List.of() : source);
sorted.sort(Comparator.comparingInt(AgentRuntimeObserver::priority));
return List.copyOf(sorted);
}
}

View File

@@ -0,0 +1,31 @@
package com.easyagents.agent.runtime.event;
import io.agentscope.core.hook.HookEvent;
import reactor.core.publisher.Mono;
/**
* AgentScope Hook 事件的旁路观察器。对接 AgentScope 的原生生命周期hook
*
* <p>观察器只做监察和旁路事件发射,不允许修改 AgentScope HookEvent。
* 如果能力需要影响主线路,例如修改输入消息、暂停 agent 或替换工具结果,应实现
* {@link AgentRuntimeInterceptor}。</p>
*/
public interface AgentRuntimeObserver {
/**
* 观察 AgentScope Hook 事件。
*
* @param event AgentScope Hook 事件
* @return 完成信号
*/
Mono<Void> observe(HookEvent event);
/**
* 获取观察器执行优先级。
*
* @return 优先级,数值越小越先执行
*/
default int priority() {
return 100;
}
}

View File

@@ -0,0 +1,137 @@
package com.easyagents.agent.runtime.event;
import com.easyagents.agent.runtime.AgentRuntimeExecutionContext;
import reactor.core.publisher.Sinks;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* AgentScope 运行时的单轮执行上下文。
*/
public class AgentRuntimeTurnContext {
/**
* 本轮运行上下文。
*/
private final AgentRuntimeExecutionContext executionContext;
/**
* 本轮旁路事件 sink。
*
* <p>该字段只承载旁线路监察事件,不承载 AgentScope 主线路 stream 事件。</p>
*/
private final Sinks.Many<AgentRuntimeEvent> eventSink;
/**
* 本轮旁路事件桥。
*
* <p>adapter 和 observer 应优先通过 bridge 发射旁路事件,避免各处重复拼接
* trace/session/request 等公共字段。</p>
*/
private final AgentRuntimeEventBridge eventBridge;
/**
* 创建单轮执行上下文。
*
* @param executionContext 本轮运行上下文
* @param eventSink 本轮旁路事件 sink
*/
public AgentRuntimeTurnContext(AgentRuntimeExecutionContext executionContext,
Sinks.Many<AgentRuntimeEvent> eventSink) {
this(executionContext, eventSink, null);
}
/**
* 创建单轮执行上下文。
*
* @param executionContext 本轮运行上下文
* @param eventSink 本轮旁路事件 sink
* @param eventBridge 本轮旁路事件桥
*/
public AgentRuntimeTurnContext(AgentRuntimeExecutionContext executionContext,
Sinks.Many<AgentRuntimeEvent> eventSink,
AgentRuntimeEventBridge eventBridge) {
this.executionContext = executionContext;
this.eventSink = eventSink;
this.eventBridge = eventBridge;
}
/**
* 获取本轮运行上下文。
*
* @return 本轮运行上下文
*/
public AgentRuntimeExecutionContext getExecutionContext() {
return executionContext;
}
/**
* 获取本轮旁路事件 sink。
*
* @return 本轮旁路事件 sink
*/
public Sinks.Many<AgentRuntimeEvent> getEventSink() {
return eventSink;
}
/**
* 获取本轮旁路事件桥。
*
* @return 本轮旁路事件桥
*/
public AgentRuntimeEventBridge getEventBridge() {
return eventBridge;
}
/**
* 构建以当前轮信息覆盖运行时信息后的上下文。
*
* @param fallback 运行时级上下文
* @return 当前轮上下文,未设置时返回运行时级上下文
*/
public AgentRuntimeExecutionContext mergeWith(AgentRuntimeExecutionContext fallback) {
if (executionContext == null) {
return fallback;
}
AgentRuntimeExecutionContext merged = new AgentRuntimeExecutionContext();
merged.setRequestId(firstNonBlank(executionContext.getRequestId(), fallback == null ? null : fallback.getRequestId()));
merged.setTraceId(firstNonBlank(executionContext.getTraceId(), fallback == null ? null : fallback.getTraceId()));
merged.setSessionId(firstNonBlank(executionContext.getSessionId(), fallback == null ? null : fallback.getSessionId()));
merged.setAgentDefinition(executionContext.getAgentDefinition() == null && fallback != null
? fallback.getAgentDefinition()
: executionContext.getAgentDefinition());
merged.setRuntimeContext(executionContext.getRuntimeContext() == null && fallback != null
? fallback.getRuntimeContext()
: executionContext.getRuntimeContext());
merged.setUserMessage(executionContext.getUserMessage());
merged.setMemorySnapshot(executionContext.getMemorySnapshot());
merged.setToolInvokers(fallback == null ? executionContext.getToolInvokers() : fallback.getToolInvokers());
merged.setKnowledgeRetrievers(fallback == null
? executionContext.getKnowledgeRetrievers()
: fallback.getKnowledgeRetrievers());
merged.setSessionStore(fallback == null ? executionContext.getSessionStore() : fallback.getSessionStore());
merged.setConversationRecorder(fallback == null
? executionContext.getConversationRecorder()
: fallback.getConversationRecorder());
merged.setMetadata(mergedMetadata(fallback, executionContext));
merged.setCancelReason(executionContext.getCancelReason());
return merged;
}
private Map<String, Object> mergedMetadata(AgentRuntimeExecutionContext fallback,
AgentRuntimeExecutionContext current) {
Map<String, Object> metadata = new LinkedHashMap<>();
if (fallback != null && fallback.getMetadata() != null) {
metadata.putAll(fallback.getMetadata());
}
if (current != null && current.getMetadata() != null) {
metadata.putAll(current.getMetadata());
}
return metadata;
}
private String firstNonBlank(String first, String second) {
return first == null || first.isBlank() ? second : first;
}
}

View File

@@ -0,0 +1,74 @@
package com.easyagents.agent.runtime.event;
import com.easyagents.agent.runtime.AgentRuntimeExecutionContext;
import reactor.core.publisher.Sinks;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
/**
* 保存当前运行轮次上下文。
*/
public class AgentRuntimeTurnContextHolder {
private final AtomicReference<AgentRuntimeTurnContext> current = new AtomicReference<>();
/**
* 设置当前运行轮次上下文。
*
* @param context 当前轮次上下文
*/
public void set(AgentRuntimeTurnContext context) {
current.set(context);
}
/**
* 清理当前运行轮次上下文。
*/
public void clear() {
current.set(null);
}
/**
* 获取当前运行轮次上下文。
*
* @return 当前轮次上下文
*/
public Optional<AgentRuntimeTurnContext> current() {
return Optional.ofNullable(current.get());
}
/**
* 获取当前轮次事件 sink。
*
* @return 当前轮次旁路事件 sink
*/
public Optional<Sinks.Many<AgentRuntimeEvent>> eventSink() {
return current()
.map(AgentRuntimeTurnContext::getEventSink)
.filter(sink -> sink != null);
}
/**
* 获取当前轮次旁路事件桥。
*
* @return 当前轮次旁路事件桥
*/
public Optional<AgentRuntimeEventBridge> eventBridge() {
return current()
.map(AgentRuntimeTurnContext::getEventBridge)
.filter(bridge -> bridge != null);
}
/**
* 合并当前轮次上下文和运行时级上下文。
*
* @param fallback 运行时级上下文
* @return 合并后的上下文
*/
public AgentRuntimeExecutionContext executionContext(AgentRuntimeExecutionContext fallback) {
return current()
.map(context -> context.mergeWith(fallback))
.orElse(fallback);
}
}

View File

@@ -0,0 +1,357 @@
package com.easyagents.agent.runtime.event.interceptor;
import com.easyagents.agent.runtime.AgentRuntimeException;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeInterceptor;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Agent;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PreCallEvent;
import io.agentscope.core.hook.PreReasoningEvent;
import io.agentscope.core.memory.Memory;
import io.agentscope.core.memory.autocontext.*;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.plan.PlanNotebook;
import io.agentscope.core.tool.Toolkit;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* AutoContext 主线路干预器。
*
* <p><strong>特殊约束:</strong>本干预器是对 AgentScope 官方
* {@code AutoContextHook} 的替代实现,不能与官方 {@code AutoContextHook} 同时注册。
* 两者同时存在会导致 {@link AutoContextMemory#compressIfNeeded()}、上下文重写以及
* {@link ContextOffloadTool} 注册被重复执行。</p>
*
* <p>本类承担两类职责。第一类是主线路干预:在 {@link PreCallEvent} 中注册
* AutoContext 工具能力,在 {@link PreReasoningEvent} 中触发记忆压缩并改写
* LLM 输入消息。第二类是旁路通知:通过 {@link AgentRuntimeEventBridge} 发出
* {@link AgentRuntimeEventType#MEMORY_COMPRESSION_STARTED} 和
* {@link AgentRuntimeEventType#MEMORY_COMPRESSION_COMPLETED},这些事件只用于调用方展示,
* 不写入 AgentScope memory/session也不参与 LLM 会话协议。</p>
*/
public class AutoContextInterceptor implements AgentRuntimeInterceptor {
private static final String STATUS_KEY = "memory-compression";
private static final String AUTO_CONTEXT_SYSTEM_INSTRUCTION =
"You may see compressed messages containing <!-- CONTEXT_OFFLOAD uuid=... -->.\n"
+ "- Use the UUID to call context_reload if you need full details.\n"
+ "- NEVER mention, quote, or refer to UUIDs, offload tags, or internal metadata in your response.";
private final AgentRuntimeEventBridge eventBridge;
private final AutoContextConfig autoContextConfig;
private final AtomicBoolean registered = new AtomicBoolean(false);
/**
* 创建 AutoContext 主线路干预器。
*
* <p>传入的 {@link AutoContextConfig} 必须是创建目标 {@link AutoContextMemory}
* 时使用的同一份配置。这样压缩开始事件的触发条件才能与 AgentScope
* {@code compressIfNeeded()} 的入口判断保持一致。</p>
*
* @param eventBridge 旁路事件桥
* @param autoContextConfig AutoContext 配置
*/
public AutoContextInterceptor(AgentRuntimeEventBridge eventBridge,
AutoContextConfig autoContextConfig) {
this.eventBridge = eventBridge;
this.autoContextConfig = autoContextConfig;
}
/**
* 处理 AutoContext 相关 AgentScope Hook 事件。
*
* @param event AgentScope Hook 事件
* @param <T> Hook 事件类型
* @return 处理后的 Hook 事件
*/
@Override
public <T extends HookEvent> Mono<T> intercept(T event) {
if (event instanceof PreCallEvent preCallEvent) {
@SuppressWarnings("unchecked")
Mono<T> result = (Mono<T>) handlePreCall(preCallEvent);
return result;
}
if (event instanceof PreReasoningEvent preReasoningEvent) {
@SuppressWarnings("unchecked")
Mono<T> result = (Mono<T>) handlePreReasoning(preReasoningEvent);
return result;
}
return Mono.just(event);
}
/**
* 获取干预器优先级。
*
* @return 优先级,保持与官方 AutoContextHook 一致
*/
@Override
public int priority() {
return 0;
}
/**
* 判断 AutoContext 工具集成是否已经注册。
*
* @return 已注册时返回 true
*/
public boolean isRegistered() {
return registered.get();
}
/**
* 处理调用前事件,注册 AutoContext 的上下文重载工具和计划本。
*
* @param event 调用前事件
* @return 原事件
*/
private Mono<PreCallEvent> handlePreCall(PreCallEvent event) {
if (registered.get()) {
return Mono.just(event);
}
Agent agent = event.getAgent();
if (!(agent instanceof ReActAgent reActAgent)) {
return Mono.just(event);
}
Memory memory = reActAgent.getMemory();
if (!(memory instanceof AutoContextMemory autoContextMemory)) {
return Mono.just(event);
}
if (!registered.compareAndSet(false, true)) {
return Mono.just(event);
}
try {
Toolkit toolkit = reActAgent.getToolkit();
if (toolkit != null) {
toolkit.registerTool(new ContextOffloadTool(autoContextMemory));
}
PlanNotebook planNotebook = reActAgent.getPlanNotebook();
if (planNotebook != null) {
autoContextMemory.attachPlanNote(planNotebook);
}
} catch (Exception e) {
registered.set(false);
throw new AgentRuntimeException("Failed to register AutoContext integration.", e);
}
return Mono.just(event);
}
/**
* 处理推理前事件,触发 AutoContext 压缩并改写 LLM 输入消息。
*
* <p>这是主线路干预逻辑:{@code compressIfNeeded()} 和 {@code setInputMessages(...)}
* 会影响 AgentScope 本次 reasoning 输入。压缩开始/完成事件则是旁路通知,只发给调用方。</p>
*
* @param event 推理前事件
* @return 改写输入消息后的事件
*/
private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
Agent agent = event.getAgent();
if (!(agent instanceof ReActAgent reActAgent)) {
return Mono.just(event);
}
Memory memory = reActAgent.getMemory();
if (!(memory instanceof AutoContextMemory autoContextMemory)) {
return Mono.just(event);
}
// 判断是否达到压缩条件
CompressionCheck compressionCheck = compressionCheck(autoContextMemory.getMessages());
int beforeEventCount = compressionEventCount(autoContextMemory);
if (compressionCheck.thresholdReached()) {
boolean compressed = autoContextMemory.compressIfNeeded();
List<CompressionEvent> newEvents = newCompressionEvents(autoContextMemory, beforeEventCount);
if (compressed && !newEvents.isEmpty()) {
emitCompressionStarted(compressionCheck);
emitCompressionCompleted(compressionCheck, newEvents);
}
}
// 压缩完毕后自动进行当前的会话
event.setInputMessages(buildInputMessages(event, autoContextMemory));
return Mono.just(event);
}
/**
* 计算当前记忆是否达到 AutoContext 官方压缩入口条件。
*
* @param messages 当前工作记忆消息
* @return 压缩入口检查结果
*/
private CompressionCheck compressionCheck(List<Msg> messages) {
List<Msg> safeMessages = messages == null ? List.of() : messages;
int messageCount = safeMessages.size();
int tokenCount = TokenCounterUtil.calculateToken(safeMessages);
int thresholdMessageCount = autoContextConfig == null ? Integer.MAX_VALUE : autoContextConfig.getMsgThreshold();
long thresholdTokenCount = autoContextConfig == null
? Long.MAX_VALUE
: (long) (autoContextConfig.getMaxToken() * autoContextConfig.getTokenRatio());
boolean messageThresholdReached = messageCount >= thresholdMessageCount;
boolean tokenThresholdReached = tokenCount >= thresholdTokenCount;
return new CompressionCheck(messageCount, tokenCount, thresholdMessageCount, thresholdTokenCount,
messageThresholdReached, tokenThresholdReached);
}
/**
* 获取当前压缩事件数量。
*
* @param autoContextMemory AutoContext 记忆
* @return 压缩事件数量
*/
private int compressionEventCount(AutoContextMemory autoContextMemory) {
List<CompressionEvent> events = autoContextMemory.getCompressionEvents();
return events == null ? 0 : events.size();
}
/**
* 获取本次压缩新增的 AgentScope 压缩事件。
*
* @param autoContextMemory AutoContext 记忆
* @param beforeEventCount 压缩前事件数量
* @return 新增压缩事件
*/
private List<CompressionEvent> newCompressionEvents(AutoContextMemory autoContextMemory, int beforeEventCount) {
List<CompressionEvent> events = autoContextMemory.getCompressionEvents();
if (events == null || events.size() <= beforeEventCount) {
return List.of();
}
return new ArrayList<>(events.subList(beforeEventCount, events.size()));
}
/**
* 构建 AutoContext 压缩后传给 LLM 的输入消息。
*
* @param event 推理前事件
* @param autoContextMemory AutoContext 记忆
* @return 更新后的输入消息
*/
private List<Msg> buildInputMessages(PreReasoningEvent event, AutoContextMemory autoContextMemory) {
List<Msg> originalInputMessages = event.getInputMessages();
List<Msg> newInputMessages = new ArrayList<>();
if (!originalInputMessages.isEmpty() && originalInputMessages.get(0).getRole() == MsgRole.SYSTEM) {
Msg originalSystemMsg = originalInputMessages.get(0);
String originalSystemText = originalSystemMsg.getTextContent();
String newSystemText = originalSystemText != null
? originalSystemText + "\n\n" + AUTO_CONTEXT_SYSTEM_INSTRUCTION
: AUTO_CONTEXT_SYSTEM_INSTRUCTION;
newInputMessages.add(Msg.builder()
.role(MsgRole.SYSTEM)
.name(originalSystemMsg.getName())
.content(TextBlock.builder().text(newSystemText).build())
.metadata(originalSystemMsg.getMetadata())
.build());
} else {
newInputMessages.add(Msg.builder()
.role(MsgRole.SYSTEM)
.name("system")
.content(TextBlock.builder().text(AUTO_CONTEXT_SYSTEM_INSTRUCTION).build())
.build());
}
newInputMessages.addAll(autoContextMemory.getMessages());
return newInputMessages;
}
/**
* 发射上下文压缩开始旁路事件。
*
* @param compressionCheck 压缩入口检查结果
*/
private void emitCompressionStarted(CompressionCheck compressionCheck) {
AgentRuntimeEvent event = eventBridge.event(AgentRuntimeEventType.MEMORY_COMPRESSION_STARTED);
event.getPayload().put("statusKey", STATUS_KEY);
event.getPayload().put("phase", "started");
event.getPayload().put("status", "running");
event.getPayload().put("label", "正在整理上下文");
putCompressionCheckPayload(event, compressionCheck);
eventBridge.emit(event);
}
/**
* 发射上下文压缩完成旁路事件。
*
* @param compressionCheck 压缩入口检查结果
* @param events 新增压缩事件
*/
private void emitCompressionCompleted(CompressionCheck compressionCheck,
List<CompressionEvent> events) {
AgentRuntimeEvent event = eventBridge.event(AgentRuntimeEventType.MEMORY_COMPRESSION_COMPLETED);
event.getPayload().put("statusKey", STATUS_KEY);
event.getPayload().put("phase", "completed");
event.getPayload().put("status", "done");
event.getPayload().put("label", "已整理上下文");
event.getPayload().put("compressed", true);
event.getPayload().put("eventCount", events == null ? 0 : events.size());
event.getPayload().put("events", toPayloadEvents(events));
putCompressionCheckPayload(event, compressionCheck);
eventBridge.emit(event);
}
/**
* 填充压缩入口判断相关载荷。
*
* @param event 运行时事件
* @param compressionCheck 压缩入口检查结果
*/
private void putCompressionCheckPayload(AgentRuntimeEvent event, CompressionCheck compressionCheck) {
event.getPayload().put("messageCount", compressionCheck.messageCount());
event.getPayload().put("tokenCount", compressionCheck.tokenCount());
event.getPayload().put("thresholdMessageCount", compressionCheck.thresholdMessageCount());
event.getPayload().put("thresholdTokenCount", compressionCheck.thresholdTokenCount());
event.getPayload().put("messageThresholdReached", compressionCheck.messageThresholdReached());
event.getPayload().put("tokenThresholdReached", compressionCheck.tokenThresholdReached());
}
/**
* 将 AgentScope 压缩事件转换为旁路事件载荷。
*
* @param events AgentScope 压缩事件
* @return 压缩事件载荷
*/
private List<Map<String, Object>> toPayloadEvents(List<CompressionEvent> events) {
if (events == null || events.isEmpty()) {
return List.of();
}
List<Map<String, Object>> payloadEvents = new ArrayList<>();
for (CompressionEvent event : events) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("eventType", event.getEventType());
payload.put("timestamp", event.getTimestamp());
payload.put("compressedMessageCount", event.getCompressedMessageCount());
payload.put("previousMessageId", event.getPreviousMessageId());
payload.put("nextMessageId", event.getNextMessageId());
payload.put("compressedMessageId", event.getCompressedMessageId());
payload.put("tokenBefore", event.getTokenBefore());
payload.put("tokenAfter", event.getTokenAfter());
payload.put("tokenReduction", event.getTokenReduction());
payload.put("inputToken", event.getCompressInputToken());
payload.put("outputToken", event.getCompressOutputToken());
payloadEvents.add(payload);
}
return payloadEvents;
}
private record CompressionCheck(int messageCount,
int tokenCount,
int thresholdMessageCount,
long thresholdTokenCount,
boolean messageThresholdReached,
boolean tokenThresholdReached) {
/**
* 判断是否达到 AutoContext 官方压缩入口条件。
*
* @return 达到任一阈值时返回 true
*/
private boolean thresholdReached() {
return messageThresholdReached || tokenThresholdReached;
}
}
}

View File

@@ -0,0 +1,198 @@
package com.easyagents.agent.runtime.event.interceptor;
import com.easyagents.agent.runtime.AgentRuntimeExecutionContext;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeInterceptor;
import com.easyagents.agent.runtime.hitl.AgentPendingState;
import com.easyagents.agent.runtime.hitl.AgentToolApprovalCoordinator;
import com.easyagents.agent.runtime.hitl.AgentToolApprovalRequest;
import com.easyagents.agent.runtime.tool.AgentToolSpec;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.ToolUseBlock;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 工具 HITL 主线路干预器。
*
* <p>本 interceptor 专门处理“工具执行前人工审批”。监听 AgentScope 原生
* {@link PostReasoningEvent}</p>
*
* <p>这里包含两类动作:
* <ul>
* <li>主线路干预:发现待审批工具后调用 {@link PostReasoningEvent#stopAgent()}
* 让 AgentScope 返回当前带 ToolUseBlock 的消息并暂停工具执行。</li>
* <li>旁路交互事件:通过 {@link AgentRuntimeEventBridge} 发出
* {@link AgentRuntimeEventType#TOOL_APPROVAL_REQUIRED},通知调用方展示审批交互。</li>
* </ul>
*
* <p>注意:本 interceptor 不执行工具、不写入 AgentScope memory/session也不实现恢复。
* 后续 resume 流程应基于 AgentScope pending tool 状态继续调用 agent stream/call。</p>
*/
public class ToolHitlInterceptor implements AgentRuntimeInterceptor {
private final AgentRuntimeEventBridge eventBridge;
private final AgentToolApprovalCoordinator approvalCoordinator;
private final Map<String, AgentToolSpec> toolSpecs;
/**
* 创建工具 HITL 干预器。
*
* @param eventBridge 旁路事件桥
* @param approvalCoordinator 工具审批协调器
* @param toolSpecs 工具声明列表
*/
public ToolHitlInterceptor(AgentRuntimeEventBridge eventBridge,
AgentToolApprovalCoordinator approvalCoordinator,
List<AgentToolSpec> toolSpecs) {
this.eventBridge = eventBridge;
this.approvalCoordinator = approvalCoordinator;
this.toolSpecs = (toolSpecs == null ? List.<AgentToolSpec>of() : toolSpecs).stream()
.filter(spec -> spec != null && spec.getName() != null && !spec.getName().isBlank())
.collect(Collectors.toMap(AgentToolSpec::getName, Function.identity(), (left, right) -> left,
LinkedHashMap::new));
}
/**
* 处理 AgentScope Hook 事件。
*
* @param event AgentScope Hook 事件
* @param <T> Hook 事件类型
* @return 处理后的 Hook 事件
*/
@Override
public <T extends HookEvent> Mono<T> intercept(T event) {
if (event instanceof PostReasoningEvent postReasoningEvent) {
interceptPostReasoning(postReasoningEvent);
}
return Mono.just(event);
}
/**
* 返回执行优先级。
*
* <p>工具审批需要在普通观察器发出 reasoning completed 后保持事件已被标记暂停,
* 但不应早于 AutoContext 的 PreReasoning 干预。当前值用于主线路 reasoning 后检查。</p>
*
* @return 优先级
*/
@Override
public int priority() {
return 50;
}
private void interceptPostReasoning(PostReasoningEvent event) {
Msg reasoningMessage = event.getReasoningMessage();
if (reasoningMessage == null) {
return;
}
List<ToolUseBlock> approvalRequiredTools = approvalRequiredTools(reasoningMessage);
if (approvalRequiredTools.isEmpty()) {
return;
}
List<Map<String, Object>> pendingApprovals = new ArrayList<>();
for (ToolUseBlock toolUse : approvalRequiredTools) {
AgentToolSpec toolSpec = toolSpecs.get(toolUse.getName());
AgentPendingState pendingState = registerPendingState(toolSpec, toolUse);
AgentRuntimeEvent approvalEvent = toolApprovalRequiredEvent(toolSpec, toolUse, pendingState);
pendingState.setEventId(approvalEvent.getEventId());
pendingApprovals.add(pendingApprovalPayload(pendingState, toolUse));
eventBridge.emit(approvalEvent);
}
AgentRuntimeExecutionContext context = eventBridge.executionContext();
if (context != null) {
context.getMetadata().put("hitlSuspended", true);
context.getMetadata().put("hitlSuspendReason", "TOOL_APPROVAL_REQUIRED");
context.getMetadata().put("hitlPendingApprovals", pendingApprovals);
}
event.stopAgent();
}
private List<ToolUseBlock> approvalRequiredTools(Msg reasoningMessage) {
List<ToolUseBlock> toolUses = reasoningMessage.getContentBlocks(ToolUseBlock.class);
if (toolUses == null || toolUses.isEmpty()) {
return List.of();
}
return toolUses.stream()
.filter(toolUse -> {
AgentToolSpec toolSpec = toolUse == null ? null : toolSpecs.get(toolUse.getName());
return toolSpec != null && toolSpec.isApprovalRequired();
})
.toList();
}
private AgentPendingState registerPendingState(AgentToolSpec toolSpec, ToolUseBlock toolUse) {
AgentRuntimeExecutionContext context = eventBridge.executionContext();
AgentToolApprovalRequest approvalRequest = toolSpec.getApprovalRequest();
Duration timeout = approvalRequest == null || approvalRequest.getTimeout() == null
? Duration.ofMinutes(30)
: approvalRequest.getTimeout();
Map<String, Object> metadata = approvalRequest == null
? new LinkedHashMap<>()
: new LinkedHashMap<>(approvalRequest.getMetadata());
metadata.put("phase", "POST_REASONING");
metadata.put("source", "TOOL_HITL_INTERCEPTOR");
metadata.putAll(toolUse.getMetadata() == null ? Map.of() : toolUse.getMetadata());
return approvalCoordinator.register(
context == null ? null : context.getSessionId(),
context == null || context.getAgentDefinition() == null ? null : context.getAgentDefinition().getAgentId(),
toolUse.getId(),
toolUse.getName(),
approvalPrompt(approvalRequest),
toolUse.getInput(),
metadata,
Instant.now().plus(timeout));
}
private AgentRuntimeEvent toolApprovalRequiredEvent(AgentToolSpec toolSpec,
ToolUseBlock toolUse,
AgentPendingState pendingState) {
AgentRuntimeExecutionContext context = eventBridge.executionContext();
AgentRuntimeEvent event = eventBridge.event(AgentRuntimeEventType.TOOL_APPROVAL_REQUIRED);
event.setToolCallId(toolUse.getId());
event.getPayload().putAll(pendingApprovalPayload(pendingState, toolUse));
event.getPayload().put("sessionId", context == null ? null : context.getSessionId());
event.getPayload().put("agentId", context == null || context.getAgentDefinition() == null
? null
: context.getAgentDefinition().getAgentId());
event.getPayload().put("approvalPrompt", approvalPrompt(toolSpec.getApprovalRequest()));
event.getPayload().put("approvalMetadata", pendingState.getMetadata());
event.getPayload().put("toolDescription", toolSpec.getDescription());
event.getMetadata().put("source", "TOOL_HITL_INTERCEPTOR");
event.getMetadata().put("phase", "POST_REASONING");
return event;
}
private Map<String, Object> pendingApprovalPayload(AgentPendingState pendingState, ToolUseBlock toolUse) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("resumeToken", pendingState.getResumeToken().getValue());
payload.put("toolCallId", pendingState.getToolCallId());
payload.put("toolName", pendingState.getToolName());
payload.put("toolInput", pendingState.getToolInput());
payload.put("input", toolUse.getInput());
payload.put("content", toolUse.getContent());
payload.put("expiresAt", pendingState.getExpiresAt() == null ? null : pendingState.getExpiresAt().toString());
return payload;
}
private String approvalPrompt(AgentToolApprovalRequest approvalRequest) {
if (approvalRequest != null
&& approvalRequest.getApprovalPrompt() != null
&& !approvalRequest.getApprovalPrompt().isBlank()) {
return approvalRequest.getApprovalPrompt();
}
return "是否批准执行该工具?";
}
}

View File

@@ -0,0 +1,62 @@
package com.easyagents.agent.runtime.event.observer;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeObserver;
import io.agentscope.core.agent.Agent;
import io.agentscope.core.hook.ErrorEvent;
import io.agentscope.core.hook.HookEvent;
import reactor.core.publisher.Mono;
/**
* 监听 AgentScope 原生错误事件,并发射运行失败旁路事件。
*
* <p>该观察器复用 {@link AgentRuntimeEventType#FAILED},但通过 payload 中的
* {@code source=HOOK} 标识它来自 AgentScope 生命周期观察,不等同于 Easy-Agents
* runtime 外层流已经完成失败收口。</p>
*/
public class AgentRuntimeErrorObserver implements AgentRuntimeObserver {
private final AgentRuntimeEventBridge eventBridge;
/**
* 创建运行错误观察器。
*
* @param eventBridge 旁路事件桥
*/
public AgentRuntimeErrorObserver(AgentRuntimeEventBridge eventBridge) {
this.eventBridge = eventBridge;
}
/**
* 观察 AgentScope 错误事件。
*
* @param event AgentScope Hook 事件
* @return 完成信号
*/
@Override
public Mono<Void> observe(HookEvent event) {
if (!(event instanceof ErrorEvent errorEvent)) {
return Mono.empty();
}
Throwable error = errorEvent.getError();
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.FAILED);
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "ERROR");
runtimeEvent.getPayload().put("stage", "AGENTSCOPE_HOOK");
runtimeEvent.getPayload().put("errorType", error == null ? null : error.getClass().getName());
runtimeEvent.getPayload().put("message", error == null ? "AgentScope hook error." : error.getMessage());
appendAgent(runtimeEvent, errorEvent.getAgent());
eventBridge.emit(runtimeEvent);
return Mono.empty();
}
private void appendAgent(AgentRuntimeEvent event, Agent agent) {
if (agent == null) {
return;
}
event.getPayload().put("agentName", agent.getName());
event.getPayload().put("agentScopeAgentId", agent.getAgentId());
}
}

View File

@@ -0,0 +1,77 @@
package com.easyagents.agent.runtime.event.observer;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeObserver;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.hook.PreReasoningEvent;
import io.agentscope.core.message.Msg;
import reactor.core.publisher.Mono;
/**
* 监听 AgentScope 推理生命周期,并发射思考状态旁路事件。
*
* <p>{@link AgentRuntimeEventType#REASONING_DELTA} 表示主线路中的推理内容片段;
* 本观察器发射的 started/completed 事件只用于前端状态展示,不携带模型上下文,
* 也不会修改 AgentScope 的推理输入或输出。</p>
*/
public class ReasoningLifecycleObserver implements AgentRuntimeObserver {
private final AgentRuntimeEventBridge eventBridge;
/**
* 创建推理生命周期观察器。
*
* @param eventBridge 旁路事件桥
*/
public ReasoningLifecycleObserver(AgentRuntimeEventBridge eventBridge) {
this.eventBridge = eventBridge;
}
/**
* 观察推理开始和完成事件。
*
* @param event AgentScope Hook 事件
* @return 完成信号
*/
@Override
public Mono<Void> observe(HookEvent event) {
if (event instanceof PreReasoningEvent preReasoningEvent) {
emitStarted(preReasoningEvent);
return Mono.empty();
}
if (event instanceof PostReasoningEvent postReasoningEvent) {
emitCompleted(postReasoningEvent);
}
return Mono.empty();
}
private void emitStarted(PreReasoningEvent event) {
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.REASONING_STARTED);
runtimeEvent.getPayload().put("modelName", event.getModelName());
runtimeEvent.getPayload().put("messageCount", event.getInputMessages() == null ? 0 : event.getInputMessages().size());
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "PRE_REASONING");
eventBridge.emit(runtimeEvent);
}
private void emitCompleted(PostReasoningEvent event) {
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.REASONING_COMPLETED);
runtimeEvent.getPayload().put("modelName", event.getModelName());
runtimeEvent.getPayload().put("stopRequested", event.isStopRequested());
runtimeEvent.getPayload().put("gotoReasoningRequested", event.isGotoReasoningRequested());
runtimeEvent.getPayload().put("text", reasoningText(event.getReasoningMessage()));
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "POST_REASONING");
eventBridge.emit(runtimeEvent);
}
private String reasoningText(Msg message) {
if (message == null || message.getTextContent() == null) {
return "";
}
return message.getTextContent();
}
}

View File

@@ -0,0 +1,300 @@
package com.easyagents.agent.runtime.event.observer;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeObserver;
import com.easyagents.agent.runtime.skill.AgentSkillBinding;
import com.easyagents.agent.runtime.skill.AgentSkillLoadCall;
import com.easyagents.agent.runtime.skill.AgentSkillRuntimeContext;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PreActingEvent;
import io.agentscope.core.message.ContentBlock;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
import io.agentscope.core.skill.SkillBox;
import io.agentscope.core.tool.Toolkit;
import reactor.core.publisher.Mono;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
/**
* 监听 AgentScope 工具执行生命周期,并发射 Skill 旁路事件。
*
* <p>该观察器只做 Easy-Agents 的旁路监察,不修改 AgentScope {@link HookEvent}。
* Skill 加载工具 {@link AgentSkillRuntimeContext#LOAD_SKILL_TOOL_NAME} 本身仍是
* AgentScope 主线路中的工具调用;本观察器只把它翻译成调用方可展示的
* {@link AgentRuntimeEventType#SKILL_CALL}、{@link AgentRuntimeEventType#SKILL_RESULT}
* 和 {@link AgentRuntimeEventType#SKILL_FAILED}。已激活 Skill 内部的普通工具调用会
* 被翻译成 {@link AgentRuntimeEventType#SKILL_STEP}。</p>
*
* <p>Skill 是否激活以 AgentScope {@link SkillBox} 和 {@link Toolkit#getActiveGroups()}
* 为准,本地 {@link AgentSkillRuntimeContext} 只缓存旁路展示所需的归属状态。</p>
*/
public class SkillExecutionObserver implements AgentRuntimeObserver {
private final AgentRuntimeEventBridge eventBridge;
private final AgentSkillRuntimeContext skillContext;
private final SkillBox skillBox;
/**
* 创建 Skill 执行观察器。
*
* @param eventBridge 旁路事件桥
* @param skillContext Skill 运行时上下文
*/
public SkillExecutionObserver(AgentRuntimeEventBridge eventBridge,
AgentSkillRuntimeContext skillContext) {
this(eventBridge, skillContext, null);
}
/**
* 创建 Skill 执行观察器。
*
* @param eventBridge 旁路事件桥
* @param skillContext Skill 运行时上下文
* @param skillBox AgentScope SkillBox作为 Skill 激活状态的权威来源
*/
public SkillExecutionObserver(AgentRuntimeEventBridge eventBridge,
AgentSkillRuntimeContext skillContext,
SkillBox skillBox) {
this.eventBridge = eventBridge;
this.skillContext = skillContext;
this.skillBox = skillBox;
}
/**
* 观察 Skill 加载和已激活 Skill 内部工具执行。
*
* @param event AgentScope Hook 事件
* @return 完成信号
*/
@Override
public Mono<Void> observe(HookEvent event) {
if (skillContext == null) {
return Mono.empty();
}
if (event instanceof PreActingEvent preActingEvent) {
observePreActing(preActingEvent);
return Mono.empty();
}
if (event instanceof PostActingEvent postActingEvent) {
observePostActing(postActingEvent);
}
return Mono.empty();
}
private void observePreActing(PreActingEvent event) {
ToolUseBlock toolUse = event.getToolUse();
if (toolUse == null) {
return;
}
syncSkillState(toolUse.getName(), event.getToolkit());
if (skillContext.isSkillLoadTool(toolUse.getName())) {
emitSkillCall(event, toolUse);
return;
}
AgentSkillBinding activeBinding = skillContext.getActiveToolBinding(toolUse.getName());
if (activeBinding != null) {
emitSkillStepCall(toolUse, activeBinding);
}
}
private void observePostActing(PostActingEvent event) {
ToolResultBlock result = event.getToolResult();
ToolUseBlock toolUse = event.getToolUse();
String toolName = result == null ? toolName(toolUse) : result.getName();
if (skillContext.isSkillLoadTool(toolName)) {
emitSkillResult(result, toolUse, event.getToolkit());
return;
}
syncSkillState(toolName, event.getToolkit());
AgentSkillBinding activeBinding = skillContext.getActiveToolBinding(toolName);
if (activeBinding != null) {
emitSkillStepResult(result, toolUse, activeBinding);
}
}
private void emitSkillCall(PreActingEvent event, ToolUseBlock toolUse) {
AgentSkillLoadCall call = skillContext.rememberLoadCall(toolUse.getId(), toolUse.getInput());
if (!skillContext.markLoadCallEmitted(toolUse.getId())) {
return;
}
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.SKILL_CALL);
runtimeEvent.setToolCallId(toolUse.getId());
runtimeEvent.getPayload().put("toolCallId", toolUse.getId());
runtimeEvent.getPayload().put("toolName", toolUse.getName());
runtimeEvent.getPayload().put("input", toolUse.getInput());
runtimeEvent.getPayload().put("status", "RUNNING");
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "PRE_ACTING");
appendSkillLoadPayload(runtimeEvent, call);
runtimeEvent.getMetadata().putAll(nullToEmpty(toolUse.getMetadata()));
eventBridge.emit(runtimeEvent);
}
private void emitSkillResult(ToolResultBlock result, ToolUseBlock toolUse, Toolkit toolkit) {
String toolCallId = result == null ? toolCallId(toolUse) : result.getId();
AgentSkillLoadCall call = skillContext.removeLoadCall(toolCallId);
boolean active = syncSkillState(call, toolkit);
AgentRuntimeEvent runtimeEvent = eventBridge.event(skillResultType(result, active));
runtimeEvent.setToolCallId(toolCallId);
runtimeEvent.getPayload().put("toolCallId", toolCallId);
runtimeEvent.getPayload().put("toolName", result == null ? toolName(toolUse) : result.getName());
runtimeEvent.getPayload().put("text", resultText(result));
runtimeEvent.getPayload().put("status", runtimeEvent.getEventType() == AgentRuntimeEventType.SKILL_RESULT
? "SUCCESS"
: "FAILED");
runtimeEvent.getPayload().put("success", runtimeEvent.getEventType() == AgentRuntimeEventType.SKILL_RESULT);
runtimeEvent.getPayload().put("suspended", result != null && result.isSuspended());
runtimeEvent.getPayload().put("active", active);
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "POST_ACTING");
if (result != null) {
runtimeEvent.getMetadata().putAll(nullToEmpty(result.getMetadata()));
}
appendSkillLoadPayload(runtimeEvent, call);
eventBridge.emit(runtimeEvent);
}
private void emitSkillStepCall(ToolUseBlock toolUse, AgentSkillBinding binding) {
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.SKILL_STEP);
runtimeEvent.setToolCallId(toolUse.getId());
runtimeEvent.getPayload().put("toolCallId", toolUse.getId());
runtimeEvent.getPayload().put("name", toolUse.getName());
runtimeEvent.getPayload().put("toolName", toolUse.getName());
runtimeEvent.getPayload().put("input", toolUse.getInput());
runtimeEvent.getPayload().put("content", toolUse.getContent());
runtimeEvent.getPayload().put("stepType", "TOOL_CALL");
runtimeEvent.getPayload().put("stepName", toolUse.getName());
runtimeEvent.getPayload().put("status", "RUNNING");
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "PRE_ACTING");
appendSkillPayload(runtimeEvent.getPayload(), binding);
appendSkillPayload(runtimeEvent.getMetadata(), binding);
runtimeEvent.getMetadata().putAll(nullToEmpty(toolUse.getMetadata()));
eventBridge.emit(runtimeEvent);
}
private void emitSkillStepResult(ToolResultBlock result,
ToolUseBlock toolUse,
AgentSkillBinding binding) {
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.SKILL_STEP);
String toolCallId = result == null ? toolCallId(toolUse) : result.getId();
String toolName = result == null ? toolName(toolUse) : result.getName();
runtimeEvent.setToolCallId(toolCallId);
runtimeEvent.getPayload().put("toolCallId", toolCallId);
runtimeEvent.getPayload().put("name", toolName);
runtimeEvent.getPayload().put("toolName", toolName);
runtimeEvent.getPayload().put("text", resultText(result));
runtimeEvent.getPayload().put("suspended", result != null && result.isSuspended());
runtimeEvent.getPayload().put("stepType", "TOOL_RESULT");
runtimeEvent.getPayload().put("stepName", toolName);
runtimeEvent.getPayload().put("status", success(result) ? "SUCCESS" : "FAILED");
runtimeEvent.getPayload().put("success", success(result));
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "POST_ACTING");
appendSkillPayload(runtimeEvent.getPayload(), binding);
appendSkillPayload(runtimeEvent.getMetadata(), binding);
if (result != null) {
runtimeEvent.getMetadata().putAll(nullToEmpty(result.getMetadata()));
}
eventBridge.emit(runtimeEvent);
}
private AgentRuntimeEventType skillResultType(ToolResultBlock result, boolean active) {
return success(result) && active ? AgentRuntimeEventType.SKILL_RESULT : AgentRuntimeEventType.SKILL_FAILED;
}
private boolean success(ToolResultBlock result) {
if (result == null) {
return false;
}
Object success = result.getMetadata() == null ? null : result.getMetadata().get("success");
return !(success instanceof Boolean) || Boolean.TRUE.equals(success);
}
private String resultText(ToolResultBlock result) {
if (result == null || result.getOutput() == null || result.getOutput().isEmpty()) {
return "";
}
StringJoiner joiner = new StringJoiner("\n");
for (ContentBlock output : result.getOutput()) {
if (output instanceof TextBlock textBlock && textBlock.getText() != null) {
joiner.add(textBlock.getText());
} else if (output != null) {
joiner.add(output.toString());
}
}
return joiner.toString();
}
private boolean syncSkillState(AgentSkillLoadCall call, Toolkit toolkit) {
if (call == null) {
return false;
}
boolean active = isAgentScopeSkillActive(call.getSkillId(), toolkit);
skillContext.syncSkillActive(call.getSkillId(), active);
return active;
}
private void syncSkillState(String toolName, Toolkit toolkit) {
AgentSkillBinding binding = skillContext.getToolBinding(toolName);
if (binding == null) {
return;
}
skillContext.syncSkillActive(binding.getSkillId(), isAgentScopeSkillActive(binding.getSkillId(), toolkit));
}
private boolean isAgentScopeSkillActive(String skillId, Toolkit toolkit) {
if (skillId == null || skillId.isBlank()) {
return false;
}
if (skillBox != null && skillBox.isSkillActive(skillId)) {
return true;
}
return toolkit != null && toolkit.getActiveGroups().contains(skillToolGroupName(skillId));
}
private String skillToolGroupName(String skillId) {
return skillId + "_skill_tools";
}
private void appendSkillLoadPayload(AgentRuntimeEvent event, AgentSkillLoadCall call) {
if (call == null) {
return;
}
event.getPayload().put("skillId", call.getSkillId());
event.getPayload().put("skillName", call.getSkillName());
event.getPayload().put("skillBoxId", call.getSkillBoxId());
event.getPayload().put("path", call.getPath());
event.getMetadata().put("skillId", call.getSkillId());
event.getMetadata().put("skillName", call.getSkillName());
event.getMetadata().put("skillBoxId", call.getSkillBoxId());
}
private void appendSkillPayload(Map<String, Object> target, AgentSkillBinding binding) {
if (binding == null || target == null) {
return;
}
target.put("skillId", binding.getSkillId());
target.put("skillName", binding.getSkillName());
target.put("skillBoxId", binding.getSkillBoxId());
}
private String toolCallId(ToolUseBlock toolUse) {
return toolUse == null ? null : toolUse.getId();
}
private String toolName(ToolUseBlock toolUse) {
return toolUse == null ? null : toolUse.getName();
}
private Map<String, Object> nullToEmpty(Map<String, Object> map) {
return map == null ? new LinkedHashMap<>() : map;
}
}

View File

@@ -0,0 +1,154 @@
package com.easyagents.agent.runtime.event.observer;
import com.easyagents.agent.runtime.event.AgentRuntimeEvent;
import com.easyagents.agent.runtime.event.AgentRuntimeEventBridge;
import com.easyagents.agent.runtime.event.AgentRuntimeEventType;
import com.easyagents.agent.runtime.event.AgentRuntimeObserver;
import com.easyagents.agent.runtime.skill.AgentSkillRuntimeContext;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PreActingEvent;
import io.agentscope.core.message.ContentBlock;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
import reactor.core.publisher.Mono;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 监听 AgentScope 原生工具执行生命周期,并发射工具状态旁路事件。
*
* <p>该观察器复用 {@link AgentRuntimeEventType#TOOL_CALL} 和
* {@link AgentRuntimeEventType#TOOL_RESULT},用于 EasyFlow 展示工具开始与完成状态。
* 它不修改 AgentScope HookEvent也不写入模型上下文。</p>
*/
public class ToolExecutionObserver implements AgentRuntimeObserver {
private final AgentRuntimeEventBridge eventBridge;
private final AgentSkillRuntimeContext skillContext;
/**
* 创建工具执行观察器。
*
* @param eventBridge 旁路事件桥
*/
public ToolExecutionObserver(AgentRuntimeEventBridge eventBridge) {
this(eventBridge, null);
}
/**
* 创建工具执行观察器。
*
* @param eventBridge 旁路事件桥
* @param skillContext Skill 上下文,用于跳过由 SkillExecutionObserver 处理的工具
*/
public ToolExecutionObserver(AgentRuntimeEventBridge eventBridge,
AgentSkillRuntimeContext skillContext) {
this.eventBridge = eventBridge;
this.skillContext = skillContext;
}
/**
* 观察工具执行前后事件。
*
* @param event AgentScope Hook 事件
* @return 完成信号
*/
@Override
public Mono<Void> observe(HookEvent event) {
if (event instanceof PreActingEvent preActingEvent) {
emitToolCall(preActingEvent);
return Mono.empty();
}
if (event instanceof PostActingEvent postActingEvent) {
emitToolResult(postActingEvent);
}
return Mono.empty();
}
private void emitToolCall(PreActingEvent event) {
ToolUseBlock toolUse = event.getToolUse();
if (toolUse == null) {
return;
}
if (isSkillTool(toolUse.getName())) {
return;
}
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.TOOL_CALL);
runtimeEvent.setToolCallId(toolUse.getId());
runtimeEvent.getPayload().put("toolCallId", toolUse.getId());
runtimeEvent.getPayload().put("name", toolUse.getName());
runtimeEvent.getPayload().put("toolName", toolUse.getName());
runtimeEvent.getPayload().put("input", toolUse.getInput());
runtimeEvent.getPayload().put("content", toolUse.getContent());
runtimeEvent.getPayload().put("status", "RUNNING");
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "PRE_ACTING");
runtimeEvent.getMetadata().putAll(nullToEmpty(toolUse.getMetadata()));
eventBridge.emit(runtimeEvent);
}
private void emitToolResult(PostActingEvent event) {
ToolResultBlock result = event.getToolResult();
ToolUseBlock toolUse = event.getToolUse();
if (result == null && toolUse == null) {
return;
}
String toolCallId = result == null ? toolUse.getId() : result.getId();
String toolName = result == null ? toolUse.getName() : result.getName();
if (isSkillTool(toolName)) {
return;
}
AgentRuntimeEvent runtimeEvent = eventBridge.event(AgentRuntimeEventType.TOOL_RESULT);
runtimeEvent.setToolCallId(toolCallId);
runtimeEvent.getPayload().put("toolCallId", toolCallId);
runtimeEvent.getPayload().put("name", toolName);
runtimeEvent.getPayload().put("toolName", toolName);
runtimeEvent.getPayload().put("text", resultText(result));
runtimeEvent.getPayload().put("suspended", result != null && result.isSuspended());
runtimeEvent.getPayload().put("status", success(result) ? "SUCCESS" : "FAILED");
runtimeEvent.getPayload().put("success", success(result));
runtimeEvent.getPayload().put("source", "HOOK");
runtimeEvent.getPayload().put("phase", "POST_ACTING");
if (result != null) {
runtimeEvent.getMetadata().putAll(nullToEmpty(result.getMetadata()));
}
eventBridge.emit(runtimeEvent);
}
private boolean success(ToolResultBlock result) {
if (result == null) {
return false;
}
Object success = result.getMetadata() == null ? null : result.getMetadata().get("success");
return !(success instanceof Boolean) || Boolean.TRUE.equals(success);
}
private String resultText(ToolResultBlock result) {
if (result == null || result.getOutput() == null || result.getOutput().isEmpty()) {
return "";
}
StringBuilder builder = new StringBuilder();
for (ContentBlock block : result.getOutput()) {
if (block instanceof TextBlock textBlock) {
builder.append(textBlock.getText());
} else {
builder.append(block);
}
}
return builder.toString();
}
private Map<String, Object> nullToEmpty(Map<String, Object> map) {
return map == null ? new LinkedHashMap<>() : map;
}
private boolean isSkillTool(String toolName) {
if (skillContext == null) {
return false;
}
return skillContext.isSkillLoadTool(toolName) || skillContext.getActiveToolBinding(toolName) != null;
}
}