From fe764cac75c9bca9e76d310208cad8ce4d347001 Mon Sep 17 00:00:00 2001 From: Dotta Date: Fri, 13 Mar 2026 06:56:31 -0500 Subject: [PATCH] fix: resolve type errors in process-lost-reaper PR - Fix malformed try/catch/finally blocks in heartbeat executeRun - Declare activeRunExecutions Set to track in-flight runs - Add resumeQueuedRuns function and export from heartbeat service - Add initdbFlags to EmbeddedPostgresCtor type Co-Authored-By: Claude Opus 4.6 --- server/src/index.ts | 1 + server/src/services/heartbeat.ts | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/index.ts b/server/src/index.ts index 691e1f2b..27b559eb 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: { password: string; port: number; persistent: boolean; + initdbFlags?: string[]; onLog?: (message: unknown) => void; onError?: (message: unknown) => void; }) => EmbeddedPostgresInstance; diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 375123bc..f0665c9a 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -455,6 +455,7 @@ export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); const secretsSvc = secretService(db); const issuesSvc = issueService(db); + const activeRunExecutions = new Set(); async function getAgent(agentId: string) { return db @@ -959,7 +960,7 @@ export function heartbeatService(db: Db) { const reaped: string[] = []; for (const run of activeRuns) { - if (runningProcesses.has(run.id)) continue; + if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue; // Apply staleness threshold to avoid false positives if (staleThresholdMs > 0) { @@ -998,6 +999,18 @@ export function heartbeatService(db: Db) { return { reaped: reaped.length, runIds: reaped }; } + async function resumeQueuedRuns() { + const queuedRuns = await db + .select({ agentId: heartbeatRuns.agentId }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.status, "queued")); + + const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))]; + for (const agentId of agentIds) { + await startNextQueuedRunForAgent(agentId); + } + } + async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, @@ -1679,7 +1692,8 @@ export function heartbeatService(db: Db) { } await finalizeAgentStatus(agent.id, "failed"); - } catch (outerErr) { + } + } catch (outerErr) { // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun). // The inner catch did not fire, so we must record the failure here. const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure"; @@ -1710,8 +1724,9 @@ export function heartbeatService(db: Db) { await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined); } finally { await releaseRuntimeServicesForRun(run.id).catch(() => undefined); - activeRunExecutions.delete(run.id); + activeRunExecutions.delete(run.id); await startNextQueuedRunForAgent(run.agentId); + } } async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { @@ -2457,6 +2472,8 @@ export function heartbeatService(db: Db) { reapOrphanedRuns, + resumeQueuedRuns, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0;