import type {ServerSentEventMessage} from 'fetch-event-stream'; import type { ChatTimelineItem, ChatTimelineKnowledgeHit, ChatTimelineMessageItem, ChatTimelineToolApprovalPayload, } from '@easyflow/common-ui'; import {ChatTimelineBuilder} from '@easyflow/common-ui'; import type {AgentChatMessageRecord} from '../api'; export interface AgentSseEnvelope { domain: string; payload: Record; type: string; } function asText(value: unknown) { return value === null || value === undefined ? '' : String(value); } function asRecord(value: unknown): Record { return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {}; } function asArray(value: unknown): any[] { return Array.isArray(value) ? value : []; } function asTimestamp(value: unknown) { if (!value) { return Date.now(); } const timestamp = new Date(String(value)).getTime(); return Number.isFinite(timestamp) ? timestamp : Date.now(); } function normalizeRole(value: unknown): 'assistant' | 'system' | 'user' { const role = asText(value).toLowerCase(); if (role === 'assistant' || role === 'system' || role === 'user') { return role; } return 'assistant'; } function normalizeToolName(value: unknown) { return asText(value).trim(); } function normalizeToolCallId(payload: Record) { return asText(payload.toolCallId ?? payload.tool_call_id ?? payload.id); } function isBlankToolName(value: unknown) { return !normalizeToolName(value); } function shouldSkipToolProjection(value: unknown) { const normalizedName = normalizeToolName(value).toLowerCase(); return ( normalizedName === 'context_reload' || normalizedName === '__fragment__' ); } function normalizeToolCallName(payload: Record) { const fn = asRecord(payload.function); return normalizeToolName(payload.name ?? payload.toolName ?? fn.name); } function normalizeToolCallInput(payload: Record) { const fn = asRecord(payload.function); return payload.arguments ?? payload.input ?? fn.arguments; } function statusKeyForProjection( payload: Record, metadata?: Partial, fallback = 'status', ) { const statusKey = asText(payload.statusKey) || fallback; const roundId = asText(metadata?.roundId); return roundId ? `${statusKey}:${roundId}` : statusKey; } function normalizeMetadata(record: AgentChatMessageRecord) { return { createdAt: asTimestamp(record.created), id: `history-${record.id || record.roundId || Date.now()}`, roundId: asText(record.roundId), roundNo: record.roundNo, selectedVariantIndex: record.selectedVariantIndex, switchable: false, variantCount: record.variantCount, variantIndex: record.variantIndex, } satisfies Partial; } function assistantMetadata( record: AgentChatMessageRecord, suffix?: string, ): Partial { const metadata = normalizeMetadata(record); return suffix ? { ...metadata, id: `${metadata.id}-${suffix}` } : metadata; } function normalizeKnowledgeItems(payload: Record) { const rawItems = asArray(payload.items).length > 0 ? asArray(payload.items) : asArray(payload.knowledgeReferences).length > 0 ? asArray(payload.knowledgeReferences) : asArray(payload.knowledgeCitations); return rawItems .map((item, index): ChatTimelineKnowledgeHit => { const source = asRecord(item); const metadata = asRecord(source.metadata); const documentName = asText( source.documentName ?? source.title ?? metadata.documentName, ); const sourceFileName = asText( source.sourceFileName ?? metadata.sourceFileName, ); const chunkContent = asText( source.chunkContent ?? source.content ?? source.text ?? source.summary, ); return { ...source, id: asText(source.id ?? source.chunkId ?? index), chunkContent, content: asText(source.content ?? source.text ?? source.summary), documentId: asText(source.documentId ?? metadata.documentId), documentName, knowledgeId: asText(source.knowledgeId ?? payload.knowledgeId), knowledgeName: asText(source.knowledgeName ?? payload.knowledgeName), metadata, score: source.score ?? source.similarity, sourceFileName, sourceUri: asText(source.sourceUri ?? metadata.sourceUri), title: documentName || sourceFileName || asText(source.source), }; }) .filter((item) => item.chunkContent || item.title || item.documentName); } function buildApprovalPayload(payload: Record) { return { expiresAt: asText(payload.expiresAt), input: payload.input, metadata: payload.metadata, requestId: asText(payload.requestId), resumeToken: asText(payload.resumeToken), toolCallId: normalizeToolCallId(payload), toolDisplayName: asText(payload.toolDisplayName), toolName: normalizeToolName(payload.toolName ?? payload.name) || '工具调用', toolType: asText(payload.toolType), } satisfies ChatTimelineToolApprovalPayload; } function appendAssistantText( items: ChatTimelineItem[], record: AgentChatMessageRecord, content: unknown, suffix?: string, metadata?: Partial, ) { const text = asText(content); if (!text) { return; } ChatTimelineBuilder.appendMessageDelta(items, text, { ...assistantMetadata(record, suffix), ...metadata, }); } function appendAssistantThinking( items: ChatTimelineItem[], record: AgentChatMessageRecord, content: unknown, suffix?: string, metadata?: Partial, ) { const text = asText(content); if (!text) { return; } ChatTimelineBuilder.appendThinkingDelta(items, text, { ...assistantMetadata(record, suffix), ...metadata, }); } function projectHistoryChain( items: ChatTimelineItem[], record: AgentChatMessageRecord, ) { const payload = asRecord(record.contentPayload); let hasAssistantText = false; let hasAssistantThinking = false; const toolNameByCallId = new Map(); const displayChains = asArray(payload.displayChains ?? payload.chains); for (const chain of displayChains) { const item = asRecord(chain); const reasoning = item.reasoningContent ?? item.reasoning_content; if (reasoning) { appendAssistantThinking(items, record, reasoning, 'thinking'); hasAssistantThinking = true; continue; } const toolName = normalizeToolName(item.name ?? item.toolName); const toolCallId = normalizeToolCallId(item); if (toolCallId && toolName) { toolNameByCallId.set(toolCallId, toolName); } if (toolName && !shouldSkipToolProjection(toolName)) { ChatTimelineBuilder.upsertToolCall(items, { input: item.arguments ?? item.input, output: item.result ?? item.output, status: asText(item.status) === 'TOOL_RESULT' ? 'success' : 'running', statusKey: statusKeyForProjection( item, normalizeMetadata(record), 'knowledge-retrieval', ), toolCallId, toolName, }); } } const messageChain = asArray(payload.messageChain); for (const chain of messageChain) { const item = asRecord(chain); const role = asText(item.role).toLowerCase(); if (role === 'assistant') { appendAssistantThinking(items, record, item.reasoningContent, 'thinking'); if (item.reasoningContent) { hasAssistantThinking = true; } if (!payload.agentResult && item.content) { appendAssistantText(items, record, item.content, 'text'); hasAssistantText = true; } for (const toolCall of asArray(item.toolCalls)) { const tool = asRecord(toolCall); const toolCallId = normalizeToolCallId(tool); const toolName = normalizeToolCallName(tool); if (toolCallId && toolName) { toolNameByCallId.set(toolCallId, toolName); } if (isBlankToolName(toolName) || shouldSkipToolProjection(toolName)) { continue; } ChatTimelineBuilder.upsertToolCall(items, { input: normalizeToolCallInput(tool), status: 'running', statusKey: statusKeyForProjection( tool, normalizeMetadata(record), 'knowledge-retrieval', ), toolCallId, toolName, }); } continue; } if (role === 'tool') { const toolCallId = normalizeToolCallId(item); const toolName = normalizeToolCallName(item) || toolNameByCallId.get(toolCallId) || ''; if (isBlankToolName(toolName) || shouldSkipToolProjection(toolName)) { continue; } ChatTimelineBuilder.upsertToolCall(items, { output: item.content ?? item.result, status: 'success', statusKey: statusKeyForProjection( item, normalizeMetadata(record), 'knowledge-retrieval', ), toolCallId, toolName, }); } } return { hasAssistantText, hasAssistantThinking, }; } function appendHistoryRecord( items: ChatTimelineItem[], record: AgentChatMessageRecord, ) { const role = normalizeRole(record.senderRole); const metadata = normalizeMetadata(record); if (role === 'user') { ChatTimelineBuilder.appendUserMessage(items, record.contentText, metadata); return; } if (role === 'system') { ChatTimelineBuilder.appendError(items, record.contentText || '系统消息'); return; } const payload = asRecord(record.contentPayload); const agentResult = asRecord(payload.agentResult); const chainProjection = projectHistoryChain(items, record); if (!chainProjection.hasAssistantThinking) { appendAssistantThinking( items, record, payload.reasoningContent ?? agentResult.reasoning, 'thinking', ); } if (!chainProjection.hasAssistantText) { appendAssistantText( items, record, agentResult.text ?? payload.content ?? record.contentText, chainProjection.hasAssistantThinking ? 'text' : undefined, ); } const knowledgeItems = normalizeKnowledgeItems({ ...payload, items: payload.knowledgeCitations ?? agentResult.knowledgeReferences ?? payload.knowledgeReferences, }); if (knowledgeItems.length > 0) { ChatTimelineBuilder.appendKnowledge(items, knowledgeItems); } ChatTimelineBuilder.finalize(items); } export function recordsToTimelineItems(records: AgentChatMessageRecord[] = []) { const items: ChatTimelineItem[] = []; for (const record of records) { appendHistoryRecord(items, record); } ChatTimelineBuilder.finalize(items); return items; } export function parseAgentSseMessage(message: ServerSentEventMessage) { const raw = message.data || ''; if (!raw) { return undefined; } try { const data = JSON.parse(raw); return { domain: asText( data.domain ?? data.eventDomain ?? data.typeDomain, ).toUpperCase(), payload: asRecord(data.payload ?? data.data ?? data), type: asText( data.type ?? data.eventType ?? data.chatType ?? data.event, ).toUpperCase(), } satisfies AgentSseEnvelope; } catch { return { domain: 'LLM', payload: { delta: raw }, type: 'MESSAGE', } satisfies AgentSseEnvelope; } } export function applyAgentSseEnvelope( items: ChatTimelineItem[], envelope: AgentSseEnvelope, metadata?: Partial, ) { const { domain, payload, type } = envelope; if (domain === 'LLM' && type === 'MESSAGE') { ChatTimelineBuilder.appendMessageDelta( items, payload.delta ?? payload.text, metadata, ); return; } if (domain === 'LLM' && type === 'THINKING') { ChatTimelineBuilder.appendThinkingDelta( items, payload.reasoning ?? payload.delta ?? payload.text, metadata, ); return; } if (domain === 'TOOL' && type === 'FORM_REQUEST') { ChatTimelineBuilder.appendToolApproval( items, buildApprovalPayload(payload), ); return; } if (domain === 'TOOL' && type === 'FORM_APPROVING') { ChatTimelineBuilder.markToolApproving(items, { requestId: asText(payload.requestId), resumeToken: asText(payload.resumeToken), toolCallId: normalizeToolCallId(payload), }); return; } if (domain === 'TOOL' && type === 'FORM_REJECTED') { ChatTimelineBuilder.markToolRejected(items, { reason: asText(payload.reason), requestId: asText(payload.requestId), resumeToken: asText(payload.resumeToken), toolCallId: normalizeToolCallId(payload), }); return; } if (domain === 'TOOL' && (type === 'TOOL_CALL' || type === 'TOOL_RESULT')) { ChatTimelineBuilder.upsertToolCall(items, { input: payload.input ?? payload.toolInput, output: payload.output ?? payload.result ?? payload.text, status: type === 'TOOL_RESULT' ? 'success' : 'running', statusKey: statusKeyForProjection( payload, metadata, 'knowledge-retrieval', ), toolCallId: normalizeToolCallId(payload), toolName: normalizeToolName( payload.toolDisplayName ?? payload.toolName ?? payload.name, ), }); return; } if (domain === 'BUSINESS' && type === 'CITATIONS') { ChatTimelineBuilder.appendKnowledge( items, normalizeKnowledgeItems(payload), ); return; } if (domain === 'BUSINESS' && type === 'STATUS') { if (asText(payload.statusKey) === 'memory-compression') { ChatTimelineBuilder.upsertMemoryCompressionStatus(items, { compressed: typeof payload.compressed === 'boolean' ? payload.compressed : undefined, label: asText(payload.label), phase: asText(payload.phase), status: asText(payload.status), statusKey: statusKeyForProjection( payload, metadata, 'memory-compression', ), }); return; } if (asText(payload.statusKey) === 'knowledge-retrieval') { ChatTimelineBuilder.upsertKnowledgeRetrievalStatus( items, asText(payload.status) === 'running' ? 'running' : 'done', statusKeyForProjection(payload, metadata, 'knowledge-retrieval'), ); } return; } if (domain === 'SYSTEM' && type === 'DONE') { ChatTimelineBuilder.finalize(items); return; } if (domain === 'ERROR' || type === 'ERROR') { ChatTimelineBuilder.appendError( items, payload.message ?? payload.error ?? '请求失败', ); } }