Merge pull request #251 from mjaverto/fix/process-lost-reaper

fix(heartbeat): prevent false process_lost failures on queued and non-child-process runs
This commit is contained in:
Dotta
2026-03-13 07:06:10 -05:00
committed by GitHub
4 changed files with 70 additions and 14 deletions

View File

@@ -7,6 +7,7 @@ import {
} from "@paperclipai/adapter-utils/server-utils";
const MODELS_CACHE_TTL_MS = 60_000;
const MODELS_DISCOVERY_TIMEOUT_MS = 20_000;
function resolveOpenCodeCommand(input: unknown): string {
const envOverride =
@@ -115,14 +116,14 @@ export async function discoverOpenCodeModels(input: {
{
cwd,
env: runtimeEnv,
timeoutSec: 20,
timeoutSec: MODELS_DISCOVERY_TIMEOUT_MS / 1000,
graceSec: 3,
onLog: async () => {},
},
);
if (result.timedOut) {
throw new Error("`opencode models` timed out.");
throw new Error(`\`opencode models\` timed out after ${MODELS_DISCOVERY_TIMEOUT_MS / 1000}s.`);
}
if ((result.exitCode ?? 1) !== 0) {
const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout);

View File

@@ -38,9 +38,9 @@
"@paperclipai/adapter-codex-local": "workspace:*",
"@paperclipai/adapter-cursor-local": "workspace:*",
"@paperclipai/adapter-gemini-local": "workspace:*",
"@paperclipai/adapter-openclaw-gateway": "workspace:*",
"@paperclipai/adapter-opencode-local": "workspace:*",
"@paperclipai/adapter-pi-local": "workspace:*",
"@paperclipai/adapter-openclaw-gateway": "workspace:*",
"@paperclipai/adapter-utils": "workspace:*",
"@paperclipai/db": "workspace:*",
"@paperclipai/shared": "workspace:*",

View File

@@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: {
password: string;
port: number;
persistent: boolean;
initdbFlags?: string[];
onLog?: (message: unknown) => void;
onError?: (message: unknown) => void;
}) => EmbeddedPostgresInstance;
@@ -513,11 +514,14 @@ export async function startServer(): Promise<StartedServer> {
if (config.heartbeatSchedulerEnabled) {
const heartbeat = heartbeatService(db as any);
// Reap orphaned runs at startup (no threshold -- runningProcesses is empty)
void heartbeat.reapOrphanedRuns().catch((err) => {
logger.error({ err }, "startup reap of orphaned heartbeat runs failed");
});
// Reap orphaned running runs at startup while in-memory execution state is empty,
// then resume any persisted queued runs that were waiting on the previous process.
void heartbeat
.reapOrphanedRuns()
.then(() => heartbeat.resumeQueuedRuns())
.catch((err) => {
logger.error({ err }, "startup heartbeat recovery failed");
});
setInterval(() => {
void heartbeat
.tickTimers(new Date())
@@ -530,11 +534,13 @@ export async function startServer(): Promise<StartedServer> {
logger.error({ err }, "heartbeat timer tick failed");
});
// Periodically reap orphaned runs (5-min staleness threshold)
// Periodically reap orphaned runs (5-min staleness threshold) and make sure
// persisted queued work is still being driven forward.
void heartbeat
.reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 })
.then(() => heartbeat.resumeQueuedRuns())
.catch((err) => {
logger.error({ err }, "periodic reap of orphaned heartbeat runs failed");
logger.error({ err }, "periodic heartbeat recovery failed");
});
}, config.heartbeatSchedulerIntervalMs);
}

View File

@@ -455,6 +455,7 @@ export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
const secretsSvc = secretService(db);
const issuesSvc = issueService(db);
const activeRunExecutions = new Set<string>();
async function getAgent(agentId: string) {
return db
@@ -959,7 +960,7 @@ export function heartbeatService(db: Db) {
const reaped: string[] = [];
for (const run of activeRuns) {
if (runningProcesses.has(run.id)) continue;
if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue;
// Apply staleness threshold to avoid false positives
if (staleThresholdMs > 0) {
@@ -998,6 +999,18 @@ export function heartbeatService(db: Db) {
return { reaped: reaped.length, runIds: reaped };
}
/**
 * Resume persisted "queued" heartbeat runs that lost their driver process
 * (e.g. after a server restart, or after the reaper cleared an orphaned
 * "running" run for the same agent).
 *
 * Collects the distinct agent ids that currently have at least one run in
 * status "queued", then asks the scheduler to start the next queued run for
 * each of those agents, one agent at a time.
 *
 * NOTE(review): resumption is sequential (`await` inside the loop) —
 * presumably to avoid launching many adapter processes at once; confirm
 * before parallelizing with Promise.all.
 */
async function resumeQueuedRuns() {
// Only the agentId column is needed; duplicates are expected when an agent
// has several queued runs.
const queuedRuns = await db
.select({ agentId: heartbeatRuns.agentId })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.status, "queued"));
// Dedupe so each agent is driven forward exactly once per pass.
const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))];
for (const agentId of agentIds) {
// startNextQueuedRunForAgent is assumed idempotent / claim-based
// (L126 "run = claimed" in the execution path suggests a claim step),
// so racing with an in-flight run should be safe — TODO confirm.
await startNextQueuedRunForAgent(agentId);
}
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
@@ -1089,6 +1102,9 @@ export function heartbeatService(db: Db) {
run = claimed;
}
activeRunExecutions.add(run.id);
try {
const agent = await getAgent(run.agentId);
if (!agent) {
await setRunStatus(runId, "failed", {
@@ -1676,10 +1692,41 @@ export function heartbeatService(db: Db) {
}
await finalizeAgentStatus(agent.id, "failed");
} finally {
await releaseRuntimeServicesForRun(run.id);
await startNextQueuedRunForAgent(agent.id);
}
} catch (outerErr) {
// Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
// The inner catch did not fire, so we must record the failure here.
const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure";
logger.error({ err: outerErr, runId }, "heartbeat execution setup failed");
await setRunStatus(runId, "failed", {
error: message,
errorCode: "adapter_failed",
finishedAt: new Date(),
}).catch(() => undefined);
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: message,
}).catch(() => undefined);
const failedRun = await getRun(runId).catch(() => null);
if (failedRun) {
// Emit a run-log event so the failure is visible in the run timeline,
// consistent with what the inner catch block does for adapter failures.
await appendRunEvent(failedRun, 1, {
eventType: "error",
stream: "system",
level: "error",
message,
}).catch(() => undefined);
await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined);
}
// Ensure the agent is not left stuck in "running" if the inner catch handler's
// DB calls threw (e.g. a transient DB error in finalizeAgentStatus).
await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
} finally {
await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
activeRunExecutions.delete(run.id);
await startNextQueuedRunForAgent(run.agentId);
}
}
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
@@ -2425,6 +2472,8 @@ export function heartbeatService(db: Db) {
reapOrphanedRuns,
resumeQueuedRuns,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;