From a2bdfb0dd32adb16794483640b18e99cc6eba685 Mon Sep 17 00:00:00 2001 From: Dotta Date: Fri, 6 Mar 2026 14:39:49 -0600 Subject: [PATCH] stream live run detail output via websocket --- ui/src/pages/AgentDetail.tsx | 123 +++++++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 5 deletions(-) diff --git a/ui/src/pages/AgentDetail.tsx b/ui/src/pages/AgentDetail.tsx index 4b159e07..2888d8c9 100644 --- a/ui/src/pages/AgentDetail.tsx +++ b/ui/src/pages/AgentDetail.tsx @@ -56,7 +56,7 @@ import { } from "lucide-react"; import { Input } from "@/components/ui/input"; import { AgentIcon, AgentIconPicker } from "../components/AgentIconPicker"; -import { isUuidLike, type Agent, type HeartbeatRun, type HeartbeatRunEvent, type AgentRuntimeState } from "@paperclipai/shared"; +import { isUuidLike, type Agent, type HeartbeatRun, type HeartbeatRunEvent, type AgentRuntimeState, type LiveEvent } from "@paperclipai/shared"; import { agentRouteRef } from "../lib/utils"; const runStatusIcons: Record = { @@ -1761,6 +1761,7 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin const [logError, setLogError] = useState(null); const [logOffset, setLogOffset] = useState(0); const [isFollowing, setIsFollowing] = useState(false); + const [isStreamingConnected, setIsStreamingConnected] = useState(false); const logEndRef = useRef(null); const pendingLogLineRef = useRef(""); const scrollContainerRef = useRef(null); @@ -1957,7 +1958,7 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin // Poll for live updates useEffect(() => { - if (!isLive) return; + if (!isLive || isStreamingConnected) return; const interval = setInterval(async () => { const maxSeq = events.length > 0 ? Math.max(...events.map((e) => e.seq)) : 0; try { @@ -1970,11 +1971,11 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin } }, 2000); return () => clearInterval(interval); - }, [run.id, isLive, events]); + }, [run.id, isLive, isStreamingConnected, events]); // Poll shell log for running runs useEffect(() => { - if (!isLive) return; + if (!isLive || isStreamingConnected) return; const interval = setInterval(async () => { try { const result = await heartbeatsApi.log(run.id, logOffset, 256_000); @@ -1992,7 +1993,119 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin } }, 2000); return () => clearInterval(interval); - }, [run.id, isLive, logOffset]); + }, [run.id, isLive, isStreamingConnected, logOffset]); + + // Stream live updates from websocket (primary path for running runs). + useEffect(() => { + if (!isLive) return; + + let closed = false; + let reconnectTimer: number | null = null; + let socket: WebSocket | null = null; + + const scheduleReconnect = () => { + if (closed) return; + reconnectTimer = window.setTimeout(connect, 1500); + }; + + const connect = () => { + if (closed) return; + const protocol = window.location.protocol === "https:" ? "wss" : "ws"; + const url = `${protocol}://${window.location.host}/api/companies/${encodeURIComponent(run.companyId)}/events/ws`; + socket = new WebSocket(url); + + socket.onopen = () => { + setIsStreamingConnected(true); + }; + + socket.onmessage = (message) => { + const rawMessage = typeof message.data === "string" ? message.data : ""; + if (!rawMessage) return; + + let event: LiveEvent; + try { + event = JSON.parse(rawMessage) as LiveEvent; + } catch { + return; + } + + if (event.companyId !== run.companyId) return; + const payload = asRecord(event.payload); + const eventRunId = asNonEmptyString(payload?.runId); + if (!payload || eventRunId !== run.id) return; + + if (event.type === "heartbeat.run.log") { + const chunk = typeof payload.chunk === "string" ? payload.chunk : ""; + if (!chunk) return; + const streamRaw = asNonEmptyString(payload.stream); + const stream = streamRaw === "stderr" || streamRaw === "system" ? streamRaw : "stdout"; + const ts = asNonEmptyString((payload as Record).ts) ?? event.createdAt; + setLogLines((prev) => [...prev, { ts, stream, chunk }]); + return; + } + + if (event.type !== "heartbeat.run.event") return; + + const seq = typeof payload.seq === "number" ? payload.seq : null; + if (seq === null || !Number.isFinite(seq)) return; + + const streamRaw = asNonEmptyString(payload.stream); + const stream = + streamRaw === "stdout" || streamRaw === "stderr" || streamRaw === "system" + ? streamRaw + : null; + const levelRaw = asNonEmptyString(payload.level); + const level = + levelRaw === "info" || levelRaw === "warn" || levelRaw === "error" + ? levelRaw + : null; + + const liveEvent: HeartbeatRunEvent = { + id: seq, + companyId: run.companyId, + runId: run.id, + agentId: run.agentId, + seq, + eventType: asNonEmptyString(payload.eventType) ?? "event", + stream, + level, + color: asNonEmptyString(payload.color), + message: asNonEmptyString(payload.message), + payload: asRecord(payload.payload), + createdAt: new Date(event.createdAt), + }; + + setEvents((prev) => { + if (prev.some((existing) => existing.seq === seq)) return prev; + return [...prev, liveEvent]; + }); + }; + + socket.onerror = () => { + socket?.close(); + }; + + socket.onclose = () => { + setIsStreamingConnected(false); + scheduleReconnect(); + }; + }; + + connect(); + + return () => { + closed = true; + setIsStreamingConnected(false); + if (reconnectTimer !== null) window.clearTimeout(reconnectTimer); + if (socket) { + socket.onopen = null; + socket.onmessage = null; + socket.onerror = null; + socket.onclose = null; + socket.close(1000, "run_detail_unmount"); + } + }; + }, [isLive, run.companyId, run.id, run.agentId]); const adapterInvokePayload = useMemo(() => { const evt = events.find((e) => e.eventType === "adapter.invoke");