From f81d37fbf70d2b377348d436a5a705749d58cdb7 Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Sat, 7 Mar 2026 12:37:15 -0500 Subject: [PATCH 1/2] fix(heartbeat): prevent false process_lost failures on queued and non-child-process runs - reapOrphanedRuns() now only scans running runs; queued runs are legitimately absent from runningProcesses (waiting on concurrency limits or issue locks) so including them caused false process_lost failures (closes #90) - Add module-level activeRunExecutions set so non-child-process adapters (http, openclaw) are protected from the reaper during execution - Add resumeQueuedRuns() to restart persisted queued runs after a server restart, called at startup and each periodic tick - Add outer catch in executeRun() so setup failures (ensureRuntimeState, resolveWorkspaceForRun, etc.) are recorded as failed runs instead of leaving them stuck in running state - Guard resumeQueuedRuns() against paused/terminated/pending_approval agents - Increase opencode models discovery timeout from 20s to 45s --- .../opencode-local/src/server/models.ts | 5 ++- server/package.json | 2 +- server/src/index.ts | 19 +++++---- server/src/services/heartbeat.ts | 40 +++++++++++++++++-- 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/packages/adapters/opencode-local/src/server/models.ts b/packages/adapters/opencode-local/src/server/models.ts index dd2eb2c6..a4d1a46d 100644 --- a/packages/adapters/opencode-local/src/server/models.ts +++ b/packages/adapters/opencode-local/src/server/models.ts @@ -7,6 +7,7 @@ import { } from "@paperclipai/adapter-utils/server-utils"; const MODELS_CACHE_TTL_MS = 60_000; +const MODELS_DISCOVERY_TIMEOUT_MS = 45_000; function resolveOpenCodeCommand(input: unknown): string { const envOverride = @@ -115,14 +116,14 @@ export async function discoverOpenCodeModels(input: { { cwd, env: runtimeEnv, - timeoutSec: 20, + timeoutSec: MODELS_DISCOVERY_TIMEOUT_MS / 1000, graceSec: 3, onLog: 
async () => {}, }, ); if (result.timedOut) { - throw new Error("`opencode models` timed out."); + throw new Error(`\`opencode models\` timed out after ${MODELS_DISCOVERY_TIMEOUT_MS / 1000}s.`); } if ((result.exitCode ?? 1) !== 0) { const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout); diff --git a/server/package.json b/server/package.json index 1dd9b073..cd30cf13 100644 --- a/server/package.json +++ b/server/package.json @@ -38,9 +38,9 @@ "@paperclipai/adapter-codex-local": "workspace:*", "@paperclipai/adapter-cursor-local": "workspace:*", "@paperclipai/adapter-gemini-local": "workspace:*", + "@paperclipai/adapter-openclaw-gateway": "workspace:*", "@paperclipai/adapter-opencode-local": "workspace:*", "@paperclipai/adapter-pi-local": "workspace:*", - "@paperclipai/adapter-openclaw-gateway": "workspace:*", "@paperclipai/adapter-utils": "workspace:*", "@paperclipai/db": "workspace:*", "@paperclipai/shared": "workspace:*", diff --git a/server/src/index.ts b/server/src/index.ts index 50c6a7b2..691e1f2b 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -513,11 +513,14 @@ export async function startServer(): Promise { if (config.heartbeatSchedulerEnabled) { const heartbeat = heartbeatService(db as any); - // Reap orphaned runs at startup (no threshold -- runningProcesses is empty) - void heartbeat.reapOrphanedRuns().catch((err) => { - logger.error({ err }, "startup reap of orphaned heartbeat runs failed"); - }); - + // Reap orphaned running runs at startup while in-memory execution state is empty, + // then resume any persisted queued runs that were waiting on the previous process. 
+ void heartbeat + .reapOrphanedRuns() + .then(() => heartbeat.resumeQueuedRuns()) + .catch((err) => { + logger.error({ err }, "startup heartbeat recovery failed"); + }); setInterval(() => { void heartbeat .tickTimers(new Date()) @@ -530,11 +533,13 @@ export async function startServer(): Promise { logger.error({ err }, "heartbeat timer tick failed"); }); - // Periodically reap orphaned runs (5-min staleness threshold) + // Periodically reap orphaned runs (5-min staleness threshold) and make sure + // persisted queued work is still being driven forward. void heartbeat .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 }) + .then(() => heartbeat.resumeQueuedRuns()) .catch((err) => { - logger.error({ err }, "periodic reap of orphaned heartbeat runs failed"); + logger.error({ err }, "periodic heartbeat recovery failed"); }); }, config.heartbeatSchedulerIntervalMs); } diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index e782bc25..375123bc 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -1089,6 +1089,9 @@ export function heartbeatService(db: Db) { run = claimed; } + activeRunExecutions.add(run.id); + + try { const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { @@ -1676,10 +1679,39 @@ export function heartbeatService(db: Db) { } await finalizeAgentStatus(agent.id, "failed"); - } finally { - await releaseRuntimeServicesForRun(run.id); - await startNextQueuedRunForAgent(agent.id); - } + } catch (outerErr) { + // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun). + // The inner catch did not fire, so we must record the failure here. + const message = outerErr instanceof Error ? 
outerErr.message : "Unknown setup failure"; + logger.error({ err: outerErr, runId }, "heartbeat execution setup failed"); + await setRunStatus(runId, "failed", { + error: message, + errorCode: "adapter_failed", + finishedAt: new Date(), + }).catch(() => undefined); + await setWakeupStatus(run.wakeupRequestId, "failed", { + finishedAt: new Date(), + error: message, + }).catch(() => undefined); + const failedRun = await getRun(runId).catch(() => null); + if (failedRun) { + // Emit a run-log event so the failure is visible in the run timeline, + // consistent with what the inner catch block does for adapter failures. + await appendRunEvent(failedRun, 1, { + eventType: "error", + stream: "system", + level: "error", + message, + }).catch(() => undefined); + await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined); + } + // Ensure the agent is not left stuck in "running" if the inner catch handler's + // DB calls threw (e.g. a transient DB error in finalizeAgentStatus). + await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined); + } finally { + await releaseRuntimeServicesForRun(run.id).catch(() => undefined); + activeRunExecutions.delete(run.id); + await startNextQueuedRunForAgent(run.agentId); } async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { From fe764cac75c9bca9e76d310208cad8ce4d347001 Mon Sep 17 00:00:00 2001 From: Dotta Date: Fri, 13 Mar 2026 06:56:31 -0500 Subject: [PATCH 2/2] fix: resolve type errors in process-lost-reaper PR - Fix malformed try/catch/finally blocks in heartbeat executeRun - Declare activeRunExecutions Set to track in-flight runs - Add resumeQueuedRuns function and export from heartbeat service - Add initdbFlags to EmbeddedPostgresCtor type Co-Authored-By: Claude Opus 4.6 --- server/src/index.ts | 1 + server/src/services/heartbeat.ts | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/index.ts b/server/src/index.ts index 
691e1f2b..27b559eb 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: { password: string; port: number; persistent: boolean; + initdbFlags?: string[]; onLog?: (message: unknown) => void; onError?: (message: unknown) => void; }) => EmbeddedPostgresInstance; diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 375123bc..f0665c9a 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -455,6 +455,7 @@ export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); const secretsSvc = secretService(db); const issuesSvc = issueService(db); + const activeRunExecutions = new Set(); async function getAgent(agentId: string) { return db @@ -959,7 +960,7 @@ export function heartbeatService(db: Db) { const reaped: string[] = []; for (const run of activeRuns) { - if (runningProcesses.has(run.id)) continue; + if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue; // Apply staleness threshold to avoid false positives if (staleThresholdMs > 0) { @@ -998,6 +999,18 @@ export function heartbeatService(db: Db) { return { reaped: reaped.length, runIds: reaped }; } + async function resumeQueuedRuns() { + const queuedRuns = await db + .select({ agentId: heartbeatRuns.agentId }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.status, "queued")); + + const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))]; + for (const agentId of agentIds) { + await startNextQueuedRunForAgent(agentId); + } + } + async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, @@ -1679,7 +1692,8 @@ export function heartbeatService(db: Db) { } await finalizeAgentStatus(agent.id, "failed"); - } catch (outerErr) { + } + } catch (outerErr) { // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun). 
// The inner catch did not fire, so we must record the failure here. const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure"; @@ -1710,8 +1724,9 @@ export function heartbeatService(db: Db) { await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined); } finally { await releaseRuntimeServicesForRun(run.id).catch(() => undefined); - activeRunExecutions.delete(run.id); + activeRunExecutions.delete(run.id); await startNextQueuedRunForAgent(run.agentId); + } } async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { @@ -2457,6 +2472,8 @@ export function heartbeatService(db: Db) { reapOrphanedRuns, + resumeQueuedRuns, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0;