From c9c75bbc0a37c7ecf1ab64becc112fcd3c3ddc53 Mon Sep 17 00:00:00 2001 From: Forgotten Date: Tue, 17 Feb 2026 12:24:43 -0600 Subject: [PATCH] Implement agent runtime services and WebSocket realtime Expand heartbeat service with full run executor, wakeup coordinator, and adapter lifecycle. Add run-log-store for pluggable log persistence. Add live-events service and WebSocket handler for realtime updates. Expand agent and issue routes with runtime operations. Add ws dependency. Co-Authored-By: Claude Opus 4.6 --- server/package.json | 1 + server/src/config.ts | 4 + server/src/index.ts | 30 +- server/src/realtime/live-events-ws.ts | 177 ++++ server/src/routes/agents.ts | 143 ++- server/src/routes/issues.ts | 46 +- server/src/services/activity-log.ts | 15 + server/src/services/heartbeat.ts | 1286 ++++++++++++++++++++++--- server/src/services/index.ts | 1 + server/src/services/live-events.ts | 40 + server/src/services/run-log-store.ts | 159 +++ 11 files changed, 1746 insertions(+), 156 deletions(-) create mode 100644 server/src/realtime/live-events-ws.ts create mode 100644 server/src/services/live-events.ts create mode 100644 server/src/services/run-log-store.ts diff --git a/server/package.json b/server/package.json index ee9939af..1b8105f7 100644 --- a/server/package.json +++ b/server/package.json @@ -16,6 +16,7 @@ "express": "^5.1.0", "pino": "^9.6.0", "pino-http": "^10.4.0", + "ws": "^8.19.0", "zod": "^3.24.2" }, "devDependencies": { diff --git a/server/src/config.ts b/server/src/config.ts index 6a59872e..c2387f88 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -2,6 +2,8 @@ export interface Config { port: number; databaseUrl: string | undefined; serveUi: boolean; + heartbeatSchedulerEnabled: boolean; + heartbeatSchedulerIntervalMs: number; } export function loadConfig(): Config { @@ -9,5 +11,7 @@ export function loadConfig(): Config { port: Number(process.env.PORT) || 3100, databaseUrl: process.env.DATABASE_URL, serveUi: process.env.SERVE_UI === "true", + heartbeatSchedulerEnabled: process.env.HEARTBEAT_SCHEDULER_ENABLED !== "false", + heartbeatSchedulerIntervalMs: Math.max(10000, Number(process.env.HEARTBEAT_SCHEDULER_INTERVAL_MS) || 30000), }; } diff --git a/server/src/index.ts b/server/src/index.ts index 905fe263..f87de840 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,7 +1,11 @@ +import { createServer } from "node:http"; +import { resolve } from "node:path"; import { createDb, createPgliteDb } from "@paperclip/db"; import { createApp } from "./app.js"; import { loadConfig } from "./config.js"; import { logger } from "./middleware/logger.js"; +import { setupLiveEventsWebSocketServer } from "./realtime/live-events-ws.js"; +import { heartbeatService } from "./services/index.js"; const config = loadConfig(); @@ -9,13 +13,33 @@ let db; if (config.databaseUrl) { db = createDb(config.databaseUrl); } else { - logger.info("No DATABASE_URL set — using embedded PGlite (./data/pglite)"); - db = await createPgliteDb("./data/pglite"); + const dataDir = resolve("./data/pglite"); + logger.info(`No DATABASE_URL set — using embedded PGlite (${dataDir})`); + db = await createPgliteDb(dataDir); logger.info("PGlite ready, schema pushed"); } const app = createApp(db as any, { serveUi: config.serveUi }); +const server = createServer(app); -app.listen(config.port, () => { +setupLiveEventsWebSocketServer(server, db as any); + +if (config.heartbeatSchedulerEnabled) { + const heartbeat = heartbeatService(db as any); + setInterval(() => { + void heartbeat + .tickTimers(new Date()) + .then((result) => { + if (result.enqueued > 0) { + logger.info({ ...result }, "heartbeat timer tick enqueued runs"); + } + }) + .catch((err) => { + logger.error({ err }, "heartbeat timer tick failed"); + }); + }, config.heartbeatSchedulerIntervalMs); +} + +server.listen(config.port, () => { logger.info(`Server listening on :${config.port}`); }); diff --git a/server/src/realtime/live-events-ws.ts b/server/src/realtime/live-events-ws.ts new file mode 100644 index 00000000..cdcbf8a2 --- /dev/null +++ b/server/src/realtime/live-events-ws.ts @@ -0,0 +1,177 @@ +import { createHash } from "node:crypto"; +import type { IncomingMessage, Server as HttpServer } from "node:http"; +import type { Duplex } from "node:stream"; +import { and, eq, isNull } from "drizzle-orm"; +import type { Db } from "@paperclip/db"; +import { agentApiKeys } from "@paperclip/db"; +import { WebSocket, WebSocketServer } from "ws"; +import { logger } from "../middleware/logger.js"; +import { subscribeCompanyLiveEvents } from "../services/live-events.js"; + +interface UpgradeContext { + companyId: string; + actorType: "board" | "agent"; + actorId: string; +} + +interface IncomingMessageWithContext extends IncomingMessage { + paperclipUpgradeContext?: UpgradeContext; +} + +function hashToken(token: string) { + return createHash("sha256").update(token).digest("hex"); +} + +function rejectUpgrade(socket: Duplex, statusLine: string, message: string) { + const safe = message.replace(/[\r\n]+/g, " ").trim(); + socket.write(`HTTP/1.1 ${statusLine}\r\nConnection: close\r\nContent-Type: text/plain\r\n\r\n${safe}`); + socket.destroy(); +} + +function parseCompanyId(pathname: string) { + const match = pathname.match(/^\/api\/companies\/([^/]+)\/events\/ws$/); + if (!match) return null; + + try { + return decodeURIComponent(match[1] ?? ""); + } catch { + return null; + } +} + +function parseBearerToken(rawAuth: string | string[] | undefined) { + const auth = Array.isArray(rawAuth) ? rawAuth[0] : rawAuth; + if (!auth) return null; + if (!auth.toLowerCase().startsWith("bearer ")) return null; + const token = auth.slice("bearer ".length).trim(); + return token.length > 0 ? token : null; +} + +async function authorizeUpgrade( + db: Db, + req: IncomingMessage, + companyId: string, + url: URL, +): Promise { + const queryToken = url.searchParams.get("token")?.trim() ?? ""; + const authToken = parseBearerToken(req.headers.authorization); + const token = authToken ?? (queryToken.length > 0 ? queryToken : null); + + // Browser board context has no bearer token in V1. + if (!token) { + return { + companyId, + actorType: "board", + actorId: "board", + }; + } + + const tokenHash = hashToken(token); + const key = await db + .select() + .from(agentApiKeys) + .where(and(eq(agentApiKeys.keyHash, tokenHash), isNull(agentApiKeys.revokedAt))) + .then((rows) => rows[0] ?? null); + + if (!key || key.companyId !== companyId) { + return null; + } + + await db + .update(agentApiKeys) + .set({ lastUsedAt: new Date() }) + .where(eq(agentApiKeys.id, key.id)); + + return { + companyId, + actorType: "agent", + actorId: key.agentId, + }; +} + +export function setupLiveEventsWebSocketServer(server: HttpServer, db: Db) { + const wss = new WebSocketServer({ noServer: true }); + const cleanupByClient = new Map void>(); + const aliveByClient = new Map(); + + const pingInterval = setInterval(() => { + for (const socket of wss.clients) { + if (!aliveByClient.get(socket)) { + socket.terminate(); + continue; + } + aliveByClient.set(socket, false); + socket.ping(); + } + }, 30000); + + wss.on("connection", (socket, req) => { + const context = (req as IncomingMessageWithContext).paperclipUpgradeContext; + if (!context) { + socket.close(1008, "missing context"); + return; + } + + const unsubscribe = subscribeCompanyLiveEvents(context.companyId, (event) => { + if (socket.readyState !== WebSocket.OPEN) return; + socket.send(JSON.stringify(event)); + }); + + cleanupByClient.set(socket, unsubscribe); + aliveByClient.set(socket, true); + + socket.on("pong", () => { + aliveByClient.set(socket, true); + }); + + socket.on("close", () => { + const cleanup = cleanupByClient.get(socket); + if (cleanup) cleanup(); + cleanupByClient.delete(socket); + aliveByClient.delete(socket); + }); + + socket.on("error", (err) => { + logger.warn({ err, companyId: context.companyId }, "live websocket client error"); + }); + }); + + wss.on("close", () => { + clearInterval(pingInterval); + }); + + server.on("upgrade", (req, socket, head) => { + if (!req.url) { + rejectUpgrade(socket, "400 Bad Request", "missing url"); + return; + } + + const url = new URL(req.url, "http://localhost"); + const companyId = parseCompanyId(url.pathname); + if (!companyId) { + socket.destroy(); + return; + } + + void authorizeUpgrade(db, req, companyId, url) + .then((context) => { + if (!context) { + rejectUpgrade(socket, "403 Forbidden", "forbidden"); + return; + } + + const reqWithContext = req as IncomingMessageWithContext; + reqWithContext.paperclipUpgradeContext = context; + + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit("connection", ws, reqWithContext); + }); + }) + .catch((err) => { + logger.error({ err, path: req.url }, "failed websocket upgrade authorization"); + rejectUpgrade(socket, "500 Internal Server Error", "upgrade failed"); + }); + }); + + return wss; +} diff --git a/server/src/routes/agents.ts b/server/src/routes/agents.ts index d7d3a893..d87bcb9e 100644 --- a/server/src/routes/agents.ts +++ b/server/src/routes/agents.ts @@ -3,6 +3,7 @@ import type { Db } from "@paperclip/db"; import { createAgentKeySchema, createAgentSchema, + wakeAgentSchema, updateAgentSchema, } from "@paperclip/shared"; import { validate } from "../middleware/validate.js"; @@ -39,6 +40,44 @@ export function agentRoutes(db: Db) { res.json(agent); }); + router.get("/agents/:id/runtime-state", async (req, res) => { + assertBoard(req); + const id = req.params.id as string; + const agent = await svc.getById(id); + if (!agent) { + res.status(404).json({ error: "Agent not found" }); + return; + } + assertCompanyAccess(req, agent.companyId); + + const state = await heartbeat.getRuntimeState(id); + res.json(state); + }); + + router.post("/agents/:id/runtime-state/reset-session", async (req, res) => { + assertBoard(req); + const id = req.params.id as string; + const agent = await svc.getById(id); + if (!agent) { + res.status(404).json({ error: "Agent not found" }); + return; + } + assertCompanyAccess(req, agent.companyId); + + const state = await heartbeat.resetRuntimeSession(id); + + await logActivity(db, { + companyId: agent.companyId, + actorType: "user", + actorId: req.actor.userId ?? "board", + action: "agent.runtime_session_reset", + entityType: "agent", + entityId: id, + }); + + res.json(state); + }); + router.post("/companies/:companyId/agents", validate(createAgentSchema), async (req, res) => { const companyId = req.params.companyId as string; assertCompanyAccess(req, companyId); @@ -192,6 +231,54 @@ export function agentRoutes(db: Db) { res.status(201).json(key); }); + router.post("/agents/:id/wakeup", validate(wakeAgentSchema), async (req, res) => { + const id = req.params.id as string; + const agent = await svc.getById(id); + if (!agent) { + res.status(404).json({ error: "Agent not found" }); + return; + } + assertCompanyAccess(req, agent.companyId); + + if (req.actor.type === "agent" && req.actor.agentId !== id) { + res.status(403).json({ error: "Agent can only invoke itself" }); + return; + } + + const run = await heartbeat.wakeup(id, { + source: req.body.source, + triggerDetail: req.body.triggerDetail ?? "manual", + reason: req.body.reason ?? null, + payload: req.body.payload ?? null, + idempotencyKey: req.body.idempotencyKey ?? null, + requestedByActorType: req.actor.type === "agent" ? "agent" : "user", + requestedByActorId: req.actor.type === "agent" ? req.actor.agentId ?? null : req.actor.userId ?? null, + contextSnapshot: { + triggeredBy: req.actor.type, + actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId, + }, + }); + + if (!run) { + res.status(202).json({ status: "skipped" }); + return; + } + + const actor = getActorInfo(req); + await logActivity(db, { + companyId: agent.companyId, + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + action: "heartbeat.invoked", + entityType: "heartbeat_run", + entityId: run.id, + details: { agentId: id }, + }); + + res.status(202).json(run); + }); + router.post("/agents/:id/heartbeat/invoke", async (req, res) => { const id = req.params.id as string; const agent = await svc.getById(id); @@ -206,10 +293,24 @@ export function agentRoutes(db: Db) { return; } - const run = await heartbeat.invoke(id, "manual", { - triggeredBy: req.actor.type, - actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId, - }); + const run = await heartbeat.invoke( + id, + "on_demand", + { + triggeredBy: req.actor.type, + actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId, + }, + "manual", + { + actorType: req.actor.type === "agent" ? "agent" : "user", + actorId: req.actor.type === "agent" ? req.actor.agentId ?? null : req.actor.userId ?? null, + }, + ); + + if (!run) { + res.status(202).json({ status: "skipped" }); + return; + } const actor = getActorInfo(req); await logActivity(db, { @@ -254,5 +355,39 @@ export function agentRoutes(db: Db) { res.json(run); }); + router.get("/heartbeat-runs/:runId/events", async (req, res) => { + const runId = req.params.runId as string; + const run = await heartbeat.getRun(runId); + if (!run) { + res.status(404).json({ error: "Heartbeat run not found" }); + return; + } + assertCompanyAccess(req, run.companyId); + + const afterSeq = Number(req.query.afterSeq ?? 0); + const limit = Number(req.query.limit ?? 200); + const events = await heartbeat.listEvents(runId, Number.isFinite(afterSeq) ? afterSeq : 0, Number.isFinite(limit) ? limit : 200); + res.json(events); + }); + + router.get("/heartbeat-runs/:runId/log", async (req, res) => { + const runId = req.params.runId as string; + const run = await heartbeat.getRun(runId); + if (!run) { + res.status(404).json({ error: "Heartbeat run not found" }); + return; + } + assertCompanyAccess(req, run.companyId); + + const offset = Number(req.query.offset ?? 0); + const limitBytes = Number(req.query.limitBytes ?? 256000); + const result = await heartbeat.readLog(runId, { + offset: Number.isFinite(offset) ? offset : 0, + limitBytes: Number.isFinite(limitBytes) ? limitBytes : 256000, + }); + + res.json(result); + }); + return router; } diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index 50efb487..0d3ba6f7 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -7,12 +7,14 @@ import { updateIssueSchema, } from "@paperclip/shared"; import { validate } from "../middleware/validate.js"; -import { issueService, logActivity } from "../services/index.js"; +import { heartbeatService, issueService, logActivity } from "../services/index.js"; +import { logger } from "../middleware/logger.js"; import { assertCompanyAccess, getActorInfo } from "./authz.js"; export function issueRoutes(db: Db) { const router = Router(); const svc = issueService(db); + const heartbeat = heartbeatService(db); router.get("/companies/:companyId/issues", async (req, res) => { const companyId = req.params.companyId as string; @@ -58,6 +60,20 @@ export function issueRoutes(db: Db) { details: { title: issue.title }, }); + if (issue.assigneeAgentId) { + void heartbeat + .wakeup(issue.assigneeAgentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { issueId: issue.id, mutation: "create" }, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + contextSnapshot: { issueId: issue.id, source: "issue.create" }, + }) + .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue create")); + } + res.status(201).json(issue); }); @@ -88,6 +104,22 @@ export function issueRoutes(db: Db) { details: req.body, }); + const assigneeChanged = + req.body.assigneeAgentId !== undefined && req.body.assigneeAgentId !== existing.assigneeAgentId; + if (assigneeChanged && issue.assigneeAgentId) { + void heartbeat + .wakeup(issue.assigneeAgentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { issueId: issue.id, mutation: "update" }, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + contextSnapshot: { issueId: issue.id, source: "issue.update" }, + }) + .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue update")); + } + res.json(issue); }); @@ -148,6 +180,18 @@ export function issueRoutes(db: Db) { details: { agentId: req.body.agentId }, }); + void heartbeat + .wakeup(req.body.agentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_checked_out", + payload: { issueId: issue.id, mutation: "checkout" }, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + contextSnapshot: { issueId: issue.id, source: "issue.checkout" }, + }) + .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout")); + res.json(updated); }); diff --git a/server/src/services/activity-log.ts b/server/src/services/activity-log.ts index 4ebb180d..b6f0e6e5 100644 --- a/server/src/services/activity-log.ts +++ b/server/src/services/activity-log.ts @@ -1,5 +1,6 @@ import type { Db } from "@paperclip/db"; import { activityLog } from "@paperclip/db"; +import { publishLiveEvent } from "./live-events.js"; export interface LogActivityInput { companyId: string; @@ -23,4 +24,18 @@ export async function logActivity(db: Db, input: LogActivityInput) { agentId: input.agentId ?? null, details: input.details ?? null, }); + + publishLiveEvent({ + companyId: input.companyId, + type: "activity.logged", + payload: { + actorType: input.actorType, + actorId: input.actorId, + action: input.action, + entityType: input.entityType, + entityId: input.entityId, + agentId: input.agentId ?? null, + details: input.details ?? null, + }, + }); } diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 5a395811..8f6c9eb5 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -1,16 +1,67 @@ import { spawn, type ChildProcess } from "node:child_process"; -import { and, eq, inArray } from "drizzle-orm"; +import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; -import { agents, heartbeatRuns } from "@paperclip/db"; +import { + agents, + agentRuntimeState, + agentWakeupRequests, + heartbeatRunEvents, + heartbeatRuns, + costEvents, +} from "@paperclip/db"; import { conflict, notFound } from "../errors.js"; import { logger } from "../middleware/logger.js"; +import { publishLiveEvent } from "./live-events.js"; +import { getRunLogStore, type RunLogHandle } from "./run-log-store.js"; interface RunningProcess { child: ChildProcess; graceSec: number; } +interface RunProcessResult { + exitCode: number | null; + signal: string | null; + timedOut: boolean; + stdout: string; + stderr: string; +} + +interface UsageSummary { + inputTokens: number; + outputTokens: number; + cachedInputTokens?: number; +} + +interface AdapterExecutionResult { + exitCode: number | null; + signal: string | null; + timedOut: boolean; + errorMessage?: string | null; + usage?: UsageSummary; + sessionId?: string | null; + provider?: string | null; + model?: string | null; + costUsd?: number | null; + resultJson?: Record | null; + summary?: string | null; +} + +interface WakeupOptions { + source?: "timer" | "assignment" | "on_demand" | "automation"; + triggerDetail?: "manual" | "ping" | "callback" | "system"; + reason?: string | null; + payload?: Record | null; + idempotencyKey?: string | null; + requestedByActorType?: "user" | "agent" | "system"; + requestedByActorId?: string | null; + contextSnapshot?: Record; +} + const runningProcesses = new Map(); +const MAX_CAPTURE_BYTES = 4 * 1024 * 1024; +const MAX_EXCERPT_BYTES = 32 * 1024; +const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; function parseObject(value: unknown): Record { if (typeof value !== "object" || value === null || Array.isArray(value)) { @@ -27,11 +78,181 @@ function asNumber(value: unknown, fallback: number): number { return typeof value === "number" && Number.isFinite(value) ? value : fallback; } +function asBoolean(value: unknown, fallback: boolean): boolean { + return typeof value === "boolean" ? value : fallback; +} + function asStringArray(value: unknown): string[] { return Array.isArray(value) ? value.filter((item): item is string => typeof item === "string") : []; } +function parseJson(value: string): Record | null { + try { + return JSON.parse(value) as Record; + } catch { + return null; + } +} + +function appendWithCap(prev: string, chunk: string, cap = MAX_CAPTURE_BYTES) { + const combined = prev + chunk; + return combined.length > cap ? combined.slice(combined.length - cap) : combined; +} + +function appendExcerpt(prev: string, chunk: string) { + return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES); +} + +function resolvePathValue(obj: Record, dottedPath: string) { + const parts = dottedPath.split("."); + let cursor: unknown = obj; + + for (const part of parts) { + if (typeof cursor !== "object" || cursor === null || Array.isArray(cursor)) { + return ""; + } + cursor = (cursor as Record)[part]; + } + + if (cursor === null || cursor === undefined) return ""; + if (typeof cursor === "string") return cursor; + if (typeof cursor === "number" || typeof cursor === "boolean") return String(cursor); + + try { + return JSON.stringify(cursor); + } catch { + return ""; + } +} + +function renderTemplate(template: string, data: Record) { + return template.replace(/{{\s*([a-zA-Z0-9_.-]+)\s*}}/g, (_, path) => resolvePathValue(data, path)); +} + +function parseCodexJsonl(stdout: string) { + let sessionId: string | null = null; + const messages: string[] = []; + const usage = { + inputTokens: 0, + cachedInputTokens: 0, + outputTokens: 0, + }; + + for (const rawLine of stdout.split(/\r?\n/)) { + const line = rawLine.trim(); + if (!line) continue; + + const event = parseJson(line); + if (!event) continue; + + const type = asString(event.type, ""); + if (type === "thread.started") { + sessionId = asString(event.thread_id, sessionId ?? "") || sessionId; + continue; + } + + if (type === "item.completed") { + const item = parseObject(event.item); + if (asString(item.type, "") === "agent_message") { + const text = asString(item.text, ""); + if (text) messages.push(text); + } + continue; + } + + if (type === "turn.completed") { + const usageObj = parseObject(event.usage); + usage.inputTokens = asNumber(usageObj.input_tokens, usage.inputTokens); + usage.cachedInputTokens = asNumber(usageObj.cached_input_tokens, usage.cachedInputTokens); + usage.outputTokens = asNumber(usageObj.output_tokens, usage.outputTokens); + } + } + + return { + sessionId, + summary: messages.join("\n\n").trim(), + usage, + }; +} + +async function runChildProcess( + runId: string, + command: string, + args: string[], + opts: { + cwd: string; + env: Record; + timeoutSec: number; + graceSec: number; + onLog: (stream: "stdout" | "stderr", chunk: string) => Promise; + }, +): Promise { + return new Promise((resolve, reject) => { + const child = spawn(command, args, { + cwd: opts.cwd, + env: { ...process.env, ...opts.env }, + shell: false, + stdio: ["ignore", "pipe", "pipe"], + }); + + runningProcesses.set(runId, { child, graceSec: opts.graceSec }); + + let timedOut = false; + let stdout = ""; + let stderr = ""; + let logChain: Promise = Promise.resolve(); + + const timeout = setTimeout(() => { + timedOut = true; + child.kill("SIGTERM"); + setTimeout(() => { + if (!child.killed) { + child.kill("SIGKILL"); + } + }, Math.max(1, opts.graceSec) * 1000); + }, Math.max(1, opts.timeoutSec) * 1000); + + child.stdout?.on("data", (chunk) => { + const text = String(chunk); + stdout = appendWithCap(stdout, text); + logChain = logChain + .then(() => opts.onLog("stdout", text)) + .catch((err) => logger.warn({ err, runId }, "failed to append stdout log chunk")); + }); + + child.stderr?.on("data", (chunk) => { + const text = String(chunk); + stderr = appendWithCap(stderr, text); + logChain = logChain + .then(() => opts.onLog("stderr", text)) + .catch((err) => logger.warn({ err, runId }, "failed to append stderr log chunk")); + }); + + child.on("error", (err) => { + clearTimeout(timeout); + runningProcesses.delete(runId); + reject(err); + }); + + child.on("close", (code, signal) => { + clearTimeout(timeout); + runningProcesses.delete(runId); + void logChain.finally(() => { + resolve({ + exitCode: code, + signal, + timedOut, + stdout, + stderr, + }); + }); + }); + }); +} + export function heartbeatService(db: Db) { + const runLogStore = getRunLogStore(); + async function getAgent(agentId: string) { return db .select() @@ -40,20 +261,142 @@ export function heartbeatService(db: Db) { .then((rows) => rows[0] ?? null); } + async function getRun(runId: string) { + return db + .select() + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null); + } + + async function getRuntimeState(agentId: string) { + return db + .select() + .from(agentRuntimeState) + .where(eq(agentRuntimeState.agentId, agentId)) + .then((rows) => rows[0] ?? null); + } + + async function ensureRuntimeState(agent: typeof agents.$inferSelect) { + const existing = await getRuntimeState(agent.id); + if (existing) return existing; + + return db + .insert(agentRuntimeState) + .values({ + agentId: agent.id, + companyId: agent.companyId, + adapterType: agent.adapterType, + stateJson: {}, + }) + .returning() + .then((rows) => rows[0]); + } + async function setRunStatus( runId: string, status: string, patch?: Partial, ) { - return db + const updated = await db .update(heartbeatRuns) .set({ status, ...patch, updatedAt: new Date() }) .where(eq(heartbeatRuns.id, runId)) .returning() .then((rows) => rows[0] ?? null); + + if (updated) { + publishLiveEvent({ + companyId: updated.companyId, + type: "heartbeat.run.status", + payload: { + runId: updated.id, + agentId: updated.agentId, + status: updated.status, + invocationSource: updated.invocationSource, + triggerDetail: updated.triggerDetail, + error: updated.error ?? null, + errorCode: updated.errorCode ?? null, + startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null, + finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null, + }, + }); + } + + return updated; } - async function finalizeAgentStatus(agentId: string, ok: boolean) { + async function setWakeupStatus( + wakeupRequestId: string | null | undefined, + status: string, + patch?: Partial, + ) { + if (!wakeupRequestId) return; + await db + .update(agentWakeupRequests) + .set({ status, ...patch, updatedAt: new Date() }) + .where(eq(agentWakeupRequests.id, wakeupRequestId)); + } + + async function appendRunEvent( + run: typeof heartbeatRuns.$inferSelect, + seq: number, + event: { + eventType: string; + stream?: "system" | "stdout" | "stderr"; + level?: "info" | "warn" | "error"; + color?: string; + message?: string; + payload?: Record; + }, + ) { + await db.insert(heartbeatRunEvents).values({ + companyId: run.companyId, + runId: run.id, + agentId: run.agentId, + seq, + eventType: event.eventType, + stream: event.stream, + level: event.level, + color: event.color, + message: event.message, + payload: event.payload, + }); + + publishLiveEvent({ + companyId: run.companyId, + type: "heartbeat.run.event", + payload: { + runId: run.id, + agentId: run.agentId, + seq, + eventType: event.eventType, + stream: event.stream ?? null, + level: event.level ?? null, + color: event.color ?? null, + message: event.message ?? null, + payload: event.payload ?? null, + }, + }); + } + + function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) { + const runtimeConfig = parseObject(agent.runtimeConfig); + const heartbeat = parseObject(runtimeConfig.heartbeat); + + return { + enabled: asBoolean(heartbeat.enabled, true), + intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)), + wakeOnAssignment: asBoolean(heartbeat.wakeOnAssignment, true), + wakeOnOnDemand: asBoolean(heartbeat.wakeOnOnDemand, true), + wakeOnAutomation: asBoolean(heartbeat.wakeOnAutomation, true), + }; + } + + async function finalizeAgentStatus( + agentId: string, + outcome: "succeeded" | "failed" | "cancelled" | "timed_out", + ) { const existing = await getAgent(agentId); if (!existing) return; @@ -61,17 +404,92 @@ export function heartbeatService(db: Db) { return; } - await db + const nextStatus = + outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error"; + + const updated = await db .update(agents) .set({ - status: ok ? "idle" : "error", + status: nextStatus, lastHeartbeatAt: new Date(), updatedAt: new Date(), }) - .where(eq(agents.id, agentId)); + .where(eq(agents.id, agentId)) + .returning() + .then((rows) => rows[0] ?? null); + + if (updated) { + publishLiveEvent({ + companyId: updated.companyId, + type: "agent.status", + payload: { + agentId: updated.id, + status: updated.status, + lastHeartbeatAt: updated.lastHeartbeatAt + ? new Date(updated.lastHeartbeatAt).toISOString() + : null, + outcome, + }, + }); + } } - async function executeHttpRun(runId: string, agentId: string, config: Record, context: Record) { + async function updateRuntimeState( + agent: typeof agents.$inferSelect, + run: typeof heartbeatRuns.$inferSelect, + result: AdapterExecutionResult, + ) { + const existing = await ensureRuntimeState(agent); + const usage = result.usage; + const inputTokens = usage?.inputTokens ?? 0; + const outputTokens = usage?.outputTokens ?? 0; + const cachedInputTokens = usage?.cachedInputTokens ?? 0; + const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100)); + + await db + .update(agentRuntimeState) + .set({ + adapterType: agent.adapterType, + sessionId: result.sessionId ?? existing.sessionId, + lastRunId: run.id, + lastRunStatus: run.status, + lastError: result.errorMessage ?? null, + totalInputTokens: existing.totalInputTokens + inputTokens, + totalOutputTokens: existing.totalOutputTokens + outputTokens, + totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens, + totalCostCents: existing.totalCostCents + additionalCostCents, + updatedAt: new Date(), + }) + .where(eq(agentRuntimeState.agentId, agent.id)); + + if (additionalCostCents > 0) { + await db.insert(costEvents).values({ + companyId: agent.companyId, + agentId: agent.id, + provider: result.provider ?? "unknown", + model: result.model ?? "unknown", + inputTokens, + outputTokens, + costCents: additionalCostCents, + occurredAt: new Date(), + }); + + await db + .update(agents) + .set({ + spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`, + updatedAt: new Date(), + }) + .where(eq(agents.id, agent.id)); + } + } + + async function executeHttpRun( + runId: string, + agentId: string, + config: Record, + context: Record, + ): Promise { const url = asString(config.url, ""); if (!url) throw new Error("HTTP adapter missing url"); @@ -98,6 +516,13 @@ export function heartbeatService(db: Db) { if (!res.ok) { throw new Error(`HTTP invoke failed with status ${res.status}`); } + + return { + exitCode: 0, + signal: null, + timedOut: false, + summary: `HTTP ${method} ${url}`, + }; } finally { clearTimeout(timer); } @@ -105,14 +530,14 @@ export function heartbeatService(db: Db) { async function executeProcessRun( runId: string, - _agentId: string, config: Record, - ) { + onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + ): Promise { const command = asString(config.command, ""); if (!command) throw new Error("Process adapter missing command"); const args = asStringArray(config.args); - const cwd = typeof config.cwd === "string" ? config.cwd : process.cwd(); + const cwd = asString(config.cwd, process.cwd()); const envConfig = parseObject(config.env); const env: Record = {}; for (const [k, v] of Object.entries(envConfig)) { @@ -122,180 +547,733 @@ export function heartbeatService(db: Db) { const timeoutSec = asNumber(config.timeoutSec, 900); const graceSec = asNumber(config.graceSec, 15); - await new Promise((resolve, reject) => { - const child = spawn(command, args, { - cwd, - env: { ...process.env, ...env }, - }); - - runningProcesses.set(runId, { child, graceSec }); - - const timeout = setTimeout(async () => { - child.kill("SIGTERM"); - await setRunStatus(runId, "timed_out", { - error: `Timed out after ${timeoutSec}s`, - finishedAt: new Date(), - }); - runningProcesses.delete(runId); - resolve(); - }, timeoutSec * 1000); - - child.stdout?.on("data", (chunk) => { - logger.info({ runId, output: String(chunk) }, "agent process stdout"); - }); - child.stderr?.on("data", (chunk) => { - logger.warn({ runId, output: String(chunk) }, "agent process stderr"); - }); - - child.on("error", (err) => { - clearTimeout(timeout); - runningProcesses.delete(runId); - reject(err); - }); - - child.on("exit", (code, signal) => { - clearTimeout(timeout); - runningProcesses.delete(runId); - - if (signal) { - resolve(); - return; - } - - if (code === 0) { - resolve(); - return; - } - - reject(new Error(`Process exited with code ${code ?? -1}`)); - }); + const proc = await runChildProcess(runId, command, args, { + cwd, + env, + timeoutSec, + graceSec, + onLog, }); + + if (proc.timedOut) { + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: true, + errorMessage: `Timed out after ${timeoutSec}s`, + }; + } + + if ((proc.exitCode ?? 0) !== 0) { + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: false, + errorMessage: `Process exited with code ${proc.exitCode ?? -1}`, + resultJson: { + stdout: proc.stdout, + stderr: proc.stderr, + }, + }; + } + + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: false, + resultJson: { + stdout: proc.stdout, + stderr: proc.stderr, + }, + }; + } + + async function executeClaudeLocalRun( + runId: string, + agent: typeof agents.$inferSelect, + runtime: typeof agentRuntimeState.$inferSelect, + config: Record, + context: Record, + onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + ): Promise { + const promptTemplate = asString( + config.promptTemplate, + "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", + ); + const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); + const model = asString(config.model, ""); + const maxTurns = asNumber(config.maxTurnsPerRun, 0); + const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false); + + const cwd = asString(config.cwd, process.cwd()); + const envConfig = parseObject(config.env); + const env: Record = {}; + for (const [k, v] of Object.entries(envConfig)) { + if (typeof v === "string") env[k] = v; + } + + const timeoutSec = asNumber(config.timeoutSec, 1800); + const graceSec = asNumber(config.graceSec, 20); + const extraArgs = asStringArray(config.extraArgs); + + const sessionId = runtime.sessionId; + const template = sessionId ? promptTemplate : bootstrapTemplate; + const prompt = renderTemplate(template, { + company: { id: agent.companyId }, + agent, + run: { id: runId, source: "on_demand" }, + context, + }); + + const args = ["--print", prompt, "--output-format", "json"]; + if (sessionId) args.push("--resume", sessionId); + if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions"); + if (model) args.push("--model", model); + if (maxTurns > 0) args.push("--max-turns", String(maxTurns)); + if (extraArgs.length > 0) args.push(...extraArgs); + + const proc = await runChildProcess(runId, "claude", args, { + cwd, + env, + timeoutSec, + graceSec, + onLog, + }); + + if (proc.timedOut) { + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: true, + errorMessage: `Timed out after ${timeoutSec}s`, + }; + } + + const parsed = parseJson(proc.stdout); + if (!parsed) { + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: false, + errorMessage: + (proc.exitCode ?? 0) === 0 + ? "Failed to parse claude JSON output" + : `Claude exited with code ${proc.exitCode ?? -1}`, + resultJson: { + stdout: proc.stdout, + stderr: proc.stderr, + }, + }; + } + + const usageObj = parseObject(parsed.usage); + const usage: UsageSummary = { + inputTokens: asNumber(usageObj.input_tokens, 0), + cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0), + outputTokens: asNumber(usageObj.output_tokens, 0), + }; + + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: false, + errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Claude exited with code ${proc.exitCode ?? -1}`, + usage, + sessionId: asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId, + provider: "anthropic", + model: asString(parsed.model, model), + costUsd: asNumber(parsed.total_cost_usd, 0), + resultJson: parsed, + summary: asString(parsed.result, ""), + }; + } + + async function executeCodexLocalRun( + runId: string, + agent: typeof agents.$inferSelect, + runtime: typeof agentRuntimeState.$inferSelect, + config: Record, + context: Record, + onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + ): Promise { + const promptTemplate = asString( + config.promptTemplate, + "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", + ); + const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); + const model = asString(config.model, ""); + const search = asBoolean(config.search, false); + const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false); + + const cwd = asString(config.cwd, process.cwd()); + const envConfig = parseObject(config.env); + const env: Record = {}; + for (const [k, v] of Object.entries(envConfig)) { + if (typeof v === "string") env[k] = v; + } + + const timeoutSec = asNumber(config.timeoutSec, 1800); + const graceSec = asNumber(config.graceSec, 20); + const extraArgs = asStringArray(config.extraArgs); + + const sessionId = runtime.sessionId; + const template = sessionId ? promptTemplate : bootstrapTemplate; + const prompt = renderTemplate(template, { + company: { id: agent.companyId }, + agent, + run: { id: runId, source: "on_demand" }, + context, + }); + + const args = ["exec", "--json"]; + if (search) args.unshift("--search"); + if (bypass) args.push("--dangerously-bypass-approvals-and-sandbox"); + if (model) args.push("--model", model); + if (extraArgs.length > 0) args.push(...extraArgs); + if (sessionId) args.push("resume", sessionId, prompt); + else args.push(prompt); + + const proc = await runChildProcess(runId, "codex", args, { + cwd, + env, + timeoutSec, + graceSec, + onLog, + }); + + if (proc.timedOut) { + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: true, + errorMessage: `Timed out after ${timeoutSec}s`, + }; + } + + const parsed = parseCodexJsonl(proc.stdout); + + return { + exitCode: proc.exitCode, + signal: proc.signal, + timedOut: false, + errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Codex exited with code ${proc.exitCode ?? -1}`, + usage: parsed.usage, + sessionId: parsed.sessionId ?? runtime.sessionId, + provider: "openai", + model, + costUsd: null, + resultJson: { + stdout: proc.stdout, + stderr: proc.stderr, + }, + summary: parsed.summary, + }; } async function executeRun(runId: string) { - const run = await db - .select() - .from(heartbeatRuns) - .where(eq(heartbeatRuns.id, runId)) - .then((rows) => rows[0] ?? null); - - if (!run) { - return; - } + const run = await getRun(runId); + if (!run) return; + if (run.status !== "queued" && run.status !== "running") return; const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { error: "Agent not found", + errorCode: "agent_not_found", finishedAt: new Date(), }); + await setWakeupStatus(run.wakeupRequestId, "failed", { + finishedAt: new Date(), + error: "Agent not found", + }); return; } - await setRunStatus(run.id, "running", { startedAt: new Date() }); - await db - .update(agents) - .set({ status: "running", updatedAt: new Date() }) - .where(eq(agents.id, agent.id)); + const runtime = await ensureRuntimeState(agent); + + let seq = 1; + let handle: RunLogHandle | null = null; + let stdoutExcerpt = ""; + let stderrExcerpt = ""; try { + await setRunStatus(runId, "running", { + startedAt: new Date(), + sessionIdBefore: runtime.sessionId, + }); + await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() }); + + const runningAgent = await db + .update(agents) + .set({ status: "running", updatedAt: new Date() }) + .where(eq(agents.id, agent.id)) + .returning() + .then((rows) => rows[0] ?? null); + + if (runningAgent) { + publishLiveEvent({ + companyId: runningAgent.companyId, + type: "agent.status", + payload: { + agentId: runningAgent.id, + status: runningAgent.status, + outcome: "running", + }, + }); + } + + const currentRun = (await getRun(runId)) ?? run; + await appendRunEvent(currentRun, seq++, { + eventType: "lifecycle", + stream: "system", + level: "info", + message: "run started", + }); + + handle = await runLogStore.begin({ + companyId: run.companyId, + agentId: run.agentId, + runId, + }); + + await db + .update(heartbeatRuns) + .set({ + logStore: handle.store, + logRef: handle.logRef, + updatedAt: new Date(), + }) + .where(eq(heartbeatRuns.id, runId)); + + const onLog = async (stream: "stdout" | "stderr", chunk: string) => { + if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk); + if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk); + + if (handle) { + await runLogStore.append(handle, { + stream, + chunk, + ts: new Date().toISOString(), + }); + } + + const payloadChunk = + chunk.length > MAX_LIVE_LOG_CHUNK_BYTES + ? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES) + : chunk; + + publishLiveEvent({ + companyId: run.companyId, + type: "heartbeat.run.log", + payload: { + runId: run.id, + agentId: run.agentId, + stream, + chunk: payloadChunk, + truncated: payloadChunk.length !== chunk.length, + }, + }); + }; + const config = parseObject(agent.adapterConfig); const context = (run.contextSnapshot ?? {}) as Record; + let adapterResult: AdapterExecutionResult; if (agent.adapterType === "http") { - await executeHttpRun(run.id, agent.id, config, context); + adapterResult = await executeHttpRun(run.id, agent.id, config, context); + } else if (agent.adapterType === "claude_local") { + adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog); + } else if (agent.adapterType === "codex_local") { + adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog); } else { - await executeProcessRun(run.id, agent.id, config); + adapterResult = await executeProcessRun(run.id, config, onLog); } - const latestRun = await db - .select() - .from(heartbeatRuns) - .where(eq(heartbeatRuns.id, run.id)) - .then((rows) => rows[0] ?? null); - - if (latestRun?.status === "timed_out" || latestRun?.status === "cancelled") { - await finalizeAgentStatus(agent.id, false); - return; + let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; + const latestRun = await getRun(run.id); + if (latestRun?.status === "cancelled") { + outcome = "cancelled"; + } else if (adapterResult.timedOut) { + outcome = "timed_out"; + } else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) { + outcome = "succeeded"; + } else { + outcome = "failed"; } - await setRunStatus(run.id, "succeeded", { finishedAt: new Date(), error: null }); - await finalizeAgentStatus(agent.id, true); + let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; + if (handle) { + logSummary = await runLogStore.finalize(handle); + } + + const status = + outcome === "succeeded" + ? "succeeded" + : outcome === "cancelled" + ? "cancelled" + : outcome === "timed_out" + ? "timed_out" + : "failed"; + + await setRunStatus(run.id, status, { + finishedAt: new Date(), + error: + outcome === "succeeded" + ? null + : adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"), + errorCode: + outcome === "timed_out" + ? "timeout" + : outcome === "cancelled" + ? "cancelled" + : outcome === "failed" + ? "adapter_failed" + : null, + exitCode: adapterResult.exitCode, + signal: adapterResult.signal, + usageJson: (adapterResult.usage ?? null) as Record | null, + resultJson: adapterResult.resultJson ?? null, + sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId, + stdoutExcerpt, + stderrExcerpt, + logBytes: logSummary?.bytes, + logSha256: logSummary?.sha256, + logCompressed: logSummary?.compressed ?? false, + }); + + await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, { + finishedAt: new Date(), + error: adapterResult.errorMessage ?? null, + }); + + const finalizedRun = await getRun(run.id); + if (finalizedRun) { + await appendRunEvent(finalizedRun, seq++, { + eventType: "lifecycle", + stream: "system", + level: outcome === "succeeded" ? "info" : "error", + message: `run ${outcome}`, + payload: { + status, + exitCode: adapterResult.exitCode, + }, + }); + } + + if (finalizedRun) { + await updateRuntimeState(agent, finalizedRun, adapterResult); + } + await finalizeAgentStatus(agent.id, outcome); } catch (err) { const message = err instanceof Error ? err.message : "Unknown adapter failure"; - await setRunStatus(run.id, "failed", { + logger.error({ err, runId }, "heartbeat execution failed"); + + let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; + if (handle) { + try { + logSummary = await runLogStore.finalize(handle); + } catch (finalizeErr) { + logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error"); + } + } + + const failedRun = await setRunStatus(run.id, "failed", { error: message, + errorCode: "adapter_failed", + finishedAt: new Date(), + stdoutExcerpt, + stderrExcerpt, + logBytes: logSummary?.bytes, + logSha256: logSummary?.sha256, + logCompressed: logSummary?.compressed ?? false, + }); + await setWakeupStatus(run.wakeupRequestId, "failed", { + finishedAt: new Date(), + error: message, + }); + + if (failedRun) { + await appendRunEvent(failedRun, seq++, { + eventType: "error", + stream: "system", + level: "error", + message, + }); + + await updateRuntimeState(agent, failedRun, { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: message, + }); + } + + await finalizeAgentStatus(agent.id, "failed"); + } + } + + async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) { + const source = opts.source ?? "on_demand"; + const triggerDetail = opts.triggerDetail ?? null; + const contextSnapshot = opts.contextSnapshot ?? {}; + + const agent = await getAgent(agentId); + if (!agent) throw notFound("Agent not found"); + + if (agent.status === "paused" || agent.status === "terminated") { + throw conflict("Agent is not invokable in its current state", { status: agent.status }); + } + + const policy = parseHeartbeatPolicy(agent); + const writeSkippedRequest = async (reason: string) => { + await db.insert(agentWakeupRequests).values({ + companyId: agent.companyId, + agentId, + source, + triggerDetail, + reason, + payload: opts.payload ?? null, + status: "skipped", + requestedByActorType: opts.requestedByActorType ?? null, + requestedByActorId: opts.requestedByActorId ?? null, + idempotencyKey: opts.idempotencyKey ?? null, finishedAt: new Date(), }); - await finalizeAgentStatus(agent.id, false); + }; + + if (source === "timer" && !policy.enabled) { + await writeSkippedRequest("heartbeat.disabled"); + return null; } + if (source === "assignment" && !policy.wakeOnAssignment) { + await writeSkippedRequest("heartbeat.wakeOnAssignment.disabled"); + return null; + } + if (source === "automation" && !policy.wakeOnAutomation) { + await writeSkippedRequest("heartbeat.wakeOnAutomation.disabled"); + return null; + } + if (source === "on_demand" && triggerDetail === "ping" && !policy.wakeOnOnDemand) { + await writeSkippedRequest("heartbeat.wakeOnOnDemand.disabled"); + return null; + } + + const activeRun = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) + .orderBy(desc(heartbeatRuns.createdAt)) + .then((rows) => rows[0] ?? null); + + if (activeRun) { + await db.insert(agentWakeupRequests).values({ + companyId: agent.companyId, + agentId, + source, + triggerDetail, + reason: opts.reason ?? null, + payload: opts.payload ?? null, + status: "coalesced", + coalescedCount: 1, + requestedByActorType: opts.requestedByActorType ?? null, + requestedByActorId: opts.requestedByActorId ?? null, + idempotencyKey: opts.idempotencyKey ?? null, + runId: activeRun.id, + finishedAt: new Date(), + }); + return activeRun; + } + + const wakeupRequest = await db + .insert(agentWakeupRequests) + .values({ + companyId: agent.companyId, + agentId, + source, + triggerDetail, + reason: opts.reason ?? null, + payload: opts.payload ?? null, + status: "queued", + requestedByActorType: opts.requestedByActorType ?? null, + requestedByActorId: opts.requestedByActorId ?? null, + idempotencyKey: opts.idempotencyKey ?? null, + }) + .returning() + .then((rows) => rows[0]); + + const runtime = await getRuntimeState(agent.id); + + const run = await db + .insert(heartbeatRuns) + .values({ + companyId: agent.companyId, + agentId, + invocationSource: source, + triggerDetail, + status: "queued", + wakeupRequestId: wakeupRequest.id, + contextSnapshot, + sessionIdBefore: runtime?.sessionId ?? null, + }) + .returning() + .then((rows) => rows[0]); + + await db + .update(agentWakeupRequests) + .set({ + runId: run.id, + updatedAt: new Date(), + }) + .where(eq(agentWakeupRequests.id, wakeupRequest.id)); + + publishLiveEvent({ + companyId: run.companyId, + type: "heartbeat.run.queued", + payload: { + runId: run.id, + agentId: run.agentId, + invocationSource: run.invocationSource, + triggerDetail: run.triggerDetail, + wakeupRequestId: run.wakeupRequestId, + }, + }); + + void executeRun(run.id).catch((err) => { + logger.error({ err, runId: run.id }, "heartbeat execution failed"); + }); + + return run; } return { list: (companyId: string, agentId?: string) => { if (!agentId) { - return db.select().from(heartbeatRuns).where(eq(heartbeatRuns.companyId, companyId)); + return db + .select() + .from(heartbeatRuns) + .where(eq(heartbeatRuns.companyId, companyId)) + .orderBy(desc(heartbeatRuns.createdAt)); } return db .select() .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))); + .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))) + .orderBy(desc(heartbeatRuns.createdAt)); + }, + + getRun, + + getRuntimeState: async (agentId: string) => { + const state = await getRuntimeState(agentId); + if (state) return state; + + const agent = await getAgent(agentId); + if (!agent) return null; + return ensureRuntimeState(agent); + }, + + resetRuntimeSession: async (agentId: string) => { + const agent = await getAgent(agentId); + if (!agent) throw notFound("Agent not found"); + await ensureRuntimeState(agent); + + return db + .update(agentRuntimeState) + .set({ + sessionId: null, + stateJson: {}, + lastError: null, + updatedAt: new Date(), + }) + .where(eq(agentRuntimeState.agentId, agentId)) + .returning() + .then((rows) => rows[0] ?? null); + }, + + listEvents: (runId: string, afterSeq = 0, limit = 200) => + db + .select() + .from(heartbeatRunEvents) + .where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq))) + .orderBy(asc(heartbeatRunEvents.seq)) + .limit(Math.max(1, Math.min(limit, 1000))), + + readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => { + const run = await getRun(runId); + if (!run) throw notFound("Heartbeat run not found"); + if (!run.logStore || !run.logRef) throw notFound("Run log not found"); + + const result = await runLogStore.read( + { + store: run.logStore as "local_file", + logRef: run.logRef, + }, + opts, + ); + + return { + runId, + store: run.logStore, + logRef: run.logRef, + ...result, + }; }, invoke: async ( agentId: string, - invocationSource: "scheduler" | "manual" | "callback" = "manual", + source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand", contextSnapshot: Record = {}, - ) => { - const agent = await getAgent(agentId); - if (!agent) throw notFound("Agent not found"); + triggerDetail: "manual" | "ping" | "callback" | "system" = "manual", + actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null }, + ) => + enqueueWakeup(agentId, { + source, + triggerDetail, + contextSnapshot, + requestedByActorType: actor?.actorType, + requestedByActorId: actor?.actorId ?? null, + }), - if (agent.status === "paused" || agent.status === "terminated") { - throw conflict("Agent is not invokable in its current state", { status: agent.status }); + wakeup: enqueueWakeup, + + tickTimers: async (now = new Date()) => { + const allAgents = await db.select().from(agents); + let checked = 0; + let enqueued = 0; + let skipped = 0; + + for (const agent of allAgents) { + if (agent.status === "paused" || agent.status === "terminated") continue; + const policy = parseHeartbeatPolicy(agent); + if (!policy.enabled || policy.intervalSec <= 0) continue; + + checked += 1; + const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0; + const elapsedMs = now.getTime() - last; + if (last && elapsedMs < policy.intervalSec * 1000) continue; + + const run = await enqueueWakeup(agent.id, { + source: "timer", + triggerDetail: "system", + reason: "heartbeat_timer", + requestedByActorType: "system", + requestedByActorId: "heartbeat_scheduler", + contextSnapshot: { + source: "scheduler", + reason: "interval_elapsed", + now: now.toISOString(), + }, + }); + if (run) enqueued += 1; + else skipped += 1; } - const activeRun = await db - .select({ id: heartbeatRuns.id }) - .from(heartbeatRuns) - .where( - and( - eq(heartbeatRuns.agentId, agentId), - inArray(heartbeatRuns.status, ["queued", "running"]), - ), - ) - .then((rows) => rows[0] ?? null); - - if (activeRun) { - throw conflict("Agent already has an active heartbeat run", { runId: activeRun.id }); - } - - const run = await db - .insert(heartbeatRuns) - .values({ - companyId: agent.companyId, - agentId, - invocationSource, - status: "queued", - contextSnapshot, - }) - .returning() - .then((rows) => rows[0]); - - void executeRun(run.id).catch((err) => { - logger.error({ err, runId: run.id }, "heartbeat execution failed"); - }); - - return run; + return { checked, enqueued, skipped }; }, cancelRun: async (runId: string) => { - const run = await db - .select() - .from(heartbeatRuns) - .where(eq(heartbeatRuns.id, runId)) - .then((rows) => rows[0] ?? null); - + const run = await getRun(runId); if (!run) throw notFound("Heartbeat run not found"); if (run.status !== "running" && run.status !== "queued") return run; @@ -313,9 +1291,25 @@ export function heartbeatService(db: Db) { const cancelled = await setRunStatus(run.id, "cancelled", { finishedAt: new Date(), error: "Cancelled by control plane", + errorCode: "cancelled", }); + await setWakeupStatus(run.wakeupRequestId, "cancelled", { + finishedAt: new Date(), + error: "Cancelled by control plane", + }); + + if (cancelled) { + await appendRunEvent(cancelled, 1, { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: "run cancelled", + }); + } + runningProcesses.delete(run.id); + await finalizeAgentStatus(run.agentId, "cancelled"); return cancelled; }, @@ -323,23 +1317,19 @@ export function heartbeatService(db: Db) { const runs = await db .select() .from(heartbeatRuns) - .where( - and( - eq(heartbeatRuns.agentId, agentId), - inArray(heartbeatRuns.status, ["queued", "running"]), - ), - ); + .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))); for (const run of runs) { - await db - .update(heartbeatRuns) - .set({ - status: "cancelled", - finishedAt: new Date(), - error: "Cancelled due to agent pause", - updatedAt: new Date(), - }) - .where(eq(heartbeatRuns.id, run.id)); + await setRunStatus(run.id, "cancelled", { + finishedAt: new Date(), + error: "Cancelled due to agent pause", + errorCode: "cancelled", + }); + + await setWakeupStatus(run.wakeupRequestId, "cancelled", { + finishedAt: new Date(), + error: "Cancelled due to agent pause", + }); const running = runningProcesses.get(run.id); if (running) { diff --git a/server/src/services/index.ts b/server/src/services/index.ts index d5cba30a..6675880d 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -9,3 +9,4 @@ export { costService } from "./costs.js"; export { heartbeatService } from "./heartbeat.js"; export { dashboardService } from "./dashboard.js"; export { logActivity, type LogActivityInput } from "./activity-log.js"; +export { publishLiveEvent, subscribeCompanyLiveEvents } from "./live-events.js"; diff --git a/server/src/services/live-events.ts b/server/src/services/live-events.ts new file mode 100644 index 00000000..97f344ea --- /dev/null +++ b/server/src/services/live-events.ts @@ -0,0 +1,40 @@ +import { EventEmitter } from "node:events"; +import type { LiveEvent, LiveEventType } from "@paperclip/shared"; + +type LiveEventPayload = Record; +type LiveEventListener = (event: LiveEvent) => void; + +const emitter = new EventEmitter(); +emitter.setMaxListeners(0); + +let nextEventId = 0; + +function toLiveEvent(input: { + companyId: string; + type: LiveEventType; + payload?: LiveEventPayload; +}): LiveEvent { + nextEventId += 1; + return { + id: nextEventId, + companyId: input.companyId, + type: input.type, + createdAt: new Date().toISOString(), + payload: input.payload ?? {}, + }; +} + +export function publishLiveEvent(input: { + companyId: string; + type: LiveEventType; + payload?: LiveEventPayload; +}) { + const event = toLiveEvent(input); + emitter.emit(input.companyId, event); + return event; +} + +export function subscribeCompanyLiveEvents(companyId: string, listener: LiveEventListener) { + emitter.on(companyId, listener); + return () => emitter.off(companyId, listener); +} diff --git a/server/src/services/run-log-store.ts b/server/src/services/run-log-store.ts new file mode 100644 index 00000000..4ce60366 --- /dev/null +++ b/server/src/services/run-log-store.ts @@ -0,0 +1,159 @@ +import { createReadStream, createWriteStream, promises as fs } from "node:fs"; +import path from "node:path"; +import { createHash } from "node:crypto"; +import { notFound } from "../errors.js"; + +export type RunLogStoreType = "local_file"; + +export interface RunLogHandle { + store: RunLogStoreType; + logRef: string; +} + +export interface RunLogReadOptions { + offset?: number; + limitBytes?: number; +} + +export interface RunLogReadResult { + content: string; + nextOffset?: number; +} + +export interface RunLogFinalizeSummary { + bytes: number; + sha256?: string; + compressed: boolean; +} + +export interface RunLogStore { + begin(input: { companyId: string; agentId: string; runId: string }): Promise; + append( + handle: RunLogHandle, + event: { stream: "stdout" | "stderr" | "system"; chunk: string; ts: string }, + ): Promise; + finalize(handle: RunLogHandle): Promise; + read(handle: RunLogHandle, opts?: RunLogReadOptions): Promise; +} + +function safeSegments(...segments: string[]) { + return segments.map((segment) => segment.replace(/[^a-zA-Z0-9._-]/g, "_")); +} + +function resolveWithin(basePath: string, relativePath: string) { + const resolved = path.resolve(basePath, relativePath); + const base = path.resolve(basePath) + path.sep; + if (!resolved.startsWith(base) && resolved !== path.resolve(basePath)) { + throw new Error("Invalid log path"); + } + return resolved; +} + +function createLocalFileRunLogStore(basePath: string): RunLogStore { + async function ensureDir(relativeDir: string) { + const dir = resolveWithin(basePath, relativeDir); + await fs.mkdir(dir, { recursive: true }); + } + + async function readFileRange(filePath: string, offset: number, limitBytes: number): Promise { + const stat = await fs.stat(filePath).catch(() => null); + if (!stat) throw notFound("Run log not found"); + + const start = Math.max(0, Math.min(offset, stat.size)); + const end = Math.max(start, Math.min(start + limitBytes - 1, stat.size - 1)); + + if (start > end) { + return { content: "", nextOffset: start }; + } + + const chunks: Buffer[] = []; + await new Promise((resolve, reject) => { + const stream = createReadStream(filePath, { start, end }); + stream.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + stream.on("error", reject); + stream.on("end", () => resolve()); + }); + + const content = Buffer.concat(chunks).toString("utf8"); + const nextOffset = end + 1 < stat.size ? end + 1 : undefined; + return { content, nextOffset }; + } + + async function sha256File(filePath: string): Promise { + return new Promise((resolve, reject) => { + const hash = createHash("sha256"); + const stream = createReadStream(filePath); + stream.on("data", (chunk) => hash.update(chunk)); + stream.on("error", reject); + stream.on("end", () => resolve(hash.digest("hex"))); + }); + } + + return { + async begin(input) { + const [companyId, agentId] = safeSegments(input.companyId, input.agentId); + const runId = safeSegments(input.runId)[0]!; + const relDir = path.join(companyId, agentId); + const relPath = path.join(relDir, `${runId}.ndjson`); + await ensureDir(relDir); + + const absPath = resolveWithin(basePath, relPath); + await fs.writeFile(absPath, "", "utf8"); + + return { store: "local_file", logRef: relPath }; + }, + + async append(handle, event) { + if (handle.store !== "local_file") return; + const absPath = resolveWithin(basePath, handle.logRef); + const line = JSON.stringify({ + ts: event.ts, + stream: event.stream, + chunk: event.chunk, + }); + await new Promise((resolve, reject) => { + const stream = createWriteStream(absPath, { flags: "a", encoding: "utf8" }); + stream.on("error", reject); + stream.end(`${line}\n`, () => resolve()); + }); + }, + + async finalize(handle) { + if (handle.store !== "local_file") { + return { bytes: 0, compressed: false }; + } + const absPath = resolveWithin(basePath, handle.logRef); + const stat = await fs.stat(absPath).catch(() => null); + if (!stat) throw notFound("Run log not found"); + + const hash = await sha256File(absPath); + return { + bytes: stat.size, + sha256: hash, + compressed: false, + }; + }, + + async read(handle, opts) { + if (handle.store !== "local_file") { + throw notFound("Run log not found"); + } + const absPath = resolveWithin(basePath, handle.logRef); + const offset = opts?.offset ?? 0; + const limitBytes = opts?.limitBytes ?? 256_000; + return readFileRange(absPath, offset, limitBytes); + }, + }; +} + +let cachedStore: RunLogStore | null = null; + +export function getRunLogStore() { + if (cachedStore) return cachedStore; + const basePath = process.env.RUN_LOG_BASE_PATH ?? path.resolve(process.cwd(), "data/run-logs"); + cachedStore = createLocalFileRunLogStore(basePath); + return cachedStore; +} +