diff --git a/packages/adapters/opencode-local/src/server/models.ts b/packages/adapters/opencode-local/src/server/models.ts index dd2eb2c6..a4d1a46d 100644 --- a/packages/adapters/opencode-local/src/server/models.ts +++ b/packages/adapters/opencode-local/src/server/models.ts @@ -7,6 +7,7 @@ import { } from "@paperclipai/adapter-utils/server-utils"; const MODELS_CACHE_TTL_MS = 60_000; +const MODELS_DISCOVERY_TIMEOUT_MS = 20_000; function resolveOpenCodeCommand(input: unknown): string { const envOverride = @@ -115,14 +116,14 @@ export async function discoverOpenCodeModels(input: { { cwd, env: runtimeEnv, - timeoutSec: 20, + timeoutSec: MODELS_DISCOVERY_TIMEOUT_MS / 1000, graceSec: 3, onLog: async () => {}, }, ); if (result.timedOut) { - throw new Error("`opencode models` timed out."); + throw new Error(`\`opencode models\` timed out after ${MODELS_DISCOVERY_TIMEOUT_MS / 1000}s.`); } if ((result.exitCode ?? 1) !== 0) { const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout); diff --git a/server/package.json b/server/package.json index 1dd9b073..cd30cf13 100644 --- a/server/package.json +++ b/server/package.json @@ -38,9 +38,9 @@ "@paperclipai/adapter-codex-local": "workspace:*", "@paperclipai/adapter-cursor-local": "workspace:*", "@paperclipai/adapter-gemini-local": "workspace:*", + "@paperclipai/adapter-openclaw-gateway": "workspace:*", "@paperclipai/adapter-opencode-local": "workspace:*", "@paperclipai/adapter-pi-local": "workspace:*", - "@paperclipai/adapter-openclaw-gateway": "workspace:*", "@paperclipai/adapter-utils": "workspace:*", "@paperclipai/db": "workspace:*", "@paperclipai/shared": "workspace:*", diff --git a/server/src/index.ts b/server/src/index.ts index 50c6a7b2..27b559eb 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: { password: string; port: number; persistent: boolean; + initdbFlags?: string[]; onLog?: (message: unknown) => void; onError?: (message: unknown) => void; }) => EmbeddedPostgresInstance; @@ -513,11 +514,14 @@ export async function startServer(): Promise { if (config.heartbeatSchedulerEnabled) { const heartbeat = heartbeatService(db as any); - // Reap orphaned runs at startup (no threshold -- runningProcesses is empty) - void heartbeat.reapOrphanedRuns().catch((err) => { - logger.error({ err }, "startup reap of orphaned heartbeat runs failed"); - }); - + // Reap orphaned running runs at startup while in-memory execution state is empty, + // then resume any persisted queued runs that were waiting on the previous process. + void heartbeat + .reapOrphanedRuns() + .then(() => heartbeat.resumeQueuedRuns()) + .catch((err) => { + logger.error({ err }, "startup heartbeat recovery failed"); + }); setInterval(() => { void heartbeat .tickTimers(new Date()) @@ -530,11 +534,13 @@ export async function startServer(): Promise { logger.error({ err }, "heartbeat timer tick failed"); }); - // Periodically reap orphaned runs (5-min staleness threshold) + // Periodically reap orphaned runs (5-min staleness threshold) and make sure + // persisted queued work is still being driven forward. void heartbeat .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 }) + .then(() => heartbeat.resumeQueuedRuns()) .catch((err) => { - logger.error({ err }, "periodic reap of orphaned heartbeat runs failed"); + logger.error({ err }, "periodic heartbeat recovery failed"); }); }, config.heartbeatSchedulerIntervalMs); } diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index e782bc25..f0665c9a 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -455,6 +455,7 @@ export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); const secretsSvc = secretService(db); const issuesSvc = issueService(db); + const activeRunExecutions = new Set(); async function getAgent(agentId: string) { return db @@ -959,7 +960,7 @@ export function heartbeatService(db: Db) { const reaped: string[] = []; for (const run of activeRuns) { - if (runningProcesses.has(run.id)) continue; + if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue; // Apply staleness threshold to avoid false positives if (staleThresholdMs > 0) { @@ -998,6 +999,18 @@ export function heartbeatService(db: Db) { return { reaped: reaped.length, runIds: reaped }; } + async function resumeQueuedRuns() { + const queuedRuns = await db + .select({ agentId: heartbeatRuns.agentId }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.status, "queued")); + + const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))]; + for (const agentId of agentIds) { + await startNextQueuedRunForAgent(agentId); + } + } + async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, @@ -1089,6 +1102,9 @@ export function heartbeatService(db: Db) { run = claimed; } + activeRunExecutions.add(run.id); + + try { const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { @@ -1676,10 +1692,41 @@ export function heartbeatService(db: Db) { } await finalizeAgentStatus(agent.id, "failed"); - } finally { - await releaseRuntimeServicesForRun(run.id); - await startNextQueuedRunForAgent(agent.id); } + } catch (outerErr) { + // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun). + // The inner catch did not fire, so we must record the failure here. + const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure"; + logger.error({ err: outerErr, runId }, "heartbeat execution setup failed"); + await setRunStatus(runId, "failed", { + error: message, + errorCode: "adapter_failed", + finishedAt: new Date(), + }).catch(() => undefined); + await setWakeupStatus(run.wakeupRequestId, "failed", { + finishedAt: new Date(), + error: message, + }).catch(() => undefined); + const failedRun = await getRun(runId).catch(() => null); + if (failedRun) { + // Emit a run-log event so the failure is visible in the run timeline, + // consistent with what the inner catch block does for adapter failures. + await appendRunEvent(failedRun, 1, { + eventType: "error", + stream: "system", + level: "error", + message, + }).catch(() => undefined); + await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined); + } + // Ensure the agent is not left stuck in "running" if the inner catch handler's + // DB calls threw (e.g. a transient DB error in finalizeAgentStatus). + await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined); + } finally { + await releaseRuntimeServicesForRun(run.id).catch(() => undefined); + activeRunExecutions.delete(run.id); + await startNextQueuedRunForAgent(run.agentId); + } } async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { @@ -2425,6 +2472,8 @@ export function heartbeatService(db: Db) { reapOrphanedRuns, + resumeQueuedRuns, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0;