From feb384acca67c67032350d624e319334e3e01c4b Mon Sep 17 00:00:00 2001 From: Dotta Date: Fri, 6 Mar 2026 14:16:34 -0600 Subject: [PATCH] feat(ui): coalesce streaming deltas and deduplicate live feed items Merge consecutive assistant/thinking deltas into a single feed entry instead of creating one per chunk. Add dedupeKey to FeedItem, increase streaming text cap to 4000 chars, and bump seen-keys limit to 6000. Applied consistently to both ActiveAgentsPanel and LiveRunWidget. Co-Authored-By: Claude Opus 4.6 --- ui/src/components/ActiveAgentsPanel.tsx | 71 ++++++++++++++++++++----- ui/src/components/LiveRunWidget.tsx | 68 +++++++++++++++++------ 2 files changed, 110 insertions(+), 29 deletions(-) diff --git a/ui/src/components/ActiveAgentsPanel.tsx b/ui/src/components/ActiveAgentsPanel.tsx index 190713b3..2910b68d 100644 --- a/ui/src/components/ActiveAgentsPanel.tsx +++ b/ui/src/components/ActiveAgentsPanel.tsx @@ -21,9 +21,13 @@ interface FeedItem { agentName: string; text: string; tone: FeedTone; + dedupeKey: string; + streamingKind?: "assistant" | "thinking"; } const MAX_FEED_ITEMS = 40; +const MAX_FEED_TEXT_LENGTH = 220; +const MAX_STREAMING_TEXT_LENGTH = 4000; const MIN_DASHBOARD_RUNS = 4; function readString(value: unknown): string | null { @@ -70,17 +74,25 @@ function createFeedItem( text: string, tone: FeedTone, nextId: number, + options?: { + streamingKind?: "assistant" | "thinking"; + preserveWhitespace?: boolean; + }, ): FeedItem | null { - const trimmed = text.trim(); - if (!trimmed) return null; + if (!text.trim()) return null; + const base = options?.preserveWhitespace ? text : text.trim(); + const maxLength = options?.streamingKind ? MAX_STREAMING_TEXT_LENGTH : MAX_FEED_TEXT_LENGTH; + const normalized = base.length > maxLength ? base.slice(-maxLength) : base; return { id: `${run.id}:${nextId}`, ts, runId: run.id, agentId: run.agentId, agentName: run.agentName, - text: trimmed.slice(0, 220), + text: normalized, tone, + dedupeKey: `feed:${run.id}:${ts}:${tone}:${normalized}`, + streamingKind: options?.streamingKind, }; } @@ -97,16 +109,16 @@ function parseStdoutChunk( pendingByRun.set(pendingKey, split.pop() ?? ""); const adapter = getUIAdapter(run.adapterType); - const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = []; + const summarized: Array<{ text: string; tone: FeedTone; streamingKind?: "assistant" | "thinking" }> = []; const appendSummary = (entry: TranscriptEntry) => { if (entry.kind === "assistant" && entry.delta) { const text = entry.text; if (!text.trim()) return; const last = summarized[summarized.length - 1]; - if (last && last.assistantDelta) { + if (last && last.streamingKind === "assistant") { last.text += text; } else { - summarized.push({ text, tone: "assistant", assistantDelta: true }); + summarized.push({ text, tone: "assistant", streamingKind: "assistant" }); } return; } @@ -115,10 +127,10 @@ function parseStdoutChunk( const text = entry.text; if (!text.trim()) return; const last = summarized[summarized.length - 1]; - if (last && last.thinkingDelta) { + if (last && last.streamingKind === "thinking") { last.text += text; } else { - summarized.push({ text: `[thinking] ${text}`, tone: "info", thinkingDelta: true }); + summarized.push({ text: `[thinking] ${text}`, tone: "info", streamingKind: "thinking" }); } return; } @@ -144,7 +156,10 @@ function parseStdoutChunk( } for (const summary of summarized) { - const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++); + const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++, { + streamingKind: summary.streamingKind, + preserveWhitespace: !!summary.streamingKind, + }); if (item) items.push(item); } @@ -234,8 +249,38 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { if (items.length === 0) return; setFeedByRun((prev) => { const next = new Map(prev); - const existing = next.get(runId) ?? []; - next.set(runId, [...existing, ...items].slice(-MAX_FEED_ITEMS)); + const existing = [...(next.get(runId) ?? [])]; + for (const item of items) { + if (seenKeysRef.current.has(item.dedupeKey)) continue; + seenKeysRef.current.add(item.dedupeKey); + + const last = existing[existing.length - 1]; + if ( + item.streamingKind && + last && + last.runId === item.runId && + last.streamingKind === item.streamingKind + ) { + const mergedText = `${last.text}${item.text}`; + const nextText = + mergedText.length > MAX_STREAMING_TEXT_LENGTH + ? mergedText.slice(-MAX_STREAMING_TEXT_LENGTH) + : mergedText; + existing[existing.length - 1] = { + ...last, + ts: item.ts, + text: nextText, + dedupeKey: last.dedupeKey, + }; + continue; + } + + existing.push(item); + } + if (seenKeysRef.current.size > 6000) { + seenKeysRef.current.clear(); + } + next.set(runId, existing.slice(-MAX_FEED_ITEMS)); return next; }); }; @@ -277,7 +322,7 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { const dedupeKey = `${runId}:event:${seq ?? `${eventType}:${messageText}:${event.createdAt}`}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); - if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear(); + if (seenKeysRef.current.size > 6000) seenKeysRef.current.clear(); const tone = eventType === "error" ? "error" : eventType === "lifecycle" ? "warn" : "info"; const item = createFeedItem(run, event.createdAt, messageText, tone, nextIdRef.current++); if (item) appendItems(run.id, [item]); @@ -289,7 +334,7 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { const dedupeKey = `${runId}:status:${status}:${readString(payload["finishedAt"]) ?? ""}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); - if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear(); + if (seenKeysRef.current.size > 6000) seenKeysRef.current.clear(); const tone = status === "failed" || status === "timed_out" ? "error" : "warn"; const item = createFeedItem(run, event.createdAt, `run ${status}`, tone, nextIdRef.current++); if (item) appendItems(run.id, [item]); diff --git a/ui/src/components/LiveRunWidget.tsx b/ui/src/components/LiveRunWidget.tsx index cb4a7dcf..9f15a1fc 100644 --- a/ui/src/components/LiveRunWidget.tsx +++ b/ui/src/components/LiveRunWidget.tsx @@ -26,9 +26,13 @@ interface FeedItem { agentName: string; text: string; tone: FeedTone; + dedupeKey: string; + streamingKind?: "assistant" | "thinking"; } const MAX_FEED_ITEMS = 80; +const MAX_FEED_TEXT_LENGTH = 220; +const MAX_STREAMING_TEXT_LENGTH = 4000; const LOG_POLL_INTERVAL_MS = 2000; const LOG_READ_LIMIT_BYTES = 256_000; @@ -81,17 +85,25 @@ function createFeedItem( text: string, tone: FeedTone, nextId: number, + options?: { + streamingKind?: "assistant" | "thinking"; + preserveWhitespace?: boolean; + }, ): FeedItem | null { - const trimmed = text.trim(); - if (!trimmed) return null; + if (!text.trim()) return null; + const base = options?.preserveWhitespace ? text : text.trim(); + const maxLength = options?.streamingKind ? MAX_STREAMING_TEXT_LENGTH : MAX_FEED_TEXT_LENGTH; + const normalized = base.length > maxLength ? base.slice(-maxLength) : base; return { id: `${run.id}:${nextId}`, ts, runId: run.id, agentId: run.agentId, agentName: run.agentName, - text: trimmed.slice(0, 220), + text: normalized, tone, + dedupeKey: `feed:${run.id}:${ts}:${tone}:${normalized}`, + streamingKind: options?.streamingKind, }; } @@ -108,16 +120,16 @@ function parseStdoutChunk( pendingByRun.set(pendingKey, split.pop() ?? ""); const adapter = getUIAdapter(run.adapterType); - const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = []; + const summarized: Array<{ text: string; tone: FeedTone; streamingKind?: "assistant" | "thinking" }> = []; const appendSummary = (entry: TranscriptEntry) => { if (entry.kind === "assistant" && entry.delta) { const text = entry.text; if (!text.trim()) return; const last = summarized[summarized.length - 1]; - if (last && last.assistantDelta) { + if (last && last.streamingKind === "assistant") { last.text += text; } else { - summarized.push({ text, tone: "assistant", assistantDelta: true }); + summarized.push({ text, tone: "assistant", streamingKind: "assistant" }); } return; } @@ -126,10 +138,10 @@ function parseStdoutChunk( const text = entry.text; if (!text.trim()) return; const last = summarized[summarized.length - 1]; - if (last && last.thinkingDelta) { + if (last && last.streamingKind === "thinking") { last.text += text; } else { - summarized.push({ text: `[thinking] ${text}`, tone: "info", thinkingDelta: true }); + summarized.push({ text: `[thinking] ${text}`, tone: "info", streamingKind: "thinking" }); } return; } @@ -158,7 +170,10 @@ function parseStdoutChunk( } for (const summary of summarized) { - const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++); + const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++, { + streamingKind: summary.streamingKind, + preserveWhitespace: !!summary.streamingKind, + }); if (item) items.push(item); } @@ -291,18 +306,39 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { const appendItems = (items: FeedItem[]) => { if (items.length === 0) return; setFeed((prev) => { - const deduped: FeedItem[] = []; + const next = [...prev]; for (const item of items) { - const key = `feed:${item.runId}:${item.ts}:${item.tone}:${item.text}`; - if (seenKeysRef.current.has(key)) continue; - seenKeysRef.current.add(key); - deduped.push(item); + if (seenKeysRef.current.has(item.dedupeKey)) continue; + seenKeysRef.current.add(item.dedupeKey); + + const last = next[next.length - 1]; + if ( + item.streamingKind && + last && + last.runId === item.runId && + last.streamingKind === item.streamingKind + ) { + const mergedText = `${last.text}${item.text}`; + const nextText = + mergedText.length > MAX_STREAMING_TEXT_LENGTH + ? mergedText.slice(-MAX_STREAMING_TEXT_LENGTH) + : mergedText; + next[next.length - 1] = { + ...last, + ts: item.ts, + text: nextText, + dedupeKey: last.dedupeKey, + }; + continue; + } + + next.push(item); } - if (deduped.length === 0) return prev; if (seenKeysRef.current.size > 6000) { seenKeysRef.current.clear(); } - return [...prev, ...deduped].slice(-MAX_FEED_ITEMS); + if (next.length === prev.length) return prev; + return next.slice(-MAX_FEED_ITEMS); }); };