Backfill issue live run widget logs from persisted run log
This commit is contained in:
@@ -29,6 +29,8 @@ interface FeedItem {
|
||||
}
|
||||
|
||||
const MAX_FEED_ITEMS = 80;
|
||||
const LOG_POLL_INTERVAL_MS = 2000;
|
||||
const LOG_READ_LIMIT_BYTES = 256_000;
|
||||
|
||||
function readString(value: unknown): string | null {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value : null;
|
||||
@@ -147,12 +149,45 @@ function parseStderrChunk(
|
||||
return items;
|
||||
}
|
||||
|
||||
function parsePersistedLogContent(
|
||||
runId: string,
|
||||
content: string,
|
||||
pendingByRun: Map<string, string>,
|
||||
): Array<{ ts: string; stream: "stdout" | "stderr" | "system"; chunk: string }> {
|
||||
if (!content) return [];
|
||||
|
||||
const pendingKey = `${runId}:records`;
|
||||
const combined = `${pendingByRun.get(pendingKey) ?? ""}${content}`;
|
||||
const split = combined.split("\n");
|
||||
pendingByRun.set(pendingKey, split.pop() ?? "");
|
||||
|
||||
const parsed: Array<{ ts: string; stream: "stdout" | "stderr" | "system"; chunk: string }> = [];
|
||||
for (const line of split) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) continue;
|
||||
try {
|
||||
const raw = JSON.parse(trimmed) as { ts?: unknown; stream?: unknown; chunk?: unknown };
|
||||
const stream = raw.stream === "stderr" || raw.stream === "system" ? raw.stream : "stdout";
|
||||
const chunk = typeof raw.chunk === "string" ? raw.chunk : "";
|
||||
const ts = typeof raw.ts === "string" ? raw.ts : new Date().toISOString();
|
||||
if (!chunk) continue;
|
||||
parsed.push({ ts, stream, chunk });
|
||||
} catch {
|
||||
// Ignore malformed log rows.
|
||||
}
|
||||
}
|
||||
|
||||
return parsed;
|
||||
}
|
||||
|
||||
export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
|
||||
const queryClient = useQueryClient();
|
||||
const [feed, setFeed] = useState<FeedItem[]>([]);
|
||||
const [cancellingRunIds, setCancellingRunIds] = useState(new Set<string>());
|
||||
const seenKeysRef = useRef(new Set<string>());
|
||||
const pendingByRunRef = useRef(new Map<string, string>());
|
||||
const pendingLogRowsByRunRef = useRef(new Map<string, string>());
|
||||
const logOffsetByRunRef = useRef(new Map<string, number>());
|
||||
const runMetaByIdRef = useRef(new Map<string, { agentId: string; agentName: string }>());
|
||||
const nextIdRef = useRef(1);
|
||||
const bodyRef = useRef<HTMLDivElement>(null);
|
||||
@@ -162,6 +197,7 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
|
||||
try {
|
||||
await heartbeatsApi.cancel(runId);
|
||||
queryClient.invalidateQueries({ queryKey: queryKeys.issues.liveRuns(issueId) });
|
||||
queryClient.invalidateQueries({ queryKey: queryKeys.issues.activeRun(issueId) });
|
||||
} finally {
|
||||
setCancellingRunIds((prev) => {
|
||||
const next = new Set(prev);
|
||||
@@ -212,6 +248,27 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
|
||||
|
||||
const runById = useMemo(() => new Map(runs.map((run) => [run.id, run])), [runs]);
|
||||
const activeRunIds = useMemo(() => new Set(runs.map((run) => run.id)), [runs]);
|
||||
const runIdsKey = useMemo(
|
||||
() => runs.map((run) => run.id).sort((a, b) => a.localeCompare(b)).join(","),
|
||||
[runs],
|
||||
);
|
||||
const appendItems = (items: FeedItem[]) => {
|
||||
if (items.length === 0) return;
|
||||
setFeed((prev) => {
|
||||
const deduped: FeedItem[] = [];
|
||||
for (const item of items) {
|
||||
const key = `feed:${item.runId}:${item.ts}:${item.tone}:${item.text}`;
|
||||
if (seenKeysRef.current.has(key)) continue;
|
||||
seenKeysRef.current.add(key);
|
||||
deduped.push(item);
|
||||
}
|
||||
if (deduped.length === 0) return prev;
|
||||
if (seenKeysRef.current.size > 6000) {
|
||||
seenKeysRef.current.clear();
|
||||
}
|
||||
return [...prev, ...deduped].slice(-MAX_FEED_ITEMS);
|
||||
});
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
const body = bodyRef.current;
|
||||
@@ -236,8 +293,78 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
|
||||
pendingByRunRef.current.delete(key);
|
||||
}
|
||||
}
|
||||
const liveRunIds = new Set(activeRunIds);
|
||||
for (const key of pendingLogRowsByRunRef.current.keys()) {
|
||||
const runId = key.replace(/:records$/, "");
|
||||
if (!liveRunIds.has(runId)) {
|
||||
pendingLogRowsByRunRef.current.delete(key);
|
||||
}
|
||||
}
|
||||
for (const runId of logOffsetByRunRef.current.keys()) {
|
||||
if (!liveRunIds.has(runId)) {
|
||||
logOffsetByRunRef.current.delete(runId);
|
||||
}
|
||||
}
|
||||
}, [activeRunIds]);
|
||||
|
||||
useEffect(() => {
|
||||
if (runs.length === 0) return;
|
||||
|
||||
let cancelled = false;
|
||||
|
||||
const readRunLog = async (run: LiveRunForIssue) => {
|
||||
const offset = logOffsetByRunRef.current.get(run.id) ?? 0;
|
||||
try {
|
||||
const result = await heartbeatsApi.log(run.id, offset, LOG_READ_LIMIT_BYTES);
|
||||
if (cancelled) return;
|
||||
|
||||
const rows = parsePersistedLogContent(run.id, result.content, pendingLogRowsByRunRef.current);
|
||||
const items: FeedItem[] = [];
|
||||
for (const row of rows) {
|
||||
if (row.stream === "stderr") {
|
||||
items.push(
|
||||
...parseStderrChunk(run, row.chunk, row.ts, pendingByRunRef.current, nextIdRef),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (row.stream === "system") {
|
||||
const item = createFeedItem(run, row.ts, row.chunk, "warn", nextIdRef.current++);
|
||||
if (item) items.push(item);
|
||||
continue;
|
||||
}
|
||||
items.push(
|
||||
...parseStdoutChunk(run, row.chunk, row.ts, pendingByRunRef.current, nextIdRef),
|
||||
);
|
||||
}
|
||||
appendItems(items);
|
||||
|
||||
if (result.nextOffset !== undefined) {
|
||||
logOffsetByRunRef.current.set(run.id, result.nextOffset);
|
||||
return;
|
||||
}
|
||||
if (result.content.length > 0) {
|
||||
logOffsetByRunRef.current.set(run.id, offset + result.content.length);
|
||||
}
|
||||
} catch {
|
||||
// Ignore log read errors while run output is initializing.
|
||||
}
|
||||
};
|
||||
|
||||
const readAll = async () => {
|
||||
await Promise.all(runs.map((run) => readRunLog(run)));
|
||||
};
|
||||
|
||||
void readAll();
|
||||
const interval = window.setInterval(() => {
|
||||
void readAll();
|
||||
}, LOG_POLL_INTERVAL_MS);
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
window.clearInterval(interval);
|
||||
};
|
||||
}, [runIdsKey, runs]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!companyId || activeRunIds.size === 0) return;
|
||||
|
||||
@@ -245,11 +372,6 @@ export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) {
|
||||
let reconnectTimer: number | null = null;
|
||||
let socket: WebSocket | null = null;
|
||||
|
||||
const appendItems = (items: FeedItem[]) => {
|
||||
if (items.length === 0) return;
|
||||
setFeed((prev) => [...prev, ...items].slice(-MAX_FEED_ITEMS));
|
||||
};
|
||||
|
||||
const scheduleReconnect = () => {
|
||||
if (closed) return;
|
||||
reconnectTimer = window.setTimeout(connect, 1500);
|
||||
|
||||
Reference in New Issue
Block a user