// File listing metadata: 284 lines, 8.9 KiB, TypeScript.
import { useEffect, useMemo, useRef, useState } from "react";
|
|
import type { LiveEvent } from "@paperclipai/shared";
|
|
import { heartbeatsApi, type LiveRunForIssue } from "../../api/heartbeats";
|
|
import { buildTranscript, getUIAdapter, type RunLogChunk, type TranscriptEntry } from "../../adapters";
|
|
|
|
/** Interval, in milliseconds, between successive polls of the persisted run logs. */
const LOG_POLL_INTERVAL_MS = 2_000;

/** Limit, in bytes, passed to `heartbeatsApi.log` for each persisted-log read. */
const LOG_READ_LIMIT_BYTES = 256 * 1_000;
|
|
|
|
/** Options accepted by `useLiveRunTranscripts`. */
interface UseLiveRunTranscriptsOptions {
  /** Runs whose output should be collected. */
  runs: Array<LiveRunForIssue>;

  /** Company whose live event stream is subscribed to; no socket is opened when this is absent. */
  companyId?: null | string;

  /** Cap on the number of chunks retained per run; the hook applies a default of 200 when omitted. */
  maxChunksPerRun?: number;
}
|
|
|
|
/**
 * Narrows an arbitrary value to a non-blank string.
 *
 * @param value - Any value, e.g. a field read from an untyped payload.
 * @returns The original (untrimmed) string when it contains at least one
 *   non-whitespace character; `null` for blank strings and non-strings.
 */
function readString(value: unknown): string | null {
  if (typeof value !== "string") return null;
  return value.trim() === "" ? null : value;
}
|
|
|
|
/**
 * Reports whether a run status denotes a finished run.
 *
 * @param status - Status string to classify.
 * @returns `true` for "failed", "timed_out", "cancelled" and "succeeded";
 *   `false` for every other value.
 */
function isTerminalStatus(status: string): boolean {
  switch (status) {
    case "failed":
    case "timed_out":
    case "cancelled":
    case "succeeded":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/**
 * Parses a slice of newline-delimited JSON log records into chunks.
 *
 * Text after the last newline is treated as an incomplete record: it is stored
 * in `pendingByRun` under `"<runId>:records"` and prepended to the next slice
 * passed for the same run.
 *
 * @param runId - Run the content belongs to; used for the buffer key and the
 *   de-duplication key.
 * @param content - Newly read log text. An empty value yields no chunks and
 *   leaves the buffer untouched.
 * @param pendingByRun - Mutable buffer of incomplete trailing records; updated
 *   in place.
 * @returns One chunk per complete record that has a non-empty string `chunk`.
 *   `stream` falls back to "stdout" unless it is "stderr" or "system"; `ts`
 *   falls back to the current time when it is not a string. Blank and
 *   unparseable rows are skipped.
 */
function parsePersistedLogContent(
  runId: string,
  content: string,
  pendingByRun: Map<string, string>,
): Array<RunLogChunk & { dedupeKey: string }> {
  if (!content) return [];

  const bufferKey = `${runId}:records`;
  const carriedOver = pendingByRun.get(bufferKey) ?? "";
  const rows = (carriedOver + content).split("\n");

  // The final element is whatever follows the last newline (possibly ""), so
  // it is held back until the rest of the record arrives.
  const remainder = rows.pop() ?? "";
  pendingByRun.set(bufferKey, remainder);

  const chunks: Array<RunLogChunk & { dedupeKey: string }> = [];
  for (const row of rows) {
    const text = row.trim();
    if (text.length === 0) continue;

    try {
      // Destructuring sits inside the try so a JSON `null` row is skipped like
      // any other malformed row.
      const { ts: rawTs, stream: rawStream, chunk: rawChunk } = JSON.parse(text) as {
        ts?: unknown;
        stream?: unknown;
        chunk?: unknown;
      };

      if (typeof rawChunk !== "string" || rawChunk === "") continue;

      const stream = rawStream === "stderr" ? "stderr" : rawStream === "system" ? "system" : "stdout";
      const ts = typeof rawTs === "string" ? rawTs : new Date().toISOString();

      chunks.push({
        ts,
        stream,
        chunk: rawChunk,
        dedupeKey: `log:${runId}:${ts}:${stream}:${rawChunk}`,
      });
    } catch {
      // Ignore malformed log rows.
    }
  }

  return chunks;
}
|
|
|
|
/** Size above which the set of seen de-duplication keys is reset to bound memory. */
const MAX_SEEN_CHUNK_KEYS = 12_000;

/**
 * React hook that collects live output for a set of runs and exposes it as
 * parsed transcripts.
 *
 * Output is merged and de-duplicated from two sources:
 *  - polling: every `LOG_POLL_INTERVAL_MS` each run's persisted log is read
 *    from its last recorded offset through `heartbeatsApi.log`;
 *  - push: while `companyId` is set and at least one run is non-terminal, a
 *    WebSocket to the company events endpoint delivers `heartbeat.run.log`,
 *    `heartbeat.run.event` and `heartbeat.run.status` events.
 *
 * @param options.runs - Runs to track. Chunks, offsets and partial-line
 *   buffers of runs that are no longer listed are discarded.
 * @param options.companyId - Company whose event stream is subscribed to; no
 *   socket is opened when it is falsy.
 * @param options.maxChunksPerRun - Number of most recent chunks retained per
 *   run. Defaults to 200.
 * @returns `transcriptByRun`, a map from run id to the transcript built with
 *   that run's UI adapter, and `hasOutputForRun`, which reports whether any
 *   chunk has been collected for a run id.
 */
export function useLiveRunTranscripts({
  runs,
  companyId,
  maxChunksPerRun = 200,
}: UseLiveRunTranscriptsOptions) {
  // Lazy initialiser so a fresh Map is not allocated on every render.
  const [chunksByRun, setChunksByRun] = useState<Map<string, RunLogChunk[]>>(() => new Map());
  const seenChunkKeysRef = useRef(new Set<string>());
  const pendingLogRowsByRunRef = useRef(new Map<string, string>());
  const logOffsetByRunRef = useRef(new Map<string, number>());

  const runById = useMemo(() => new Map(runs.map((run) => [run.id, run])), [runs]);
  const activeRunIds = useMemo(
    () => new Set(runs.filter((run) => !isTerminalStatus(run.status)).map((run) => run.id)),
    [runs],
  );
  const runIdsKey = useMemo(
    () => runs.map((run) => run.id).sort((a, b) => a.localeCompare(b)).join(","),
    [runs],
  );

  /**
   * Appends the not-yet-seen chunks to a run, keeping only the newest
   * `maxChunksPerRun` entries.
   *
   * De-duplication happens here, before `setChunksByRun`, so the state updater
   * stays pure: React may invoke updaters more than once (it does so in Strict
   * Mode), and an updater that records keys as a side effect would treat every
   * chunk as a duplicate on the second invocation.
   */
  const appendChunks = (runId: string, chunks: Array<RunLogChunk & { dedupeKey: string }>) => {
    const seenKeys = seenChunkKeysRef.current;
    const fresh: RunLogChunk[] = [];
    for (const { dedupeKey, ts, stream, chunk } of chunks) {
      if (seenKeys.has(dedupeKey)) continue;
      seenKeys.add(dedupeKey);
      fresh.push({ ts, stream, chunk });
    }
    if (fresh.length === 0) return;

    // Crude memory bound: forget all keys once the set grows too large.
    if (seenKeys.size > MAX_SEEN_CHUNK_KEYS) {
      seenKeys.clear();
    }

    setChunksByRun((prev) => {
      const next = new Map(prev);
      next.set(runId, [...(prev.get(runId) ?? []), ...fresh].slice(-maxChunksPerRun));
      return next;
    });
  };

  // Drop per-run state for runs that are no longer in `runs`.
  useEffect(() => {
    const knownRunIds = new Set(runs.map((run) => run.id));

    setChunksByRun((prev) => {
      const next = new Map<string, RunLogChunk[]>();
      for (const [runId, chunks] of prev) {
        if (knownRunIds.has(runId)) {
          next.set(runId, chunks);
        }
      }
      // Keep the previous Map identity when nothing was removed.
      return next.size === prev.size ? prev : next;
    });

    for (const key of pendingLogRowsByRunRef.current.keys()) {
      // Buffer keys have the form "<runId>:records".
      const runId = key.replace(/:records$/, "");
      if (!knownRunIds.has(runId)) {
        pendingLogRowsByRunRef.current.delete(key);
      }
    }

    for (const runId of logOffsetByRunRef.current.keys()) {
      if (!knownRunIds.has(runId)) {
        logOffsetByRunRef.current.delete(runId);
      }
    }
  }, [runs]);

  // Poll the persisted log of every run, once immediately and then on an interval.
  useEffect(() => {
    if (runs.length === 0) return;

    let cancelled = false;
    // Runs with a read still pending. A second read at the same offset would
    // return overlapping content; if responses then settle out of order the
    // offset can move backwards and the partial-line buffer gets spliced
    // incorrectly, so overlapping reads are skipped.
    const inFlightRunIds = new Set<string>();

    const readRunLog = async (run: LiveRunForIssue) => {
      if (inFlightRunIds.has(run.id)) return;
      inFlightRunIds.add(run.id);

      const offset = logOffsetByRunRef.current.get(run.id) ?? 0;
      try {
        const result = await heartbeatsApi.log(run.id, offset, LOG_READ_LIMIT_BYTES);
        if (cancelled) return;

        appendChunks(run.id, parsePersistedLogContent(run.id, result.content, pendingLogRowsByRunRef.current));

        if (result.nextOffset !== undefined) {
          logOffsetByRunRef.current.set(run.id, result.nextOffset);
        } else if (result.content.length > 0) {
          // NOTE(review): `content.length` counts UTF-16 code units. If offsets
          // are byte positions (the read limit is expressed in bytes), this
          // fallback drifts on non-ASCII output — confirm against the API.
          logOffsetByRunRef.current.set(run.id, offset + result.content.length);
        }
      } catch {
        // Ignore log read errors while output is initializing.
      } finally {
        inFlightRunIds.delete(run.id);
      }
    };

    const readAll = async () => {
      await Promise.all(runs.map((run) => readRunLog(run)));
    };

    void readAll();
    const interval = window.setInterval(() => {
      void readAll();
    }, LOG_POLL_INTERVAL_MS);

    return () => {
      cancelled = true;
      window.clearInterval(interval);
    };
  }, [runIdsKey, runs]);

  // Subscribe to the company event stream while there are non-terminal runs.
  useEffect(() => {
    if (!companyId || activeRunIds.size === 0) return;

    let closed = false;
    let reconnectTimer: number | null = null;
    let socket: WebSocket | null = null;

    const scheduleReconnect = () => {
      if (closed) return;
      reconnectTimer = window.setTimeout(connect, 1500);
    };

    const connect = () => {
      if (closed) return;
      const protocol = window.location.protocol === "https:" ? "wss" : "ws";
      const url = `${protocol}://${window.location.host}/api/companies/${encodeURIComponent(companyId)}/events/ws`;
      const ws = new WebSocket(url);
      socket = ws;

      ws.onmessage = (message) => {
        const raw = typeof message.data === "string" ? message.data : "";
        if (!raw) return;

        let event: LiveEvent;
        try {
          event = JSON.parse(raw) as LiveEvent;
        } catch {
          return;
        }

        // Only accept events for this company and for tracked, non-terminal runs.
        if (event.companyId !== companyId) return;
        const payload = event.payload ?? {};
        const runId = readString(payload["runId"]);
        if (!runId || !activeRunIds.has(runId)) return;
        if (!runById.has(runId)) return;

        if (event.type === "heartbeat.run.log") {
          const chunk = readString(payload["chunk"]);
          if (!chunk) return;
          const ts = readString(payload["ts"]) ?? event.createdAt;
          const streamName = readString(payload["stream"]);
          const stream = streamName === "stderr" ? "stderr" : streamName === "system" ? "system" : "stdout";
          appendChunks(runId, [{
            ts,
            stream,
            chunk,
            // Same key format as the polled log rows, so a chunk delivered by
            // both sources is only kept once.
            dedupeKey: `log:${runId}:${ts}:${stream}:${chunk}`,
          }]);
          return;
        }

        if (event.type === "heartbeat.run.event") {
          const seq = typeof payload["seq"] === "number" ? payload["seq"] : null;
          const eventType = readString(payload["eventType"]) ?? "event";
          const messageText = readString(payload["message"]) ?? eventType;
          appendChunks(runId, [{
            ts: event.createdAt,
            stream: eventType === "error" ? "stderr" : "system",
            chunk: messageText,
            dedupeKey: `socket:event:${runId}:${seq ?? `${eventType}:${messageText}:${event.createdAt}`}`,
          }]);
          return;
        }

        if (event.type === "heartbeat.run.status") {
          const status = readString(payload["status"]) ?? "updated";
          appendChunks(runId, [{
            ts: event.createdAt,
            // Terminal statuses other than "succeeded" are shown on stderr.
            stream: isTerminalStatus(status) && status !== "succeeded" ? "stderr" : "system",
            chunk: `run ${status}`,
            dedupeKey: `socket:status:${runId}:${status}:${readString(payload["finishedAt"]) ?? ""}`,
          }]);
        }
      };

      // Close the socket that errored; its close handler schedules the reconnect.
      ws.onerror = () => {
        ws.close();
      };

      ws.onclose = () => {
        scheduleReconnect();
      };
    };

    connect();

    return () => {
      closed = true;
      if (reconnectTimer !== null) window.clearTimeout(reconnectTimer);
      if (socket) {
        // Detach handlers first so this deliberate close does not trigger a reconnect.
        socket.onmessage = null;
        socket.onerror = null;
        socket.onclose = null;
        socket.close(1000, "live_run_transcripts_unmount");
      }
    };
  }, [activeRunIds, companyId, runById]);

  const transcriptByRun = useMemo(() => {
    const next = new Map<string, TranscriptEntry[]>();
    for (const run of runs) {
      const adapter = getUIAdapter(run.adapterType);
      next.set(run.id, buildTranscript(chunksByRun.get(run.id) ?? [], adapter.parseStdoutLine));
    }
    return next;
  }, [chunksByRun, runs]);

  return {
    transcriptByRun,
    hasOutputForRun(runId: string) {
      return (chunksByRun.get(runId)?.length ?? 0) > 0;
    },
  };
}
|