stream live run detail output via websocket

This commit is contained in:
Dotta
2026-03-06 14:39:49 -06:00
parent 67247b5d6a
commit a2bdfb0dd3

View File

@@ -56,7 +56,7 @@ import {
} from "lucide-react";
import { Input } from "@/components/ui/input";
import { AgentIcon, AgentIconPicker } from "../components/AgentIconPicker";
import { isUuidLike, type Agent, type HeartbeatRun, type HeartbeatRunEvent, type AgentRuntimeState } from "@paperclipai/shared";
import { isUuidLike, type Agent, type HeartbeatRun, type HeartbeatRunEvent, type AgentRuntimeState, type LiveEvent } from "@paperclipai/shared";
import { agentRouteRef } from "../lib/utils";
const runStatusIcons: Record<string, { icon: typeof CheckCircle2; color: string }> = {
@@ -1761,6 +1761,7 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin
const [logError, setLogError] = useState<string | null>(null);
const [logOffset, setLogOffset] = useState(0);
const [isFollowing, setIsFollowing] = useState(false);
const [isStreamingConnected, setIsStreamingConnected] = useState(false);
const logEndRef = useRef<HTMLDivElement>(null);
const pendingLogLineRef = useRef("");
const scrollContainerRef = useRef<ScrollContainer | null>(null);
@@ -1957,7 +1958,7 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin
// Poll for live updates
useEffect(() => {
if (!isLive) return;
if (!isLive || isStreamingConnected) return;
const interval = setInterval(async () => {
const maxSeq = events.length > 0 ? Math.max(...events.map((e) => e.seq)) : 0;
try {
@@ -1970,11 +1971,11 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin
}
}, 2000);
return () => clearInterval(interval);
}, [run.id, isLive, events]);
}, [run.id, isLive, isStreamingConnected, events]);
// Poll shell log for running runs
useEffect(() => {
if (!isLive) return;
if (!isLive || isStreamingConnected) return;
const interval = setInterval(async () => {
try {
const result = await heartbeatsApi.log(run.id, logOffset, 256_000);
@@ -1992,7 +1993,119 @@ function LogViewer({ run, adapterType }: { run: HeartbeatRun; adapterType: strin
}
}, 2000);
return () => clearInterval(interval);
}, [run.id, isLive, logOffset]);
}, [run.id, isLive, isStreamingConnected, logOffset]);
// Stream live updates from websocket (primary path for running runs).
useEffect(() => {
if (!isLive) return;
let closed = false;
let reconnectTimer: number | null = null;
let socket: WebSocket | null = null;
const scheduleReconnect = () => {
if (closed) return;
reconnectTimer = window.setTimeout(connect, 1500);
};
const connect = () => {
if (closed) return;
const protocol = window.location.protocol === "https:" ? "wss" : "ws";
const url = `${protocol}://${window.location.host}/api/companies/${encodeURIComponent(run.companyId)}/events/ws`;
socket = new WebSocket(url);
socket.onopen = () => {
setIsStreamingConnected(true);
};
socket.onmessage = (message) => {
const rawMessage = typeof message.data === "string" ? message.data : "";
if (!rawMessage) return;
let event: LiveEvent;
try {
event = JSON.parse(rawMessage) as LiveEvent;
} catch {
return;
}
if (event.companyId !== run.companyId) return;
const payload = asRecord(event.payload);
const eventRunId = asNonEmptyString(payload?.runId);
if (!payload || eventRunId !== run.id) return;
if (event.type === "heartbeat.run.log") {
const chunk = typeof payload.chunk === "string" ? payload.chunk : "";
if (!chunk) return;
const streamRaw = asNonEmptyString(payload.stream);
const stream = streamRaw === "stderr" || streamRaw === "system" ? streamRaw : "stdout";
const ts = asNonEmptyString((payload as Record<string, unknown>).ts) ?? event.createdAt;
setLogLines((prev) => [...prev, { ts, stream, chunk }]);
return;
}
if (event.type !== "heartbeat.run.event") return;
const seq = typeof payload.seq === "number" ? payload.seq : null;
if (seq === null || !Number.isFinite(seq)) return;
const streamRaw = asNonEmptyString(payload.stream);
const stream =
streamRaw === "stdout" || streamRaw === "stderr" || streamRaw === "system"
? streamRaw
: null;
const levelRaw = asNonEmptyString(payload.level);
const level =
levelRaw === "info" || levelRaw === "warn" || levelRaw === "error"
? levelRaw
: null;
const liveEvent: HeartbeatRunEvent = {
id: seq,
companyId: run.companyId,
runId: run.id,
agentId: run.agentId,
seq,
eventType: asNonEmptyString(payload.eventType) ?? "event",
stream,
level,
color: asNonEmptyString(payload.color),
message: asNonEmptyString(payload.message),
payload: asRecord(payload.payload),
createdAt: new Date(event.createdAt),
};
setEvents((prev) => {
if (prev.some((existing) => existing.seq === seq)) return prev;
return [...prev, liveEvent];
});
};
socket.onerror = () => {
socket?.close();
};
socket.onclose = () => {
setIsStreamingConnected(false);
scheduleReconnect();
};
};
connect();
return () => {
closed = true;
setIsStreamingConnected(false);
if (reconnectTimer !== null) window.clearTimeout(reconnectTimer);
if (socket) {
socket.onopen = null;
socket.onmessage = null;
socket.onerror = null;
socket.onclose = null;
socket.close(1000, "run_detail_unmount");
}
};
}, [isLive, run.companyId, run.id, run.agentId]);
const adapterInvokePayload = useMemo(() => {
const evt = events.find((e) => e.eventType === "adapter.invoke");