import { useEffect, useMemo, useRef, useState, type MutableRefObject } from "react"; import { Link } from "react-router-dom"; import { useQuery } from "@tanstack/react-query"; import type { Issue, LiveEvent } from "@paperclip/shared"; import { heartbeatsApi, type LiveRunForIssue } from "../api/heartbeats"; import { issuesApi } from "../api/issues"; import { getUIAdapter } from "../adapters"; import type { TranscriptEntry } from "../adapters"; import { queryKeys } from "../lib/queryKeys"; import { cn, relativeTime } from "../lib/utils"; import { ExternalLink } from "lucide-react"; import { Identity } from "./Identity"; type FeedTone = "info" | "warn" | "error" | "assistant" | "tool"; interface FeedItem { id: string; ts: string; runId: string; agentId: string; agentName: string; text: string; tone: FeedTone; } const MAX_FEED_ITEMS = 40; function readString(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; } function summarizeEntry(entry: TranscriptEntry): { text: string; tone: FeedTone } | null { if (entry.kind === "assistant") { const text = entry.text.trim(); return text ? { text, tone: "assistant" } : null; } if (entry.kind === "thinking") { const text = entry.text.trim(); return text ? { text: `[thinking] ${text}`, tone: "info" } : null; } if (entry.kind === "tool_call") { return { text: `tool ${entry.name}`, tone: "tool" }; } if (entry.kind === "tool_result") { const base = entry.content.trim(); return { text: entry.isError ? `tool error: ${base}` : `tool result: ${base}`, tone: entry.isError ? "error" : "tool", }; } if (entry.kind === "stderr") { const text = entry.text.trim(); return text ? { text, tone: "error" } : null; } if (entry.kind === "system") { const text = entry.text.trim(); return text ? { text, tone: "warn" } : null; } if (entry.kind === "stdout") { const text = entry.text.trim(); return text ? { text, tone: "info" } : null; } return null; } function createFeedItem( run: LiveRunForIssue, ts: string, text: string, tone: FeedTone, nextId: number, ): FeedItem | null { const trimmed = text.trim(); if (!trimmed) return null; return { id: `${run.id}:${nextId}`, ts, runId: run.id, agentId: run.agentId, agentName: run.agentName, text: trimmed.slice(0, 220), tone, }; } function parseStdoutChunk( run: LiveRunForIssue, chunk: string, ts: string, pendingByRun: Map, nextIdRef: MutableRefObject, ): FeedItem[] { const pendingKey = `${run.id}:stdout`; const combined = `${pendingByRun.get(pendingKey) ?? ""}${chunk}`; const split = combined.split(/\r?\n/); pendingByRun.set(pendingKey, split.pop() ?? ""); const adapter = getUIAdapter(run.adapterType); const items: FeedItem[] = []; for (const line of split.slice(-8)) { const trimmed = line.trim(); if (!trimmed) continue; const parsed = adapter.parseStdoutLine(trimmed, ts); if (parsed.length === 0) { const fallback = createFeedItem(run, ts, trimmed, "info", nextIdRef.current++); if (fallback) items.push(fallback); continue; } for (const entry of parsed) { const summary = summarizeEntry(entry); if (!summary) continue; const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++); if (item) items.push(item); } } return items; } function parseStderrChunk( run: LiveRunForIssue, chunk: string, ts: string, pendingByRun: Map, nextIdRef: MutableRefObject, ): FeedItem[] { const pendingKey = `${run.id}:stderr`; const combined = `${pendingByRun.get(pendingKey) ?? ""}${chunk}`; const split = combined.split(/\r?\n/); pendingByRun.set(pendingKey, split.pop() ?? ""); const items: FeedItem[] = []; for (const line of split.slice(-8)) { const item = createFeedItem(run, ts, line, "error", nextIdRef.current++); if (item) items.push(item); } return items; } interface ActiveAgentsPanelProps { companyId: string; } export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { const [feedByRun, setFeedByRun] = useState>(new Map()); const seenKeysRef = useRef(new Set()); const pendingByRunRef = useRef(new Map()); const nextIdRef = useRef(1); const { data: liveRuns } = useQuery({ queryKey: queryKeys.liveRuns(companyId), queryFn: () => heartbeatsApi.liveRunsForCompany(companyId), }); const runs = liveRuns ?? []; const { data: issues } = useQuery({ queryKey: queryKeys.issues.list(companyId), queryFn: () => issuesApi.list(companyId), enabled: runs.length > 0, }); const issueById = useMemo(() => { const map = new Map(); for (const issue of issues ?? []) { map.set(issue.id, issue); } return map; }, [issues]); const runById = useMemo(() => new Map(runs.map((r) => [r.id, r])), [runs]); const activeRunIds = useMemo(() => new Set(runs.map((r) => r.id)), [runs]); // Clean up pending buffers for runs that ended useEffect(() => { const stillActive = new Set(); for (const runId of activeRunIds) { stillActive.add(`${runId}:stdout`); stillActive.add(`${runId}:stderr`); } for (const key of pendingByRunRef.current.keys()) { if (!stillActive.has(key)) { pendingByRunRef.current.delete(key); } } }, [activeRunIds]); // WebSocket connection for streaming useEffect(() => { if (activeRunIds.size === 0) return; let closed = false; let reconnectTimer: number | null = null; let socket: WebSocket | null = null; const appendItems = (runId: string, items: FeedItem[]) => { if (items.length === 0) return; setFeedByRun((prev) => { const next = new Map(prev); const existing = next.get(runId) ?? []; next.set(runId, [...existing, ...items].slice(-MAX_FEED_ITEMS)); return next; }); }; const scheduleReconnect = () => { if (closed) return; reconnectTimer = window.setTimeout(connect, 1500); }; const connect = () => { if (closed) return; const protocol = window.location.protocol === "https:" ? "wss" : "ws"; const url = `${protocol}://${window.location.host}/api/companies/${encodeURIComponent(companyId)}/events/ws`; socket = new WebSocket(url); socket.onmessage = (message) => { const raw = typeof message.data === "string" ? message.data : ""; if (!raw) return; let event: LiveEvent; try { event = JSON.parse(raw) as LiveEvent; } catch { return; } if (event.companyId !== companyId) return; const payload = event.payload ?? {}; const runId = readString(payload["runId"]); if (!runId || !activeRunIds.has(runId)) return; const run = runById.get(runId); if (!run) return; if (event.type === "heartbeat.run.event") { const seq = typeof payload["seq"] === "number" ? payload["seq"] : null; const eventType = readString(payload["eventType"]) ?? "event"; const messageText = readString(payload["message"]) ?? eventType; const dedupeKey = `${runId}:event:${seq ?? `${eventType}:${messageText}:${event.createdAt}`}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear(); const tone = eventType === "error" ? "error" : eventType === "lifecycle" ? "warn" : "info"; const item = createFeedItem(run, event.createdAt, messageText, tone, nextIdRef.current++); if (item) appendItems(run.id, [item]); return; } if (event.type === "heartbeat.run.status") { const status = readString(payload["status"]) ?? "updated"; const dedupeKey = `${runId}:status:${status}:${readString(payload["finishedAt"]) ?? ""}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); if (seenKeysRef.current.size > 2000) seenKeysRef.current.clear(); const tone = status === "failed" || status === "timed_out" ? "error" : "warn"; const item = createFeedItem(run, event.createdAt, `run ${status}`, tone, nextIdRef.current++); if (item) appendItems(run.id, [item]); return; } if (event.type === "heartbeat.run.log") { const chunk = readString(payload["chunk"]); if (!chunk) return; const stream = readString(payload["stream"]) === "stderr" ? "stderr" : "stdout"; if (stream === "stderr") { appendItems(run.id, parseStderrChunk(run, chunk, event.createdAt, pendingByRunRef.current, nextIdRef)); return; } appendItems(run.id, parseStdoutChunk(run, chunk, event.createdAt, pendingByRunRef.current, nextIdRef)); } }; socket.onerror = () => { socket?.close(); }; socket.onclose = () => { scheduleReconnect(); }; }; connect(); return () => { closed = true; if (reconnectTimer !== null) window.clearTimeout(reconnectTimer); if (socket) { socket.onmessage = null; socket.onerror = null; socket.onclose = null; socket.close(1000, "active_agents_panel_unmount"); } }; }, [activeRunIds, companyId, runById]); if (runs.length === 0) return null; return (

Active Agents

{runs.map((run) => ( ))}
); } function AgentRunCard({ run, issue, feed, }: { run: LiveRunForIssue; issue?: Issue; feed: FeedItem[]; }) { const bodyRef = useRef(null); const recent = feed.slice(-20); useEffect(() => { const body = bodyRef.current; if (!body) return; body.scrollTo({ top: body.scrollHeight, behavior: "smooth" }); }, [feed.length]); return (
Live {run.id.slice(0, 8)}
Open run
{run.issueId && (
Working on: {issue?.identifier ?? run.issueId.slice(0, 8)} {issue?.title ? ` - ${issue.title}` : ""}
)}
{recent.length === 0 && (
Waiting for output...
)} {recent.map((item, index) => (
{relativeTime(item.ts)} {item.text}
))}
); }