From ef28f048db1d353329a34bcb8ee103bc5ad3a7e4 Mon Sep 17 00:00:00 2001 From: Dotta Date: Tue, 3 Mar 2026 11:37:56 -0600 Subject: [PATCH] Backfill issue live run widget logs from persisted run log --- ui/src/components/LiveRunWidget.tsx | 132 ++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 5 deletions(-) diff --git a/ui/src/components/LiveRunWidget.tsx b/ui/src/components/LiveRunWidget.tsx index 27e1b9f2..7af7164b 100644 --- a/ui/src/components/LiveRunWidget.tsx +++ b/ui/src/components/LiveRunWidget.tsx @@ -29,6 +29,8 @@ interface FeedItem { } const MAX_FEED_ITEMS = 80; +const LOG_POLL_INTERVAL_MS = 2000; +const LOG_READ_LIMIT_BYTES = 256_000; function readString(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; @@ -147,12 +149,45 @@ function parseStderrChunk( return items; } +function parsePersistedLogContent( + runId: string, + content: string, + pendingByRun: Map, +): Array<{ ts: string; stream: "stdout" | "stderr" | "system"; chunk: string }> { + if (!content) return []; + + const pendingKey = `${runId}:records`; + const combined = `${pendingByRun.get(pendingKey) ?? ""}${content}`; + const split = combined.split("\n"); + pendingByRun.set(pendingKey, split.pop() ?? ""); + + const parsed: Array<{ ts: string; stream: "stdout" | "stderr" | "system"; chunk: string }> = []; + for (const line of split) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const raw = JSON.parse(trimmed) as { ts?: unknown; stream?: unknown; chunk?: unknown }; + const stream = raw.stream === "stderr" || raw.stream === "system" ? raw.stream : "stdout"; + const chunk = typeof raw.chunk === "string" ? raw.chunk : ""; + const ts = typeof raw.ts === "string" ? raw.ts : new Date().toISOString(); + if (!chunk) continue; + parsed.push({ ts, stream, chunk }); + } catch { + // Ignore malformed log rows. + } + } + + return parsed; +} + export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { const queryClient = useQueryClient(); const [feed, setFeed] = useState([]); const [cancellingRunIds, setCancellingRunIds] = useState(new Set()); const seenKeysRef = useRef(new Set()); const pendingByRunRef = useRef(new Map()); + const pendingLogRowsByRunRef = useRef(new Map()); + const logOffsetByRunRef = useRef(new Map()); const runMetaByIdRef = useRef(new Map()); const nextIdRef = useRef(1); const bodyRef = useRef(null); @@ -162,6 +197,7 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { try { await heartbeatsApi.cancel(runId); queryClient.invalidateQueries({ queryKey: queryKeys.issues.liveRuns(issueId) }); + queryClient.invalidateQueries({ queryKey: queryKeys.issues.activeRun(issueId) }); } finally { setCancellingRunIds((prev) => { const next = new Set(prev); @@ -212,6 +248,27 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { const runById = useMemo(() => new Map(runs.map((run) => [run.id, run])), [runs]); const activeRunIds = useMemo(() => new Set(runs.map((run) => run.id)), [runs]); + const runIdsKey = useMemo( + () => runs.map((run) => run.id).sort((a, b) => a.localeCompare(b)).join(","), + [runs], + ); + const appendItems = (items: FeedItem[]) => { + if (items.length === 0) return; + setFeed((prev) => { + const deduped: FeedItem[] = []; + for (const item of items) { + const key = `feed:${item.runId}:${item.ts}:${item.tone}:${item.text}`; + if (seenKeysRef.current.has(key)) continue; + seenKeysRef.current.add(key); + deduped.push(item); + } + if (deduped.length === 0) return prev; + if (seenKeysRef.current.size > 6000) { + seenKeysRef.current.clear(); + } + return [...prev, ...deduped].slice(-MAX_FEED_ITEMS); + }); + }; useEffect(() => { const body = bodyRef.current; @@ -236,8 +293,78 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { pendingByRunRef.current.delete(key); } } + const liveRunIds = new Set(activeRunIds); + for (const key of pendingLogRowsByRunRef.current.keys()) { + const runId = key.replace(/:records$/, ""); + if (!liveRunIds.has(runId)) { + pendingLogRowsByRunRef.current.delete(key); + } + } + for (const runId of logOffsetByRunRef.current.keys()) { + if (!liveRunIds.has(runId)) { + logOffsetByRunRef.current.delete(runId); + } + } }, [activeRunIds]); + useEffect(() => { + if (runs.length === 0) return; + + let cancelled = false; + + const readRunLog = async (run: LiveRunForIssue) => { + const offset = logOffsetByRunRef.current.get(run.id) ?? 0; + try { + const result = await heartbeatsApi.log(run.id, offset, LOG_READ_LIMIT_BYTES); + if (cancelled) return; + + const rows = parsePersistedLogContent(run.id, result.content, pendingLogRowsByRunRef.current); + const items: FeedItem[] = []; + for (const row of rows) { + if (row.stream === "stderr") { + items.push( + ...parseStderrChunk(run, row.chunk, row.ts, pendingByRunRef.current, nextIdRef), + ); + continue; + } + if (row.stream === "system") { + const item = createFeedItem(run, row.ts, row.chunk, "warn", nextIdRef.current++); + if (item) items.push(item); + continue; + } + items.push( + ...parseStdoutChunk(run, row.chunk, row.ts, pendingByRunRef.current, nextIdRef), + ); + } + appendItems(items); + + if (result.nextOffset !== undefined) { + logOffsetByRunRef.current.set(run.id, result.nextOffset); + return; + } + if (result.content.length > 0) { + logOffsetByRunRef.current.set(run.id, offset + result.content.length); + } + } catch { + // Ignore log read errors while run output is initializing. + } + }; + + const readAll = async () => { + await Promise.all(runs.map((run) => readRunLog(run))); + }; + + void readAll(); + const interval = window.setInterval(() => { + void readAll(); + }, LOG_POLL_INTERVAL_MS); + + return () => { + cancelled = true; + window.clearInterval(interval); + }; + }, [runIdsKey, runs]); + useEffect(() => { if (!companyId || activeRunIds.size === 0) return; @@ -245,11 +372,6 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { let reconnectTimer: number | null = null; let socket: WebSocket | null = null; - const appendItems = (items: FeedItem[]) => { - if (items.length === 0) return; - setFeed((prev) => [...prev, ...items].slice(-MAX_FEED_ITEMS)); - }; - const scheduleReconnect = () => { if (closed) return; reconnectTimer = window.setTimeout(connect, 1500);