Improve orphaned local heartbeat recovery

Persist child-process metadata for local adapter runs; keep detached runs alive while their pid still exists; queue a single automatic retry once the pid is confirmed dead; and clear detached warnings when the original run reports activity again.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta
2026-03-19 11:20:36 -05:00
parent 7f3fad64b8
commit c844ca1a40
17 changed files with 10924 additions and 22 deletions

View File

@@ -0,0 +1,321 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
import path from "node:path";
import { spawn, type ChildProcess } from "node:child_process";
import { eq } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
import {
applyPendingMigrations,
createDb,
ensurePostgresDatabase,
agents,
agentWakeupRequests,
companies,
heartbeatRunEvents,
heartbeatRuns,
issues,
} from "@paperclipai/db";
import { runningProcesses } from "../adapters/index.ts";
import { heartbeatService } from "../services/heartbeat.ts";
type EmbeddedPostgresInstance = {
initialise(): Promise<void>;
start(): Promise<void>;
stop(): Promise<void>;
};
type EmbeddedPostgresCtor = new (opts: {
databaseDir: string;
user: string;
password: string;
port: number;
persistent: boolean;
initdbFlags?: string[];
onLog?: (message: unknown) => void;
onError?: (message: unknown) => void;
}) => EmbeddedPostgresInstance;
async function getEmbeddedPostgresCtor(): Promise<EmbeddedPostgresCtor> {
const mod = await import("embedded-postgres");
return mod.default as EmbeddedPostgresCtor;
}
async function getAvailablePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const server = net.createServer();
server.unref();
server.on("error", reject);
server.listen(0, "127.0.0.1", () => {
const address = server.address();
if (!address || typeof address === "string") {
server.close(() => reject(new Error("Failed to allocate test port")));
return;
}
const { port } = address;
server.close((error) => {
if (error) reject(error);
else resolve(port);
});
});
});
}
async function startTempDatabase() {
const dataDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-heartbeat-recovery-"));
const port = await getAvailablePort();
const EmbeddedPostgres = await getEmbeddedPostgresCtor();
const instance = new EmbeddedPostgres({
databaseDir: dataDir,
user: "paperclip",
password: "paperclip",
port,
persistent: true,
initdbFlags: ["--encoding=UTF8", "--locale=C"],
onLog: () => {},
onError: () => {},
});
await instance.initialise();
await instance.start();
const adminConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/postgres`;
await ensurePostgresDatabase(adminConnectionString, "paperclip");
const connectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/paperclip`;
await applyPendingMigrations(connectionString);
return { connectionString, instance, dataDir };
}
function spawnAliveProcess() {
return spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], {
stdio: "ignore",
});
}
describe("heartbeat orphaned process recovery", () => {
let db!: ReturnType<typeof createDb>;
let instance: EmbeddedPostgresInstance | null = null;
let dataDir = "";
const childProcesses = new Set<ChildProcess>();
beforeAll(async () => {
const started = await startTempDatabase();
db = createDb(started.connectionString);
instance = started.instance;
dataDir = started.dataDir;
}, 20_000);
afterEach(async () => {
runningProcesses.clear();
for (const child of childProcesses) {
child.kill("SIGKILL");
}
childProcesses.clear();
await db.delete(issues);
await db.delete(heartbeatRunEvents);
await db.delete(heartbeatRuns);
await db.delete(agentWakeupRequests);
await db.delete(agents);
await db.delete(companies);
});
afterAll(async () => {
for (const child of childProcesses) {
child.kill("SIGKILL");
}
childProcesses.clear();
runningProcesses.clear();
await instance?.stop();
if (dataDir) {
fs.rmSync(dataDir, { recursive: true, force: true });
}
});
async function seedRunFixture(input?: {
adapterType?: string;
runStatus?: "running" | "queued" | "failed";
processPid?: number | null;
processLossRetryCount?: number;
includeIssue?: boolean;
runErrorCode?: string | null;
runError?: string | null;
}) {
const companyId = randomUUID();
const agentId = randomUUID();
const runId = randomUUID();
const wakeupRequestId = randomUUID();
const issueId = randomUUID();
const now = new Date("2026-03-19T00:00:00.000Z");
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "paused",
adapterType: input?.adapterType ?? "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
});
await db.insert(agentWakeupRequests).values({
id: wakeupRequestId,
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: input?.includeIssue === false ? {} : { issueId },
status: "claimed",
runId,
claimedAt: now,
});
await db.insert(heartbeatRuns).values({
id: runId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: input?.runStatus ?? "running",
wakeupRequestId,
contextSnapshot: input?.includeIssue === false ? {} : { issueId },
processPid: input?.processPid ?? null,
processLossRetryCount: input?.processLossRetryCount ?? 0,
errorCode: input?.runErrorCode ?? null,
error: input?.runError ?? null,
startedAt: now,
updatedAt: new Date("2026-03-19T00:00:00.000Z"),
});
if (input?.includeIssue !== false) {
await db.insert(issues).values({
id: issueId,
companyId,
title: "Recover local adapter after lost process",
status: "in_progress",
priority: "medium",
assigneeAgentId: agentId,
checkoutRunId: runId,
executionRunId: runId,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
});
}
return { companyId, agentId, runId, wakeupRequestId, issueId };
}
it("keeps a local run active when the recorded pid is still alive", async () => {
const child = spawnAliveProcess();
childProcesses.add(child);
expect(child.pid).toBeTypeOf("number");
const { runId, wakeupRequestId } = await seedRunFixture({
processPid: child.pid ?? null,
includeIssue: false,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(0);
const run = await heartbeat.getRun(runId);
expect(run?.status).toBe("running");
expect(run?.errorCode).toBe("process_detached");
expect(run?.error).toContain(String(child.pid));
const wakeup = await db
.select()
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, wakeupRequestId))
.then((rows) => rows[0] ?? null);
expect(wakeup?.status).toBe("claimed");
});
it("queues exactly one retry when the recorded local pid is dead", async () => {
const { agentId, runId, issueId } = await seedRunFixture({
processPid: 999_999_999,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const failedRun = runs.find((row) => row.id === runId);
const retryRun = runs.find((row) => row.id !== runId);
expect(failedRun?.status).toBe("failed");
expect(failedRun?.errorCode).toBe("process_lost");
expect(retryRun?.status).toBe("queued");
expect(retryRun?.retryOfRunId).toBe(runId);
expect(retryRun?.processLossRetryCount).toBe(1);
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
expect(issue?.checkoutRunId).toBe(runId);
});
it("does not queue a second retry after the first process-loss retry was already used", async () => {
const { agentId, runId, issueId } = await seedRunFixture({
processPid: 999_999_999,
processLossRetryCount: 1,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(1);
expect(runs[0]?.status).toBe("failed");
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.executionRunId).toBeNull();
expect(issue?.checkoutRunId).toBe(runId);
});
it("clears the detached warning when the run reports activity again", async () => {
const { runId } = await seedRunFixture({
includeIssue: false,
runErrorCode: "process_detached",
runError: "Lost in-memory process handle, but child pid 123 is still alive",
});
const heartbeat = heartbeatService(db);
const updated = await heartbeat.reportRunActivity(runId);
expect(updated?.errorCode).toBeNull();
expect(updated?.error).toBeNull();
const run = await heartbeat.getRun(runId);
expect(run?.errorCode).toBeNull();
expect(run?.error).toBeNull();
});
});

View File

@@ -820,6 +820,7 @@ export function issueRoutes(db: Db, storage: StorageService) {
}
if (!(await assertAgentRunCheckoutOwnership(req, res, existing))) return;
const actor = getActorInfo(req);
const { comment: commentBody, hiddenAt: hiddenAtRaw, ...updateFields } = req.body;
if (hiddenAtRaw !== undefined) {
updateFields.hiddenAt = hiddenAtRaw ? new Date(hiddenAtRaw) : null;
@@ -856,6 +857,11 @@ export function issueRoutes(db: Db, storage: StorageService) {
return;
}
if (actor.runId) {
await heartbeat.reportRunActivity(actor.runId).catch((err) =>
logger.warn({ err, runId: actor.runId }, "failed to clear detached run warning after issue activity"));
}
// Build activity details with previous values for changed fields
const previous: Record<string, unknown> = {};
for (const key of Object.keys(updateFields)) {
@@ -864,7 +870,6 @@ export function issueRoutes(db: Db, storage: StorageService) {
}
}
const actor = getActorInfo(req);
const hasFieldChanges = Object.keys(previous).length > 0;
await logActivity(db, {
companyId: issue.companyId,
@@ -1278,6 +1283,11 @@ export function issueRoutes(db: Db, storage: StorageService) {
userId: actor.actorType === "user" ? actor.actorId : undefined,
});
if (actor.runId) {
await heartbeat.reportRunActivity(actor.runId).catch((err) =>
logger.warn({ err, runId: actor.runId }, "failed to clear detached run warning after issue comment"));
}
await logActivity(db, {
companyId: currentIssue.companyId,
actorType: actor.actorType,

View File

@@ -60,6 +60,7 @@ const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
const DETACHED_PROCESS_ERROR_CODE = "process_detached";
const startLocksByAgent = new Map<string, Promise<void>>();
const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__";
const MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS = 10 * 60 * 1000;
@@ -163,6 +164,10 @@ const heartbeatRunListColumns = {
stderrExcerpt: sql<string | null>`NULL`.as("stderrExcerpt"),
errorCode: heartbeatRuns.errorCode,
externalRunId: heartbeatRuns.externalRunId,
processPid: heartbeatRuns.processPid,
processStartedAt: heartbeatRuns.processStartedAt,
retryOfRunId: heartbeatRuns.retryOfRunId,
processLossRetryCount: heartbeatRuns.processLossRetryCount,
contextSnapshot: heartbeatRuns.contextSnapshot,
createdAt: heartbeatRuns.createdAt,
updatedAt: heartbeatRuns.updatedAt,
@@ -598,6 +603,23 @@ function isSameTaskScope(left: string | null, right: string | null) {
return (left ?? null) === (right ?? null);
}
function isTrackedLocalChildProcessAdapter(adapterType: string) {
return SESSIONED_LOCAL_ADAPTERS.has(adapterType);
}
function isProcessAlive(pid: number | null | undefined) {
if (typeof pid !== "number" || !Number.isInteger(pid) || pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
} catch (error) {
const code = (error as NodeJS.ErrnoException | undefined)?.code;
if (code === "EPERM") return true;
if (code === "ESRCH") return false;
return false;
}
}
function truncateDisplayId(value: string | null | undefined, max = 128) {
if (!value) return null;
return value.length > max ? value.slice(0, max) : value;
@@ -1326,6 +1348,156 @@ export function heartbeatService(db: Db) {
});
}
async function nextRunEventSeq(runId: string) {
const [row] = await db
.select({ maxSeq: sql<number | null>`max(${heartbeatRunEvents.seq})` })
.from(heartbeatRunEvents)
.where(eq(heartbeatRunEvents.runId, runId));
return Number(row?.maxSeq ?? 0) + 1;
}
async function persistRunProcessMetadata(
runId: string,
meta: { pid: number; startedAt: string },
) {
const startedAt = new Date(meta.startedAt);
return db
.update(heartbeatRuns)
.set({
processPid: meta.pid,
processStartedAt: Number.isNaN(startedAt.getTime()) ? new Date() : startedAt,
updatedAt: new Date(),
})
.where(eq(heartbeatRuns.id, runId))
.returning()
.then((rows) => rows[0] ?? null);
}
async function clearDetachedRunWarning(runId: string) {
const updated = await db
.update(heartbeatRuns)
.set({
error: null,
errorCode: null,
updatedAt: new Date(),
})
.where(and(eq(heartbeatRuns.id, runId), eq(heartbeatRuns.status, "running"), eq(heartbeatRuns.errorCode, DETACHED_PROCESS_ERROR_CODE)))
.returning()
.then((rows) => rows[0] ?? null);
if (!updated) return null;
await appendRunEvent(updated, await nextRunEventSeq(updated.id), {
eventType: "lifecycle",
stream: "system",
level: "info",
message: "Detached child process reported activity; cleared detached warning",
});
return updated;
}
async function enqueueProcessLossRetry(
run: typeof heartbeatRuns.$inferSelect,
agent: typeof agents.$inferSelect,
now: Date,
) {
const contextSnapshot = parseObject(run.contextSnapshot);
const issueId = readNonEmptyString(contextSnapshot.issueId);
const taskKey = deriveTaskKey(contextSnapshot, null);
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
const retryContextSnapshot = {
...contextSnapshot,
retryOfRunId: run.id,
wakeReason: "process_lost_retry",
retryReason: "process_lost",
};
const queued = await db.transaction(async (tx) => {
const wakeupRequest = await tx
.insert(agentWakeupRequests)
.values({
companyId: run.companyId,
agentId: run.agentId,
source: "automation",
triggerDetail: "system",
reason: "process_lost_retry",
payload: {
...(issueId ? { issueId } : {}),
retryOfRunId: run.id,
},
status: "queued",
requestedByActorType: "system",
requestedByActorId: null,
updatedAt: now,
})
.returning()
.then((rows) => rows[0]);
const retryRun = await tx
.insert(heartbeatRuns)
.values({
companyId: run.companyId,
agentId: run.agentId,
invocationSource: "automation",
triggerDetail: "system",
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot: retryContextSnapshot,
sessionIdBefore: sessionBefore,
retryOfRunId: run.id,
processLossRetryCount: (run.processLossRetryCount ?? 0) + 1,
updatedAt: now,
})
.returning()
.then((rows) => rows[0]);
await tx
.update(agentWakeupRequests)
.set({
runId: retryRun.id,
updatedAt: now,
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
if (issueId) {
await tx
.update(issues)
.set({
executionRunId: retryRun.id,
executionAgentNameKey: normalizeAgentNameKey(agent.name),
executionLockedAt: now,
updatedAt: now,
})
.where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)));
}
return retryRun;
});
publishLiveEvent({
companyId: queued.companyId,
type: "heartbeat.run.queued",
payload: {
runId: queued.id,
agentId: queued.agentId,
invocationSource: queued.invocationSource,
triggerDetail: queued.triggerDetail,
wakeupRequestId: queued.wakeupRequestId,
},
});
await appendRunEvent(queued, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "Queued automatic retry after orphaned child process was confirmed dead",
payload: {
retryOfRunId: run.id,
},
});
return queued;
}
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
const runtimeConfig = parseObject(agent.runtimeConfig);
const heartbeat = parseObject(runtimeConfig.heartbeat);
@@ -1453,13 +1625,17 @@ export function heartbeatService(db: Db) {
// Find all runs stuck in "running" state (queued runs are legitimately waiting; resumeQueuedRuns handles them)
const activeRuns = await db
.select()
.select({
run: heartbeatRuns,
adapterType: agents.adapterType,
})
.from(heartbeatRuns)
.innerJoin(agents, eq(heartbeatRuns.agentId, agents.id))
.where(eq(heartbeatRuns.status, "running"));
const reaped: string[] = [];
for (const run of activeRuns) {
for (const { run, adapterType } of activeRuns) {
if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue;
// Apply staleness threshold to avoid false positives
@@ -1468,25 +1644,69 @@ export function heartbeatService(db: Db) {
if (now.getTime() - refTime < staleThresholdMs) continue;
}
await setRunStatus(run.id, "failed", {
error: "Process lost -- server may have restarted",
const tracksLocalChild = isTrackedLocalChildProcessAdapter(adapterType);
if (tracksLocalChild && run.processPid && isProcessAlive(run.processPid)) {
if (run.errorCode !== DETACHED_PROCESS_ERROR_CODE) {
const detachedMessage = `Lost in-memory process handle, but child pid ${run.processPid} is still alive`;
const detachedRun = await setRunStatus(run.id, "running", {
error: detachedMessage,
errorCode: DETACHED_PROCESS_ERROR_CODE,
});
if (detachedRun) {
await appendRunEvent(detachedRun, await nextRunEventSeq(detachedRun.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: detachedMessage,
payload: {
processPid: run.processPid,
},
});
}
}
continue;
}
const shouldRetry = tracksLocalChild && !!run.processPid && (run.processLossRetryCount ?? 0) < 1;
const baseMessage = run.processPid
? `Process lost -- child pid ${run.processPid} is no longer running`
: "Process lost -- server may have restarted";
let finalizedRun = await setRunStatus(run.id, "failed", {
error: shouldRetry ? `${baseMessage}; retrying once` : baseMessage,
errorCode: "process_lost",
finishedAt: now,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: now,
error: "Process lost -- server may have restarted",
error: shouldRetry ? `${baseMessage}; retrying once` : baseMessage,
});
const updatedRun = await getRun(run.id);
if (updatedRun) {
await appendRunEvent(updatedRun, 1, {
eventType: "lifecycle",
stream: "system",
level: "error",
message: "Process lost -- server may have restarted",
});
await releaseIssueExecutionAndPromote(updatedRun);
if (!finalizedRun) finalizedRun = await getRun(run.id);
if (!finalizedRun) continue;
let retriedRun: typeof heartbeatRuns.$inferSelect | null = null;
if (shouldRetry) {
const agent = await getAgent(run.agentId);
if (agent) {
retriedRun = await enqueueProcessLossRetry(finalizedRun, agent, now);
}
} else {
await releaseIssueExecutionAndPromote(finalizedRun);
}
await appendRunEvent(finalizedRun, await nextRunEventSeq(finalizedRun.id), {
eventType: "lifecycle",
stream: "system",
level: "error",
message: shouldRetry
? `${baseMessage}; queued retry ${retriedRun?.id ?? ""}`.trim()
: baseMessage,
payload: {
...(run.processPid ? { processPid: run.processPid } : {}),
...(retriedRun ? { retryRunId: retriedRun.id } : {}),
},
});
await finalizeAgentStatus(run.agentId, "failed");
await startNextQueuedRunForAgent(run.agentId);
runningProcesses.delete(run.id);
@@ -2152,6 +2372,9 @@ export function heartbeatService(db: Db) {
context,
onLog,
onMeta: onAdapterMeta,
onSpawn: async (meta) => {
await persistRunProcessMetadata(run.id, meta);
},
authToken: authToken ?? undefined,
});
const adapterManagedRuntimeServices = adapterResult.runtimeServices
@@ -3403,6 +3626,8 @@ export function heartbeatService(db: Db) {
wakeup: enqueueWakeup,
reportRunActivity: clearDetachedRunWarning,
reapOrphanedRuns,
resumeQueuedRuns,