feat(ui): coalesce streaming deltas and deduplicate live feed items

Merge consecutive assistant/thinking deltas into a single feed entry
instead of creating one per chunk. Add dedupeKey to FeedItem, increase
streaming text cap to 4000 chars, and bump seen-keys limit to 6000.
Applied consistently to both ActiveAgentsPanel and LiveRunWidget.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dotta
2026-03-06 14:16:34 -06:00
parent c9718dc27a
commit feb384acca
2 changed files with 110 additions and 29 deletions

View File

@@ -21,9 +21,13 @@ interface FeedItem {
agentName: string;
text: string;
tone: FeedTone;
dedupeKey: string;
streamingKind?: "assistant" | "thinking";
}
const MAX_FEED_ITEMS = 40;
const MAX_FEED_TEXT_LENGTH = 220;
const MAX_STREAMING_TEXT_LENGTH = 4000;
const MIN_DASHBOARD_RUNS = 4;
function readString(value: unknown): string | null {
@@ -70,17 +74,25 @@ function createFeedItem(
text: string,
tone: FeedTone,
nextId: number,
options?: {
streamingKind?: "assistant" | "thinking";
preserveWhitespace?: boolean;
},
): FeedItem | null {
const trimmed = text.trim();
if (!trimmed) return null;
if (!text.trim()) return null;
const base = options?.preserveWhitespace ? text : text.trim();
const maxLength = options?.streamingKind ? MAX_STREAMING_TEXT_LENGTH : MAX_FEED_TEXT_LENGTH;
const normalized = base.length > maxLength ? base.slice(-maxLength) : base;
return {
id: `${run.id}:${nextId}`,
ts,
runId: run.id,
agentId: run.agentId,
agentName: run.agentName,
text: trimmed.slice(0, 220),
text: normalized,
tone,
dedupeKey: `feed:${run.id}:${ts}:${tone}:${normalized}`,
streamingKind: options?.streamingKind,
};
}
@@ -97,16 +109,16 @@ function parseStdoutChunk(
pendingByRun.set(pendingKey, split.pop() ?? "");
const adapter = getUIAdapter(run.adapterType);
const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = [];
const summarized: Array<{ text: string; tone: FeedTone; streamingKind?: "assistant" | "thinking" }> = [];
const appendSummary = (entry: TranscriptEntry) => {
if (entry.kind === "assistant" && entry.delta) {
const text = entry.text;
if (!text.trim()) return;
const last = summarized[summarized.length - 1];
if (last && last.assistantDelta) {
if (last && last.streamingKind === "assistant") {
last.text += text;
} else {
summarized.push({ text, tone: "assistant", assistantDelta: true });
summarized.push({ text, tone: "assistant", streamingKind: "assistant" });
}
return;
}
@@ -115,10 +127,10 @@ function parseStdoutChunk(
const text = entry.text;
if (!text.trim()) return;
const last = summarized[summarized.length - 1];
if (last && last.thinkingDelta) {
if (last && last.streamingKind === "thinking") {
last.text += text;
} else {
summarized.push({ text: `[thinking] ${text}`, tone: "info", thinkingDelta: true });
summarized.push({ text: `[thinking] ${text}`, tone: "info", streamingKind: "thinking" });
}
return;
}
@@ -144,7 +156,10 @@ function parseStdoutChunk(
}
for (const summary of summarized) {
const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++);
const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++, {
streamingKind: summary.streamingKind,
preserveWhitespace: !!summary.streamingKind,
});
if (item) items.push(item);
}
@@ -234,8 +249,38 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) {
if (items.length === 0) return;
setFeedByRun((prev) => {
const next = new Map(prev);
const existing = next.get(runId) ?? [];
next.set(runId, [...existing, ...items].slice(-MAX_FEED_ITEMS));
const existing = [...(next.get(runId) ?? [])];
for (const item of items) {
if (seenKeysRef.current.has(item.dedupeKey)) continue;
seenKeysRef.current.add(item.dedupeKey);
const last = existing[existing.length - 1];
if (
item.streamingKind &&
last &&
last.runId === item.runId &&
last.streamingKind === item.streamingKind
) {
const mergedText = `${last.text}${item.text}`;
const nextText =
mergedText.length > MAX_STREAMING_TEXT_LENGTH
? mergedText.slice(-MAX_STREAMING_TEXT_LENGTH)
: mergedText;
existing[existing.length - 1] = {
...last,
ts: item.ts,
text: nextText,
dedupeKey: last.dedupeKey,
};
continue;
}
existing.push(item);
}
if (seenKeysRef.current.size > 6000) {
seenKeysRef.current.clear();
}
next.set(runId, existing.slice(-MAX_FEED_ITEMS));
return next;
});
};
@@ -277,7 +322,7 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) {
const dedupeKey = `${runId}:event:${seq ?? `${eventType}:${messageText}:${event.createdAt}`}`;
if (seenKeysRef.current.has(dedupeKey)) return;
seenKeysRef.current.add(dedupeKey);
if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear();
if (seenKeysRef.current.size > 6000) seenKeysRef.current.clear();
const tone = eventType === "error" ? "error" : eventType === "lifecycle" ? "warn" : "info";
const item = createFeedItem(run, event.createdAt, messageText, tone, nextIdRef.current++);
if (item) appendItems(run.id, [item]);
@@ -289,7 +334,7 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) {
const dedupeKey = `${runId}:status:${status}:${readString(payload["finishedAt"]) ?? ""}`;
if (seenKeysRef.current.has(dedupeKey)) return;
seenKeysRef.current.add(dedupeKey);
if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear();
if (seenKeysRef.current.size > 6000) seenKeysRef.current.clear();
const tone = status === "failed" || status === "timed_out" ? "error" : "warn";
const item = createFeedItem(run, event.createdAt, `run ${status}`, tone, nextIdRef.current++);
if (item) appendItems(run.id, [item]);

View File

@@ -26,9 +26,13 @@ interface FeedItem {
agentName: string;
text: string;
tone: FeedTone;
dedupeKey: string;
streamingKind?: "assistant" | "thinking";
}
const MAX_FEED_ITEMS = 80;
const MAX_FEED_TEXT_LENGTH = 220;
const MAX_STREAMING_TEXT_LENGTH = 4000;
const LOG_POLL_INTERVAL_MS = 2000;
const LOG_READ_LIMIT_BYTES = 256_000;
@@ -81,17 +85,25 @@ function createFeedItem(
text: string,
tone: FeedTone,
nextId: number,
options?: {
streamingKind?: "assistant" | "thinking";
preserveWhitespace?: boolean;
},
): FeedItem | null {
const trimmed = text.trim();
if (!trimmed) return null;
if (!text.trim()) return null;
const base = options?.preserveWhitespace ? text : text.trim();
const maxLength = options?.streamingKind ? MAX_STREAMING_TEXT_LENGTH : MAX_FEED_TEXT_LENGTH;
const normalized = base.length > maxLength ? base.slice(-maxLength) : base;
return {
id: `${run.id}:${nextId}`,
ts,
runId: run.id,
agentId: run.agentId,
agentName: run.agentName,
text: trimmed.slice(0, 220),
text: normalized,
tone,
dedupeKey: `feed:${run.id}:${ts}:${tone}:${normalized}`,
streamingKind: options?.streamingKind,
};
}
@@ -108,16 +120,16 @@ function parseStdoutChunk(
pendingByRun.set(pendingKey, split.pop() ?? "");
const adapter = getUIAdapter(run.adapterType);
const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = [];
const summarized: Array<{ text: string; tone: FeedTone; streamingKind?: "assistant" | "thinking" }> = [];
const appendSummary = (entry: TranscriptEntry) => {
if (entry.kind === "assistant" && entry.delta) {
const text = entry.text;
if (!text.trim()) return;
const last = summarized[summarized.length - 1];
if (last && last.assistantDelta) {
if (last && last.streamingKind === "assistant") {
last.text += text;
} else {
summarized.push({ text, tone: "assistant", assistantDelta: true });
summarized.push({ text, tone: "assistant", streamingKind: "assistant" });
}
return;
}
@@ -126,10 +138,10 @@ function parseStdoutChunk(
const text = entry.text;
if (!text.trim()) return;
const last = summarized[summarized.length - 1];
if (last && last.thinkingDelta) {
if (last && last.streamingKind === "thinking") {
last.text += text;
} else {
summarized.push({ text: `[thinking] ${text}`, tone: "info", thinkingDelta: true });
summarized.push({ text: `[thinking] ${text}`, tone: "info", streamingKind: "thinking" });
}
return;
}
@@ -158,7 +170,10 @@ function parseStdoutChunk(
}
for (const summary of summarized) {
const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++);
const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++, {
streamingKind: summary.streamingKind,
preserveWhitespace: !!summary.streamingKind,
});
if (item) items.push(item);
}
@@ -291,18 +306,39 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
const appendItems = (items: FeedItem[]) => {
if (items.length === 0) return;
setFeed((prev) => {
const deduped: FeedItem[] = [];
const next = [...prev];
for (const item of items) {
const key = `feed:${item.runId}:${item.ts}:${item.tone}:${item.text}`;
if (seenKeysRef.current.has(key)) continue;
seenKeysRef.current.add(key);
deduped.push(item);
if (seenKeysRef.current.has(item.dedupeKey)) continue;
seenKeysRef.current.add(item.dedupeKey);
const last = next[next.length - 1];
if (
item.streamingKind &&
last &&
last.runId === item.runId &&
last.streamingKind === item.streamingKind
) {
const mergedText = `${last.text}${item.text}`;
const nextText =
mergedText.length > MAX_STREAMING_TEXT_LENGTH
? mergedText.slice(-MAX_STREAMING_TEXT_LENGTH)
: mergedText;
next[next.length - 1] = {
...last,
ts: item.ts,
text: nextText,
dedupeKey: last.dedupeKey,
};
continue;
}
next.push(item);
}
if (deduped.length === 0) return prev;
if (seenKeysRef.current.size > 6000) {
seenKeysRef.current.clear();
}
return [...prev, ...deduped].slice(-MAX_FEED_ITEMS);
if (next.length === prev.length) return prev;
return next.slice(-MAX_FEED_ITEMS);
});
};