Merge branch 'master' of github.com-dotta:paperclipai/paperclip
* 'master' of github.com-dotta:paperclipai/paperclip:
  fix: resolve type errors in process-lost-reaper PR
  fix(heartbeat): prevent false process_lost failures on queued and non-child-process runs
  Revert "Merge pull request #707 from paperclipai/nm/premerge-lockfile-refresh"
  fix: ensure embedded PostgreSQL databases use UTF-8 encoding
@@ -38,9 +38,9 @@
     "@paperclipai/adapter-codex-local": "workspace:*",
     "@paperclipai/adapter-cursor-local": "workspace:*",
     "@paperclipai/adapter-gemini-local": "workspace:*",
+    "@paperclipai/adapter-openclaw-gateway": "workspace:*",
     "@paperclipai/adapter-opencode-local": "workspace:*",
     "@paperclipai/adapter-pi-local": "workspace:*",
-    "@paperclipai/adapter-openclaw-gateway": "workspace:*",
     "@paperclipai/adapter-utils": "workspace:*",
     "@paperclipai/db": "workspace:*",
     "@paperclipai/shared": "workspace:*",
@@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: {
   password: string;
   port: number;
   persistent: boolean;
+  initdbFlags?: string[];
   onLog?: (message: unknown) => void;
   onError?: (message: unknown) => void;
 }) => EmbeddedPostgresInstance;
@@ -334,6 +335,7 @@ export async function startServer(): Promise<StartedServer> {
     password: "paperclip",
     port,
     persistent: true,
+    initdbFlags: ["--encoding=UTF8", "--locale=C"],
     onLog: appendEmbeddedPostgresLog,
     onError: appendEmbeddedPostgresLog,
   });
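Note: passing --encoding=UTF8 (with --locale=C) to initdb fixes the cluster's default encoding at creation time, so databases created afterwards inherit UTF-8. A minimal sketch of how one might verify this in a smoke test; it assumes the node-postgres ("pg") client, and the user name and helper name are assumptions, not part of this commit:

    import { Client } from "pg";

    // Hypothetical smoke-test helper: connect to the embedded cluster and
    // assert that the server encoding really is UTF8.
    async function assertUtf8Encoding(port: number): Promise<void> {
      const client = new Client({
        host: "127.0.0.1",
        port,
        user: "paperclip",      // assumed credentials
        password: "paperclip",
        database: "postgres",
      });
      await client.connect();
      try {
        const { rows } = await client.query("SHOW SERVER_ENCODING");
        if (rows[0].server_encoding !== "UTF8") {
          throw new Error(`expected UTF8, got ${rows[0].server_encoding}`);
        }
      } finally {
        await client.end();
      }
    }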
@@ -512,11 +514,14 @@ export async function startServer(): Promise<StartedServer> {
   if (config.heartbeatSchedulerEnabled) {
     const heartbeat = heartbeatService(db as any);

-    // Reap orphaned runs at startup (no threshold -- runningProcesses is empty)
-    void heartbeat.reapOrphanedRuns().catch((err) => {
-      logger.error({ err }, "startup reap of orphaned heartbeat runs failed");
-    });
-
+    // Reap orphaned running runs at startup while in-memory execution state is empty,
+    // then resume any persisted queued runs that were waiting on the previous process.
+    void heartbeat
+      .reapOrphanedRuns()
+      .then(() => heartbeat.resumeQueuedRuns())
+      .catch((err) => {
+        logger.error({ err }, "startup heartbeat recovery failed");
+      });
     setInterval(() => {
       void heartbeat
         .tickTimers(new Date())
@@ -529,11 +534,13 @@ export async function startServer(): Promise<StartedServer> {
           logger.error({ err }, "heartbeat timer tick failed");
         });

-      // Periodically reap orphaned runs (5-min staleness threshold)
+      // Periodically reap orphaned runs (5-min staleness threshold) and make sure
+      // persisted queued work is still being driven forward.
       void heartbeat
         .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 })
+        .then(() => heartbeat.resumeQueuedRuns())
         .catch((err) => {
-          logger.error({ err }, "periodic reap of orphaned heartbeat runs failed");
+          logger.error({ err }, "periodic heartbeat recovery failed");
         });
     }, config.heartbeatSchedulerIntervalMs);
   }
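Both the startup and the periodic paths now share the same reap-then-resume shape. A self-contained sketch of that ordering (the Recovery type and function names are illustrative, not the real service surface): resuming only after reaping has completed ensures a run just marked process_lost has already released its agent before the queue is re-driven, and a single .catch keeps a failure in either step from escaping into the interval callback:

    // Illustrative stand-in for the real heartbeat service.
    type Recovery = {
      reapOrphanedRuns(opts?: { staleThresholdMs?: number }): Promise<void>;
      resumeQueuedRuns(): Promise<void>;
    };

    function scheduleRecovery(svc: Recovery, intervalMs: number): NodeJS.Timeout {
      return setInterval(() => {
        void svc
          .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 })
          .then(() => svc.resumeQueuedRuns()) // resume only after reaping finished
          .catch((err) => console.error("periodic recovery failed", err));
      }, intervalMs);
    }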
@@ -455,6 +455,7 @@ export function heartbeatService(db: Db) {
   const runLogStore = getRunLogStore();
   const secretsSvc = secretService(db);
   const issuesSvc = issueService(db);
+  const activeRunExecutions = new Set<string>();

   async function getAgent(agentId: string) {
     return db
@@ -959,7 +960,7 @@ export function heartbeatService(db: Db) {
     const reaped: string[] = [];

     for (const run of activeRuns) {
-      if (runningProcesses.has(run.id)) continue;
+      if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue;

       // Apply staleness threshold to avoid false positives
       if (staleThresholdMs > 0) {
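The extra activeRunExecutions check closes the gap the commit message calls out: a run driven in-process (no child process, so never present in runningProcesses) would otherwise be reaped as process_lost while still executing. A minimal sketch of the two-tracker liveness test, with simplified types as assumptions:

    // Simplified stand-ins for the real bookkeeping structures.
    const runningProcesses = new Map<string, { pid: number }>(); // child-process runs
    const activeRunExecutions = new Set<string>();               // in-process runs

    // A run is alive if either tracker knows about it; the reaper may only
    // touch runs that appear in neither.
    function isRunAlive(runId: string): boolean {
      return runningProcesses.has(runId) || activeRunExecutions.has(runId);
    }

    async function track<T>(runId: string, body: () => Promise<T>): Promise<T> {
      activeRunExecutions.add(runId); // registered before the first await
      try {
        return await body();
      } finally {
        activeRunExecutions.delete(runId); // cleared on success and failure alike
      }
    }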
@@ -998,6 +999,18 @@ export function heartbeatService(db: Db) {
     return { reaped: reaped.length, runIds: reaped };
   }

+  async function resumeQueuedRuns() {
+    const queuedRuns = await db
+      .select({ agentId: heartbeatRuns.agentId })
+      .from(heartbeatRuns)
+      .where(eq(heartbeatRuns.status, "queued"));
+
+    const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))];
+    for (const agentId of agentIds) {
+      await startNextQueuedRunForAgent(agentId);
+    }
+  }

   async function updateRuntimeState(
     agent: typeof agents.$inferSelect,
     run: typeof heartbeatRuns.$inferSelect,
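Several queued rows can share one agent, so resumeQueuedRuns deduplicates agent ids before driving the queue. A tiny illustration of that step with made-up data; startNextQueuedRunForAgent is expected to claim the oldest queued row per agent:

    const queuedRuns = [{ agentId: "a1" }, { agentId: "a1" }, { agentId: "a2" }];
    const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))];
    console.log(agentIds); // ["a1", "a2"] -- one resume attempt per agent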
@@ -1089,6 +1102,9 @@ export function heartbeatService(db: Db) {
       run = claimed;
     }
+
+    activeRunExecutions.add(run.id);
+
     try {
       const agent = await getAgent(run.agentId);
       if (!agent) {
         await setRunStatus(runId, "failed", {
@@ -1676,10 +1692,41 @@ export function heartbeatService(db: Db) {
         }

         await finalizeAgentStatus(agent.id, "failed");
-      } finally {
-        await releaseRuntimeServicesForRun(run.id);
-        await startNextQueuedRunForAgent(agent.id);
       }
+    } catch (outerErr) {
+      // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
+      // The inner catch did not fire, so we must record the failure here.
+      const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure";
+      logger.error({ err: outerErr, runId }, "heartbeat execution setup failed");
+      await setRunStatus(runId, "failed", {
+        error: message,
+        errorCode: "adapter_failed",
+        finishedAt: new Date(),
+      }).catch(() => undefined);
+      await setWakeupStatus(run.wakeupRequestId, "failed", {
+        finishedAt: new Date(),
+        error: message,
+      }).catch(() => undefined);
+      const failedRun = await getRun(runId).catch(() => null);
+      if (failedRun) {
+        // Emit a run-log event so the failure is visible in the run timeline,
+        // consistent with what the inner catch block does for adapter failures.
+        await appendRunEvent(failedRun, 1, {
+          eventType: "error",
+          stream: "system",
+          level: "error",
+          message,
+        }).catch(() => undefined);
+        await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined);
+      }
+      // Ensure the agent is not left stuck in "running" if the inner catch handler's
+      // DB calls threw (e.g. a transient DB error in finalizeAgentStatus).
+      await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
+    } finally {
+      await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
+      activeRunExecutions.delete(run.id);
+      await startNextQueuedRunForAgent(run.agentId);
+    }
   }

   async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
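Every write on this failure path is suffixed with .catch(() => undefined) so that one transient DB error cannot abort the remaining cleanup steps. A minimal sketch of that best-effort idiom; the helper names are hypothetical, not from this commit:

    // Hypothetical helper: swallow a step's rejection so later steps still run.
    const bestEffort = <T>(p: Promise<T>): Promise<T | undefined> =>
      p.catch(() => undefined);

    async function runCleanup(steps: Array<() => Promise<unknown>>): Promise<void> {
      for (const step of steps) {
        await bestEffort(step()); // a rejected step never aborts the loop
      }
    }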
@@ -2425,6 +2472,8 @@ export function heartbeatService(db: Db) {

     reapOrphanedRuns,

+    resumeQueuedRuns,
+
     tickTimers: async (now = new Date()) => {
       const allAgents = await db.select().from(agents);
       let checked = 0;