import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; import { agents, agentRuntimeState, agentTaskSessions, agentWakeupRequests, heartbeatRunEvents, heartbeatRuns, costEvents, issues, } from "@paperclip/db"; import { conflict, notFound } from "../errors.js"; import { logger } from "../middleware/logger.js"; import { publishLiveEvent } from "./live-events.js"; import { getRunLogStore, type RunLogHandle } from "./run-log-store.js"; import { getServerAdapter, runningProcesses } from "../adapters/index.js"; import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec } from "../adapters/index.js"; import { createLocalAgentJwt } from "../agent-auth-jwt.js"; import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js"; import { secretService } from "./secrets.js"; const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1; const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10; const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext"; const startLocksByAgent = new Map>(); function appendExcerpt(prev: string, chunk: string) { return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES); } function normalizeMaxConcurrentRuns(value: unknown) { const parsed = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT)); if (!Number.isFinite(parsed)) return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT; return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed)); } async function withAgentStartLock(agentId: string, fn: () => Promise) { const previous = startLocksByAgent.get(agentId) ?? Promise.resolve(); const run = previous.then(fn); const marker = run.then( () => undefined, () => undefined, ); startLocksByAgent.set(agentId, marker); try { return await run; } finally { if (startLocksByAgent.get(agentId) === marker) { startLocksByAgent.delete(agentId); } } } interface WakeupOptions { source?: "timer" | "assignment" | "on_demand" | "automation"; triggerDetail?: "manual" | "ping" | "callback" | "system"; reason?: string | null; payload?: Record | null; idempotencyKey?: string | null; requestedByActorType?: "user" | "agent" | "system"; requestedByActorId?: string | null; contextSnapshot?: Record; } function readNonEmptyString(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; } function deriveTaskKey( contextSnapshot: Record | null | undefined, payload: Record | null | undefined, ) { return ( readNonEmptyString(contextSnapshot?.taskKey) ?? readNonEmptyString(contextSnapshot?.taskId) ?? readNonEmptyString(contextSnapshot?.issueId) ?? readNonEmptyString(payload?.taskKey) ?? readNonEmptyString(payload?.taskId) ?? readNonEmptyString(payload?.issueId) ?? null ); } function deriveCommentId( contextSnapshot: Record | null | undefined, payload: Record | null | undefined, ) { return ( readNonEmptyString(contextSnapshot?.wakeCommentId) ?? readNonEmptyString(contextSnapshot?.commentId) ?? readNonEmptyString(payload?.commentId) ?? null ); } function enrichWakeContextSnapshot(input: { contextSnapshot: Record; reason: string | null; source: WakeupOptions["source"]; triggerDetail: WakeupOptions["triggerDetail"] | null; payload: Record | null; }) { const { contextSnapshot, reason, source, triggerDetail, payload } = input; const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]); const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]); const taskKey = deriveTaskKey(contextSnapshot, payload); const wakeCommentId = deriveCommentId(contextSnapshot, payload); if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) { contextSnapshot.wakeReason = reason; } if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) { contextSnapshot.issueId = issueIdFromPayload; } if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) { contextSnapshot.taskId = issueIdFromPayload; } if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) { contextSnapshot.taskKey = taskKey; } if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) { contextSnapshot.commentId = commentIdFromPayload; } if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) { contextSnapshot.wakeCommentId = wakeCommentId; } if (!readNonEmptyString(contextSnapshot["wakeSource"]) && source) { contextSnapshot.wakeSource = source; } if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) { contextSnapshot.wakeTriggerDetail = triggerDetail; } return { contextSnapshot, issueIdFromPayload, commentIdFromPayload, taskKey, wakeCommentId, }; } function mergeCoalescedContextSnapshot( existingRaw: unknown, incoming: Record, ) { const existing = parseObject(existingRaw); const merged: Record = { ...existing, ...incoming, }; const commentId = deriveCommentId(incoming, null); if (commentId) { merged.commentId = commentId; merged.wakeCommentId = commentId; } return merged; } function runTaskKey(run: typeof heartbeatRuns.$inferSelect) { return deriveTaskKey(run.contextSnapshot as Record | null, null); } function isSameTaskScope(left: string | null, right: string | null) { return (left ?? null) === (right ?? null); } function truncateDisplayId(value: string | null | undefined, max = 128) { if (!value) return null; return value.length > max ? value.slice(0, max) : value; } function normalizeAgentNameKey(value: string | null | undefined) { if (typeof value !== "string") return null; const normalized = value.trim().toLowerCase(); return normalized.length > 0 ? normalized : null; } const defaultSessionCodec: AdapterSessionCodec = { deserialize(raw: unknown) { const asObj = parseObject(raw); if (Object.keys(asObj).length > 0) return asObj; const sessionId = readNonEmptyString((raw as Record | null)?.sessionId); if (sessionId) return { sessionId }; return null; }, serialize(params: Record | null) { if (!params || Object.keys(params).length === 0) return null; return params; }, getDisplayId(params: Record | null) { return readNonEmptyString(params?.sessionId); }, }; function getAdapterSessionCodec(adapterType: string) { const adapter = getServerAdapter(adapterType); return adapter.sessionCodec ?? defaultSessionCodec; } function normalizeSessionParams(params: Record | null | undefined) { if (!params) return null; return Object.keys(params).length > 0 ? params : null; } function resolveNextSessionState(input: { codec: AdapterSessionCodec; adapterResult: AdapterExecutionResult; previousParams: Record | null; previousDisplayId: string | null; previousLegacySessionId: string | null; }) { const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input; if (adapterResult.clearSession) { return { params: null as Record | null, displayId: null as string | null, legacySessionId: null as string | null, }; } const explicitParams = adapterResult.sessionParams; const hasExplicitParams = adapterResult.sessionParams !== undefined; const hasExplicitSessionId = adapterResult.sessionId !== undefined; const explicitSessionId = readNonEmptyString(adapterResult.sessionId); const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined; const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId); const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay; const candidateParams = hasExplicitParams ? explicitParams : hasExplicitSessionId ? (explicitSessionId ? { sessionId: explicitSessionId } : null) : previousParams; const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null)); const deserialized = normalizeSessionParams(codec.deserialize(serialized)); const displayId = truncateDisplayId( explicitDisplayId ?? (codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ?? readNonEmptyString(deserialized?.sessionId) ?? (shouldUsePrevious ? previousDisplayId : null) ?? explicitSessionId ?? (shouldUsePrevious ? previousLegacySessionId : null), ); const legacySessionId = explicitSessionId ?? readNonEmptyString(deserialized?.sessionId) ?? displayId ?? (shouldUsePrevious ? previousLegacySessionId : null); return { params: serialized, displayId, legacySessionId, }; } export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); const secretsSvc = secretService(db); async function getAgent(agentId: string) { return db .select() .from(agents) .where(eq(agents.id, agentId)) .then((rows) => rows[0] ?? null); } async function getRun(runId: string) { return db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); } async function getRuntimeState(agentId: string) { return db .select() .from(agentRuntimeState) .where(eq(agentRuntimeState.agentId, agentId)) .then((rows) => rows[0] ?? null); } async function getTaskSession( companyId: string, agentId: string, adapterType: string, taskKey: string, ) { return db .select() .from(agentTaskSessions) .where( and( eq(agentTaskSessions.companyId, companyId), eq(agentTaskSessions.agentId, agentId), eq(agentTaskSessions.adapterType, adapterType), eq(agentTaskSessions.taskKey, taskKey), ), ) .then((rows) => rows[0] ?? null); } async function resolveSessionBeforeForWakeup( agent: typeof agents.$inferSelect, taskKey: string | null, ) { if (taskKey) { const codec = getAdapterSessionCodec(agent.adapterType); const existingTaskSession = await getTaskSession( agent.companyId, agent.id, agent.adapterType, taskKey, ); const parsedParams = normalizeSessionParams( codec.deserialize(existingTaskSession?.sessionParamsJson ?? null), ); return truncateDisplayId( existingTaskSession?.sessionDisplayId ?? (codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ?? readNonEmptyString(parsedParams?.sessionId), ); } const runtimeForRun = await getRuntimeState(agent.id); return runtimeForRun?.sessionId ?? null; } async function upsertTaskSession(input: { companyId: string; agentId: string; adapterType: string; taskKey: string; sessionParamsJson: Record | null; sessionDisplayId: string | null; lastRunId: string | null; lastError: string | null; }) { const existing = await getTaskSession( input.companyId, input.agentId, input.adapterType, input.taskKey, ); if (existing) { return db .update(agentTaskSessions) .set({ sessionParamsJson: input.sessionParamsJson, sessionDisplayId: input.sessionDisplayId, lastRunId: input.lastRunId, lastError: input.lastError, updatedAt: new Date(), }) .where(eq(agentTaskSessions.id, existing.id)) .returning() .then((rows) => rows[0] ?? null); } return db .insert(agentTaskSessions) .values({ companyId: input.companyId, agentId: input.agentId, adapterType: input.adapterType, taskKey: input.taskKey, sessionParamsJson: input.sessionParamsJson, sessionDisplayId: input.sessionDisplayId, lastRunId: input.lastRunId, lastError: input.lastError, }) .returning() .then((rows) => rows[0] ?? null); } async function clearTaskSessions( companyId: string, agentId: string, opts?: { taskKey?: string | null; adapterType?: string | null }, ) { const conditions = [ eq(agentTaskSessions.companyId, companyId), eq(agentTaskSessions.agentId, agentId), ]; if (opts?.taskKey) { conditions.push(eq(agentTaskSessions.taskKey, opts.taskKey)); } if (opts?.adapterType) { conditions.push(eq(agentTaskSessions.adapterType, opts.adapterType)); } return db .delete(agentTaskSessions) .where(and(...conditions)) .returning() .then((rows) => rows.length); } async function ensureRuntimeState(agent: typeof agents.$inferSelect) { const existing = await getRuntimeState(agent.id); if (existing) return existing; return db .insert(agentRuntimeState) .values({ agentId: agent.id, companyId: agent.companyId, adapterType: agent.adapterType, stateJson: {}, }) .returning() .then((rows) => rows[0]); } async function setRunStatus( runId: string, status: string, patch?: Partial, ) { const updated = await db .update(heartbeatRuns) .set({ status, ...patch, updatedAt: new Date() }) .where(eq(heartbeatRuns.id, runId)) .returning() .then((rows) => rows[0] ?? null); if (updated) { publishLiveEvent({ companyId: updated.companyId, type: "heartbeat.run.status", payload: { runId: updated.id, agentId: updated.agentId, status: updated.status, invocationSource: updated.invocationSource, triggerDetail: updated.triggerDetail, error: updated.error ?? null, errorCode: updated.errorCode ?? null, startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null, finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null, }, }); } return updated; } async function setWakeupStatus( wakeupRequestId: string | null | undefined, status: string, patch?: Partial, ) { if (!wakeupRequestId) return; await db .update(agentWakeupRequests) .set({ status, ...patch, updatedAt: new Date() }) .where(eq(agentWakeupRequests.id, wakeupRequestId)); } async function appendRunEvent( run: typeof heartbeatRuns.$inferSelect, seq: number, event: { eventType: string; stream?: "system" | "stdout" | "stderr"; level?: "info" | "warn" | "error"; color?: string; message?: string; payload?: Record; }, ) { await db.insert(heartbeatRunEvents).values({ companyId: run.companyId, runId: run.id, agentId: run.agentId, seq, eventType: event.eventType, stream: event.stream, level: event.level, color: event.color, message: event.message, payload: event.payload, }); publishLiveEvent({ companyId: run.companyId, type: "heartbeat.run.event", payload: { runId: run.id, agentId: run.agentId, seq, eventType: event.eventType, stream: event.stream ?? null, level: event.level ?? null, color: event.color ?? null, message: event.message ?? null, payload: event.payload ?? null, }, }); } function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) { const runtimeConfig = parseObject(agent.runtimeConfig); const heartbeat = parseObject(runtimeConfig.heartbeat); return { enabled: asBoolean(heartbeat.enabled, true), intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)), wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true), maxConcurrentRuns: normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns), }; } async function countRunningRunsForAgent(agentId: string) { const [{ count }] = await db .select({ count: sql`count(*)` }) .from(heartbeatRuns) .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))); return Number(count ?? 0); } async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) { if (run.status !== "queued") return run; const claimedAt = new Date(); const claimed = await db .update(heartbeatRuns) .set({ status: "running", startedAt: run.startedAt ?? claimedAt, updatedAt: claimedAt, }) .where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued"))) .returning() .then((rows) => rows[0] ?? null); if (!claimed) return null; publishLiveEvent({ companyId: claimed.companyId, type: "heartbeat.run.status", payload: { runId: claimed.id, agentId: claimed.agentId, status: claimed.status, invocationSource: claimed.invocationSource, triggerDetail: claimed.triggerDetail, error: claimed.error ?? null, errorCode: claimed.errorCode ?? null, startedAt: claimed.startedAt ? new Date(claimed.startedAt).toISOString() : null, finishedAt: claimed.finishedAt ? new Date(claimed.finishedAt).toISOString() : null, }, }); await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt }); return claimed; } async function finalizeAgentStatus( agentId: string, outcome: "succeeded" | "failed" | "cancelled" | "timed_out", ) { const existing = await getAgent(agentId); if (!existing) return; if (existing.status === "paused" || existing.status === "terminated") { return; } const runningCount = await countRunningRunsForAgent(agentId); const nextStatus = runningCount > 0 ? "running" : outcome === "succeeded" || outcome === "cancelled" ? "idle" : "error"; const updated = await db .update(agents) .set({ status: nextStatus, lastHeartbeatAt: new Date(), updatedAt: new Date(), }) .where(eq(agents.id, agentId)) .returning() .then((rows) => rows[0] ?? null); if (updated) { publishLiveEvent({ companyId: updated.companyId, type: "agent.status", payload: { agentId: updated.id, status: updated.status, lastHeartbeatAt: updated.lastHeartbeatAt ? new Date(updated.lastHeartbeatAt).toISOString() : null, outcome, }, }); } } async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) { const staleThresholdMs = opts?.staleThresholdMs ?? 0; const now = new Date(); // Find all runs in "queued" or "running" state const activeRuns = await db .select() .from(heartbeatRuns) .where(inArray(heartbeatRuns.status, ["queued", "running"])); const reaped: string[] = []; for (const run of activeRuns) { if (runningProcesses.has(run.id)) continue; // Apply staleness threshold to avoid false positives if (staleThresholdMs > 0) { const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0; if (now.getTime() - refTime < staleThresholdMs) continue; } await setRunStatus(run.id, "failed", { error: "Process lost -- server may have restarted", errorCode: "process_lost", finishedAt: now, }); await setWakeupStatus(run.wakeupRequestId, "failed", { finishedAt: now, error: "Process lost -- server may have restarted", }); const updatedRun = await getRun(run.id); if (updatedRun) { await appendRunEvent(updatedRun, 1, { eventType: "lifecycle", stream: "system", level: "error", message: "Process lost -- server may have restarted", }); await releaseIssueExecutionAndPromote(updatedRun); } await finalizeAgentStatus(run.agentId, "failed"); await startNextQueuedRunForAgent(run.agentId); runningProcesses.delete(run.id); reaped.push(run.id); } if (reaped.length > 0) { logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs"); } return { reaped: reaped.length, runIds: reaped }; } async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, result: AdapterExecutionResult, session: { legacySessionId: string | null }, ) { await ensureRuntimeState(agent); const usage = result.usage; const inputTokens = usage?.inputTokens ?? 0; const outputTokens = usage?.outputTokens ?? 0; const cachedInputTokens = usage?.cachedInputTokens ?? 0; const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100)); await db .update(agentRuntimeState) .set({ adapterType: agent.adapterType, sessionId: session.legacySessionId, lastRunId: run.id, lastRunStatus: run.status, lastError: result.errorMessage ?? null, totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`, totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`, totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`, totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`, updatedAt: new Date(), }) .where(eq(agentRuntimeState.agentId, agent.id)); if (additionalCostCents > 0) { await db.insert(costEvents).values({ companyId: agent.companyId, agentId: agent.id, provider: result.provider ?? "unknown", model: result.model ?? "unknown", inputTokens, outputTokens, costCents: additionalCostCents, occurredAt: new Date(), }); await db .update(agents) .set({ spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`, updatedAt: new Date(), }) .where(eq(agents.id, agent.id)); } } async function startNextQueuedRunForAgent(agentId: string) { return withAgentStartLock(agentId, async () => { const agent = await getAgent(agentId); if (!agent) return []; const policy = parseHeartbeatPolicy(agent); const runningCount = await countRunningRunsForAgent(agentId); const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount); if (availableSlots <= 0) return []; const queuedRuns = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) .orderBy(asc(heartbeatRuns.createdAt)) .limit(availableSlots); if (queuedRuns.length === 0) return []; const claimedRuns: Array = []; for (const queuedRun of queuedRuns) { const claimed = await claimQueuedRun(queuedRun); if (claimed) claimedRuns.push(claimed); } if (claimedRuns.length === 0) return []; for (const claimedRun of claimedRuns) { void executeRun(claimedRun.id).catch((err) => { logger.error({ err, runId: claimedRun.id }, "queued heartbeat execution failed"); }); } return claimedRuns; }); } async function executeRun(runId: string) { let run = await getRun(runId); if (!run) return; if (run.status !== "queued" && run.status !== "running") return; if (run.status === "queued") { const claimed = await claimQueuedRun(run); if (!claimed) { // Another worker has already claimed or finalized this run. return; } run = claimed; } const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { error: "Agent not found", errorCode: "agent_not_found", finishedAt: new Date(), }); await setWakeupStatus(run.wakeupRequestId, "failed", { finishedAt: new Date(), error: "Agent not found", }); const failedRun = await getRun(runId); if (failedRun) await releaseIssueExecutionAndPromote(failedRun); return; } const runtime = await ensureRuntimeState(agent); const context = parseObject(run.contextSnapshot); const taskKey = deriveTaskKey(context, null); const sessionCodec = getAdapterSessionCodec(agent.adapterType); const taskSession = taskKey ? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey) : null; const previousSessionParams = normalizeSessionParams( sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null), ); const runtimeSessionFallback = taskKey ? null : runtime.sessionId; const previousSessionDisplayId = truncateDisplayId( taskSession?.sessionDisplayId ?? (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ?? readNonEmptyString(previousSessionParams?.sessionId) ?? runtimeSessionFallback, ); const runtimeForAdapter = { sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtimeSessionFallback, sessionParams: previousSessionParams, sessionDisplayId: previousSessionDisplayId, taskKey, }; let seq = 1; let handle: RunLogHandle | null = null; let stdoutExcerpt = ""; let stderrExcerpt = ""; try { const startedAt = run.startedAt ?? new Date(); const runningWithSession = await db .update(heartbeatRuns) .set({ startedAt, sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, updatedAt: new Date(), }) .where(eq(heartbeatRuns.id, run.id)) .returning() .then((rows) => rows[0] ?? null); if (runningWithSession) run = runningWithSession; const runningAgent = await db .update(agents) .set({ status: "running", updatedAt: new Date() }) .where(eq(agents.id, agent.id)) .returning() .then((rows) => rows[0] ?? null); if (runningAgent) { publishLiveEvent({ companyId: runningAgent.companyId, type: "agent.status", payload: { agentId: runningAgent.id, status: runningAgent.status, outcome: "running", }, }); } const currentRun = run; await appendRunEvent(currentRun, seq++, { eventType: "lifecycle", stream: "system", level: "info", message: "run started", }); handle = await runLogStore.begin({ companyId: run.companyId, agentId: run.agentId, runId, }); await db .update(heartbeatRuns) .set({ logStore: handle.store, logRef: handle.logRef, updatedAt: new Date(), }) .where(eq(heartbeatRuns.id, runId)); const onLog = async (stream: "stdout" | "stderr", chunk: string) => { if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk); if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk); if (handle) { await runLogStore.append(handle, { stream, chunk, ts: new Date().toISOString(), }); } const payloadChunk = chunk.length > MAX_LIVE_LOG_CHUNK_BYTES ? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES) : chunk; publishLiveEvent({ companyId: run.companyId, type: "heartbeat.run.log", payload: { runId: run.id, agentId: run.agentId, stream, chunk: payloadChunk, truncated: payloadChunk.length !== chunk.length, }, }); }; const config = parseObject(agent.adapterConfig); const resolvedConfig = await secretsSvc.resolveAdapterConfigForRuntime( agent.companyId, config, ); const onAdapterMeta = async (meta: AdapterInvocationMeta) => { await appendRunEvent(currentRun, seq++, { eventType: "adapter.invoke", stream: "system", level: "info", message: "adapter invocation", payload: meta as unknown as Record, }); }; const adapter = getServerAdapter(agent.adapterType); const authToken = adapter.supportsLocalAgentJwt ? createLocalAgentJwt(agent.id, agent.companyId, agent.adapterType, run.id) : null; if (adapter.supportsLocalAgentJwt && !authToken) { logger.warn( { companyId: agent.companyId, agentId: agent.id, runId: run.id, adapterType: agent.adapterType, }, "local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY", ); } const adapterResult = await adapter.execute({ runId: run.id, agent, runtime: runtimeForAdapter, config: resolvedConfig, context, onLog, onMeta: onAdapterMeta, authToken: authToken ?? undefined, }); const nextSessionState = resolveNextSessionState({ codec: sessionCodec, adapterResult, previousParams: previousSessionParams, previousDisplayId: runtimeForAdapter.sessionDisplayId, previousLegacySessionId: runtimeForAdapter.sessionId, }); let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; const latestRun = await getRun(run.id); if (latestRun?.status === "cancelled") { outcome = "cancelled"; } else if (adapterResult.timedOut) { outcome = "timed_out"; } else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) { outcome = "succeeded"; } else { outcome = "failed"; } let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; if (handle) { logSummary = await runLogStore.finalize(handle); } const status = outcome === "succeeded" ? "succeeded" : outcome === "cancelled" ? "cancelled" : outcome === "timed_out" ? "timed_out" : "failed"; const usageJson = adapterResult.usage || adapterResult.costUsd != null ? ({ ...(adapterResult.usage ?? {}), ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}), } as Record) : null; await setRunStatus(run.id, status, { finishedAt: new Date(), error: outcome === "succeeded" ? null : adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"), errorCode: outcome === "timed_out" ? "timeout" : outcome === "cancelled" ? "cancelled" : outcome === "failed" ? "adapter_failed" : null, exitCode: adapterResult.exitCode, signal: adapterResult.signal, usageJson, resultJson: adapterResult.resultJson ?? null, sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId, stdoutExcerpt, stderrExcerpt, logBytes: logSummary?.bytes, logSha256: logSummary?.sha256, logCompressed: logSummary?.compressed ?? false, }); await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, { finishedAt: new Date(), error: adapterResult.errorMessage ?? null, }); const finalizedRun = await getRun(run.id); if (finalizedRun) { await appendRunEvent(finalizedRun, seq++, { eventType: "lifecycle", stream: "system", level: outcome === "succeeded" ? "info" : "error", message: `run ${outcome}`, payload: { status, exitCode: adapterResult.exitCode, }, }); await releaseIssueExecutionAndPromote(finalizedRun); } if (finalizedRun) { await updateRuntimeState(agent, finalizedRun, adapterResult, { legacySessionId: nextSessionState.legacySessionId, }); if (taskKey) { if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) { await clearTaskSessions(agent.companyId, agent.id, { taskKey, adapterType: agent.adapterType, }); } else { await upsertTaskSession({ companyId: agent.companyId, agentId: agent.id, adapterType: agent.adapterType, taskKey, sessionParamsJson: nextSessionState.params, sessionDisplayId: nextSessionState.displayId, lastRunId: finalizedRun.id, lastError: outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"), }); } } } await finalizeAgentStatus(agent.id, outcome); } catch (err) { const message = err instanceof Error ? err.message : "Unknown adapter failure"; logger.error({ err, runId }, "heartbeat execution failed"); let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; if (handle) { try { logSummary = await runLogStore.finalize(handle); } catch (finalizeErr) { logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error"); } } const failedRun = await setRunStatus(run.id, "failed", { error: message, errorCode: "adapter_failed", finishedAt: new Date(), stdoutExcerpt, stderrExcerpt, logBytes: logSummary?.bytes, logSha256: logSummary?.sha256, logCompressed: logSummary?.compressed ?? false, }); await setWakeupStatus(run.wakeupRequestId, "failed", { finishedAt: new Date(), error: message, }); if (failedRun) { await appendRunEvent(failedRun, seq++, { eventType: "error", stream: "system", level: "error", message, }); await releaseIssueExecutionAndPromote(failedRun); await updateRuntimeState(agent, failedRun, { exitCode: null, signal: null, timedOut: false, errorMessage: message, }, { legacySessionId: runtimeForAdapter.sessionId, }); if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) { await upsertTaskSession({ companyId: agent.companyId, agentId: agent.id, adapterType: agent.adapterType, taskKey, sessionParamsJson: previousSessionParams, sessionDisplayId: previousSessionDisplayId, lastRunId: failedRun.id, lastError: message, }); } } await finalizeAgentStatus(agent.id, "failed"); } finally { await startNextQueuedRunForAgent(agent.id); } } async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { const promotedRun = await db.transaction(async (tx) => { await tx.execute( sql`select id from issues where company_id = ${run.companyId} and execution_run_id = ${run.id} for update`, ); const issue = await tx .select({ id: issues.id, companyId: issues.companyId, }) .from(issues) .where(and(eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id))) .then((rows) => rows[0] ?? null); if (!issue) return; await tx .update(issues) .set({ executionRunId: null, executionAgentNameKey: null, executionLockedAt: null, updatedAt: new Date(), }) .where(eq(issues.id, issue.id)); while (true) { const deferred = await tx .select() .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.companyId, issue.companyId), eq(agentWakeupRequests.status, "deferred_issue_execution"), sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issue.id}`, ), ) .orderBy(asc(agentWakeupRequests.requestedAt)) .limit(1) .then((rows) => rows[0] ?? null); if (!deferred) return null; const deferredAgent = await tx .select() .from(agents) .where(eq(agents.id, deferred.agentId)) .then((rows) => rows[0] ?? null); if ( !deferredAgent || deferredAgent.companyId !== issue.companyId || deferredAgent.status === "paused" || deferredAgent.status === "terminated" || deferredAgent.status === "pending_approval" ) { await tx .update(agentWakeupRequests) .set({ status: "failed", finishedAt: new Date(), error: "Deferred wake could not be promoted: agent is not invokable", updatedAt: new Date(), }) .where(eq(agentWakeupRequests.id, deferred.id)); continue; } const deferredPayload = parseObject(deferred.payload); const deferredContextSeed = parseObject(deferredPayload[DEFERRED_WAKE_CONTEXT_KEY]); const promotedContextSeed: Record = { ...deferredContextSeed }; const promotedReason = readNonEmptyString(deferred.reason) ?? "issue_execution_promoted"; const promotedSource = (readNonEmptyString(deferred.source) as WakeupOptions["source"]) ?? "automation"; const promotedTriggerDetail = (readNonEmptyString(deferred.triggerDetail) as WakeupOptions["triggerDetail"]) ?? null; const promotedPayload = deferredPayload; delete promotedPayload[DEFERRED_WAKE_CONTEXT_KEY]; const { contextSnapshot: promotedContextSnapshot, taskKey: promotedTaskKey, } = enrichWakeContextSnapshot({ contextSnapshot: promotedContextSeed, reason: promotedReason, source: promotedSource, triggerDetail: promotedTriggerDetail, payload: promotedPayload, }); const sessionBefore = await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey); const now = new Date(); const newRun = await tx .insert(heartbeatRuns) .values({ companyId: deferredAgent.companyId, agentId: deferredAgent.id, invocationSource: promotedSource, triggerDetail: promotedTriggerDetail, status: "queued", wakeupRequestId: deferred.id, contextSnapshot: promotedContextSnapshot, sessionIdBefore: sessionBefore, }) .returning() .then((rows) => rows[0]); await tx .update(agentWakeupRequests) .set({ status: "queued", reason: "issue_execution_promoted", runId: newRun.id, claimedAt: null, finishedAt: null, error: null, updatedAt: now, }) .where(eq(agentWakeupRequests.id, deferred.id)); await tx .update(issues) .set({ executionRunId: newRun.id, executionAgentNameKey: normalizeAgentNameKey(deferredAgent.name), executionLockedAt: now, updatedAt: now, }) .where(eq(issues.id, issue.id)); return newRun; } }); if (!promotedRun) return; publishLiveEvent({ companyId: promotedRun.companyId, type: "heartbeat.run.queued", payload: { runId: promotedRun.id, agentId: promotedRun.agentId, invocationSource: promotedRun.invocationSource, triggerDetail: promotedRun.triggerDetail, wakeupRequestId: promotedRun.wakeupRequestId, }, }); await startNextQueuedRunForAgent(promotedRun.agentId); } async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) { const source = opts.source ?? "on_demand"; const triggerDetail = opts.triggerDetail ?? null; const contextSnapshot: Record = { ...(opts.contextSnapshot ?? {}) }; const reason = opts.reason ?? null; const payload = opts.payload ?? null; const { contextSnapshot: enrichedContextSnapshot, issueIdFromPayload, taskKey, wakeCommentId, } = enrichWakeContextSnapshot({ contextSnapshot, reason, source, triggerDetail, payload, }); const issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload; const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); if ( agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval" ) { throw conflict("Agent is not invokable in its current state", { status: agent.status }); } const policy = parseHeartbeatPolicy(agent); const writeSkippedRequest = async (reason: string) => { await db.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, source, triggerDetail, reason, payload, status: "skipped", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, finishedAt: new Date(), }); }; if (source === "timer" && !policy.enabled) { await writeSkippedRequest("heartbeat.disabled"); return null; } if (source !== "timer" && !policy.wakeOnDemand) { await writeSkippedRequest("heartbeat.wakeOnDemand.disabled"); return null; } if (issueId) { const agentNameKey = normalizeAgentNameKey(agent.name); const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); const outcome = await db.transaction(async (tx) => { await tx.execute( sql`select id from issues where id = ${issueId} and company_id = ${agent.companyId} for update`, ); const issue = await tx .select({ id: issues.id, companyId: issues.companyId, executionRunId: issues.executionRunId, executionAgentNameKey: issues.executionAgentNameKey, }) .from(issues) .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId))) .then((rows) => rows[0] ?? null); if (!issue) { await tx.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, source, triggerDetail, reason: "issue_execution_issue_not_found", payload, status: "skipped", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, finishedAt: new Date(), }); return { kind: "skipped" as const }; } let activeExecutionRun = issue.executionRunId ? await tx .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, issue.executionRunId)) .then((rows) => rows[0] ?? null) : null; if (activeExecutionRun && activeExecutionRun.status !== "queued" && activeExecutionRun.status !== "running") { activeExecutionRun = null; } if (!activeExecutionRun && issue.executionRunId) { await tx .update(issues) .set({ executionRunId: null, executionAgentNameKey: null, executionLockedAt: null, updatedAt: new Date(), }) .where(eq(issues.id, issue.id)); } if (!activeExecutionRun) { const legacyRun = await tx .select() .from(heartbeatRuns) .where( and( eq(heartbeatRuns.companyId, issue.companyId), inArray(heartbeatRuns.status, ["queued", "running"]), sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issue.id}`, ), ) .orderBy( sql`case when ${heartbeatRuns.status} = 'running' then 0 else 1 end`, asc(heartbeatRuns.createdAt), ) .limit(1) .then((rows) => rows[0] ?? null); if (legacyRun) { activeExecutionRun = legacyRun; const legacyAgent = await tx .select({ name: agents.name }) .from(agents) .where(eq(agents.id, legacyRun.agentId)) .then((rows) => rows[0] ?? null); await tx .update(issues) .set({ executionRunId: legacyRun.id, executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name), executionLockedAt: new Date(), updatedAt: new Date(), }) .where(eq(issues.id, issue.id)); } } if (activeExecutionRun) { const executionAgent = await tx .select({ name: agents.name }) .from(agents) .where(eq(agents.id, activeExecutionRun.agentId)) .then((rows) => rows[0] ?? null); const executionAgentNameKey = normalizeAgentNameKey(issue.executionAgentNameKey) ?? normalizeAgentNameKey(executionAgent?.name); if (executionAgentNameKey && executionAgentNameKey === agentNameKey) { const mergedContextSnapshot = mergeCoalescedContextSnapshot( activeExecutionRun.contextSnapshot, enrichedContextSnapshot, ); const mergedRun = await tx .update(heartbeatRuns) .set({ contextSnapshot: mergedContextSnapshot, updatedAt: new Date(), }) .where(eq(heartbeatRuns.id, activeExecutionRun.id)) .returning() .then((rows) => rows[0] ?? activeExecutionRun); await tx.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, source, triggerDetail, reason: "issue_execution_same_name", payload, status: "coalesced", coalescedCount: 1, requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, runId: mergedRun.id, finishedAt: new Date(), }); return { kind: "coalesced" as const, run: mergedRun }; } const deferredPayload = { ...(payload ?? {}), issueId, [DEFERRED_WAKE_CONTEXT_KEY]: enrichedContextSnapshot, }; await tx.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, source, triggerDetail, reason: "issue_execution_deferred", payload: deferredPayload, status: "deferred_issue_execution", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, }); return { kind: "deferred" as const }; } const wakeupRequest = await tx .insert(agentWakeupRequests) .values({ companyId: agent.companyId, agentId, source, triggerDetail, reason, payload, status: "queued", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, }) .returning() .then((rows) => rows[0]); const newRun = await tx .insert(heartbeatRuns) .values({ companyId: agent.companyId, agentId, invocationSource: source, triggerDetail, status: "queued", wakeupRequestId: wakeupRequest.id, contextSnapshot: enrichedContextSnapshot, sessionIdBefore: sessionBefore, }) .returning() .then((rows) => rows[0]); await tx .update(agentWakeupRequests) .set({ runId: newRun.id, updatedAt: new Date(), }) .where(eq(agentWakeupRequests.id, wakeupRequest.id)); await tx .update(issues) .set({ executionRunId: newRun.id, executionAgentNameKey: agentNameKey, executionLockedAt: new Date(), updatedAt: new Date(), }) .where(eq(issues.id, issue.id)); return { kind: "queued" as const, run: newRun }; }); if (outcome.kind === "deferred" || outcome.kind === "skipped") return null; if (outcome.kind === "coalesced") return outcome.run; const newRun = outcome.run; publishLiveEvent({ companyId: newRun.companyId, type: "heartbeat.run.queued", payload: { runId: newRun.id, agentId: newRun.agentId, invocationSource: newRun.invocationSource, triggerDetail: newRun.triggerDetail, wakeupRequestId: newRun.wakeupRequestId, }, }); await startNextQueuedRunForAgent(agent.id); return newRun; } const activeRuns = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) .orderBy(desc(heartbeatRuns.createdAt)); const sameScopeQueuedRun = activeRuns.find( (candidate) => candidate.status === "queued" && isSameTaskScope(runTaskKey(candidate), taskKey), ); const sameScopeRunningRun = activeRuns.find( (candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey), ); const shouldQueueFollowupForCommentWake = Boolean(wakeCommentId) && Boolean(sameScopeRunningRun) && !sameScopeQueuedRun; const coalescedTargetRun = sameScopeQueuedRun ?? (shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ?? null); if (coalescedTargetRun) { const mergedContextSnapshot = mergeCoalescedContextSnapshot( coalescedTargetRun.contextSnapshot, contextSnapshot, ); const mergedRun = await db .update(heartbeatRuns) .set({ contextSnapshot: mergedContextSnapshot, updatedAt: new Date(), }) .where(eq(heartbeatRuns.id, coalescedTargetRun.id)) .returning() .then((rows) => rows[0] ?? coalescedTargetRun); await db.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, source, triggerDetail, reason, payload, status: "coalesced", coalescedCount: 1, requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, runId: mergedRun.id, finishedAt: new Date(), }); return mergedRun; } const wakeupRequest = await db .insert(agentWakeupRequests) .values({ companyId: agent.companyId, agentId, source, triggerDetail, reason, payload, status: "queued", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, }) .returning() .then((rows) => rows[0]); const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); const newRun = await db .insert(heartbeatRuns) .values({ companyId: agent.companyId, agentId, invocationSource: source, triggerDetail, status: "queued", wakeupRequestId: wakeupRequest.id, contextSnapshot: enrichedContextSnapshot, sessionIdBefore: sessionBefore, }) .returning() .then((rows) => rows[0]); await db .update(agentWakeupRequests) .set({ runId: newRun.id, updatedAt: new Date(), }) .where(eq(agentWakeupRequests.id, wakeupRequest.id)); publishLiveEvent({ companyId: newRun.companyId, type: "heartbeat.run.queued", payload: { runId: newRun.id, agentId: newRun.agentId, invocationSource: newRun.invocationSource, triggerDetail: newRun.triggerDetail, wakeupRequestId: newRun.wakeupRequestId, }, }); await startNextQueuedRunForAgent(agent.id); return newRun; } return { list: (companyId: string, agentId?: string) => { if (!agentId) { return db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.companyId, companyId)) .orderBy(desc(heartbeatRuns.createdAt)); } return db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))) .orderBy(desc(heartbeatRuns.createdAt)); }, getRun, getRuntimeState: async (agentId: string) => { const state = await getRuntimeState(agentId); const agent = await getAgent(agentId); if (!agent) return null; const ensured = state ?? (await ensureRuntimeState(agent)); const latestTaskSession = await db .select() .from(agentTaskSessions) .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agent.id))) .orderBy(desc(agentTaskSessions.updatedAt)) .limit(1) .then((rows) => rows[0] ?? null); return { ...ensured, sessionDisplayId: latestTaskSession?.sessionDisplayId ?? ensured.sessionId, sessionParamsJson: latestTaskSession?.sessionParamsJson ?? null, }; }, listTaskSessions: async (agentId: string) => { const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); return db .select() .from(agentTaskSessions) .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agentId))) .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt)); }, resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => { const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); await ensureRuntimeState(agent); const taskKey = readNonEmptyString(opts?.taskKey); const clearedTaskSessions = await clearTaskSessions( agent.companyId, agent.id, taskKey ? { taskKey, adapterType: agent.adapterType } : undefined, ); const runtimePatch: Partial = { sessionId: null, lastError: null, updatedAt: new Date(), }; if (!taskKey) { runtimePatch.stateJson = {}; } const updated = await db .update(agentRuntimeState) .set(runtimePatch) .where(eq(agentRuntimeState.agentId, agentId)) .returning() .then((rows) => rows[0] ?? null); if (!updated) return null; return { ...updated, sessionDisplayId: null, sessionParamsJson: null, clearedTaskSessions, }; }, listEvents: (runId: string, afterSeq = 0, limit = 200) => db .select() .from(heartbeatRunEvents) .where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq))) .orderBy(asc(heartbeatRunEvents.seq)) .limit(Math.max(1, Math.min(limit, 1000))), readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => { const run = await getRun(runId); if (!run) throw notFound("Heartbeat run not found"); if (!run.logStore || !run.logRef) throw notFound("Run log not found"); const result = await runLogStore.read( { store: run.logStore as "local_file", logRef: run.logRef, }, opts, ); return { runId, store: run.logStore, logRef: run.logRef, ...result, }; }, invoke: async ( agentId: string, source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand", contextSnapshot: Record = {}, triggerDetail: "manual" | "ping" | "callback" | "system" = "manual", actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null }, ) => enqueueWakeup(agentId, { source, triggerDetail, contextSnapshot, requestedByActorType: actor?.actorType, requestedByActorId: actor?.actorId ?? null, }), wakeup: enqueueWakeup, reapOrphanedRuns, tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0; let enqueued = 0; let skipped = 0; for (const agent of allAgents) { if (agent.status === "paused" || agent.status === "terminated") continue; const policy = parseHeartbeatPolicy(agent); if (!policy.enabled || policy.intervalSec <= 0) continue; checked += 1; const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0; const elapsedMs = now.getTime() - last; if (last && elapsedMs < policy.intervalSec * 1000) continue; const run = await enqueueWakeup(agent.id, { source: "timer", triggerDetail: "system", reason: "heartbeat_timer", requestedByActorType: "system", requestedByActorId: "heartbeat_scheduler", contextSnapshot: { source: "scheduler", reason: "interval_elapsed", now: now.toISOString(), }, }); if (run) enqueued += 1; else skipped += 1; } return { checked, enqueued, skipped }; }, cancelRun: async (runId: string) => { const run = await getRun(runId); if (!run) throw notFound("Heartbeat run not found"); if (run.status !== "running" && run.status !== "queued") return run; const running = runningProcesses.get(run.id); if (running) { running.child.kill("SIGTERM"); const graceMs = Math.max(1, running.graceSec) * 1000; setTimeout(() => { if (!running.child.killed) { running.child.kill("SIGKILL"); } }, graceMs); } const cancelled = await setRunStatus(run.id, "cancelled", { finishedAt: new Date(), error: "Cancelled by control plane", errorCode: "cancelled", }); await setWakeupStatus(run.wakeupRequestId, "cancelled", { finishedAt: new Date(), error: "Cancelled by control plane", }); if (cancelled) { await appendRunEvent(cancelled, 1, { eventType: "lifecycle", stream: "system", level: "warn", message: "run cancelled", }); await releaseIssueExecutionAndPromote(cancelled); } runningProcesses.delete(run.id); await finalizeAgentStatus(run.agentId, "cancelled"); await startNextQueuedRunForAgent(run.agentId); return cancelled; }, cancelActiveForAgent: async (agentId: string) => { const runs = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))); for (const run of runs) { await setRunStatus(run.id, "cancelled", { finishedAt: new Date(), error: "Cancelled due to agent pause", errorCode: "cancelled", }); await setWakeupStatus(run.wakeupRequestId, "cancelled", { finishedAt: new Date(), error: "Cancelled due to agent pause", }); const running = runningProcesses.get(run.id); if (running) { running.child.kill("SIGTERM"); runningProcesses.delete(run.id); } await releaseIssueExecutionAndPromote(run); } return runs.length; }, getActiveRunForAgent: async (agentId: string) => { const [run] = await db .select() .from(heartbeatRuns) .where( and( eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"), ), ) .orderBy(desc(heartbeatRuns.startedAt)) .limit(1); return run ?? null; }, }; }