Server: migration prompts, structured logging, heartbeat reaping, and issue-run tracking

Replace auto-migrate-if-empty with interactive migration flow that inspects
pending migrations and prompts before applying. Add pino-pretty for structured
console + file logging. Add reapOrphanedRuns to clean up stuck heartbeat runs
on startup and periodically. Track runId through auth middleware, activity logs,
and all mutation routes. Add issue-run cross-reference queries, live-run and
active-run endpoints for issues, issue identifier lookup, reopen-via-comment
flow, and done/cancelled -> todo status transitions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Forgotten
2026-02-19 09:09:40 -06:00
parent 21b7bc8da0
commit a90063415e
14 changed files with 605 additions and 67 deletions

View File

@@ -10,6 +10,7 @@ export interface LogActivityInput {
entityType: string;
entityId: string;
agentId?: string | null;
runId?: string | null;
details?: Record<string, unknown> | null;
}
@@ -22,6 +23,7 @@ export async function logActivity(db: Db, input: LogActivityInput) {
entityType: input.entityType,
entityId: input.entityId,
agentId: input.agentId ?? null,
runId: input.runId ?? null,
details: input.details ?? null,
});
@@ -35,6 +37,7 @@ export async function logActivity(db: Db, input: LogActivityInput) {
entityType: input.entityType,
entityId: input.entityId,
agentId: input.agentId ?? null,
runId: input.runId ?? null,
details: input.details ?? null,
},
});

View File

@@ -1,6 +1,6 @@
import { and, desc, eq } from "drizzle-orm";
import { and, desc, eq, isNotNull, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import { activityLog } from "@paperclip/db";
import { activityLog, heartbeatRuns, issues } from "@paperclip/db";
export interface ActivityFilters {
companyId: string;
@@ -10,6 +10,7 @@ export interface ActivityFilters {
}
export function activityService(db: Db) {
const issueIdAsText = sql<string>`${issues.id}::text`;
return {
list: (filters: ActivityFilters) => {
const conditions = [eq(activityLog.companyId, filters.companyId)];
@@ -27,6 +28,58 @@ export function activityService(db: Db) {
return db.select().from(activityLog).where(and(...conditions)).orderBy(desc(activityLog.createdAt));
},
forIssue: (issueId: string) =>
db
.select()
.from(activityLog)
.where(
and(
eq(activityLog.entityType, "issue"),
eq(activityLog.entityId, issueId),
),
)
.orderBy(desc(activityLog.createdAt)),
runsForIssue: (issueId: string) =>
db
.selectDistinctOn([activityLog.runId], {
runId: activityLog.runId,
status: heartbeatRuns.status,
agentId: heartbeatRuns.agentId,
startedAt: heartbeatRuns.startedAt,
finishedAt: heartbeatRuns.finishedAt,
createdAt: heartbeatRuns.createdAt,
invocationSource: heartbeatRuns.invocationSource,
})
.from(activityLog)
.innerJoin(heartbeatRuns, eq(activityLog.runId, heartbeatRuns.id))
.where(
and(
eq(activityLog.entityType, "issue"),
eq(activityLog.entityId, issueId),
isNotNull(activityLog.runId),
),
)
.orderBy(activityLog.runId, desc(heartbeatRuns.createdAt)),
issuesForRun: (runId: string) =>
db
.selectDistinctOn([issueIdAsText], {
issueId: issues.id,
title: issues.title,
status: issues.status,
priority: issues.priority,
})
.from(activityLog)
.innerJoin(issues, eq(activityLog.entityId, issueIdAsText))
.where(
and(
eq(activityLog.runId, runId),
eq(activityLog.entityType, "issue"),
),
)
.orderBy(issueIdAsText),
create: (data: typeof activityLog.$inferInsert) =>
db
.insert(activityLog)

View File

@@ -34,6 +34,10 @@ interface WakeupOptions {
contextSnapshot?: Record<string, unknown>;
}
function readNonEmptyString(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value : null;
}
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
@@ -216,6 +220,56 @@ export function heartbeatService(db: Db) {
}
}
async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
const staleThresholdMs = opts?.staleThresholdMs ?? 0;
const now = new Date();
// Find all runs in "queued" or "running" state
const activeRuns = await db
.select()
.from(heartbeatRuns)
.where(inArray(heartbeatRuns.status, ["queued", "running"]));
const reaped: string[] = [];
for (const run of activeRuns) {
if (runningProcesses.has(run.id)) continue;
// Apply staleness threshold to avoid false positives
if (staleThresholdMs > 0) {
const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
if (now.getTime() - refTime < staleThresholdMs) continue;
}
await setRunStatus(run.id, "failed", {
error: "Process lost -- server may have restarted",
errorCode: "process_lost",
finishedAt: now,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: now,
error: "Process lost -- server may have restarted",
});
const updatedRun = await getRun(run.id);
if (updatedRun) {
await appendRunEvent(updatedRun, 1, {
eventType: "lifecycle",
stream: "system",
level: "error",
message: "Process lost -- server may have restarted",
});
}
await finalizeAgentStatus(run.agentId, "failed");
runningProcesses.delete(run.id);
reaped.push(run.id);
}
if (reaped.length > 0) {
logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs");
}
return { reaped: reaped.length, runIds: reaped };
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
@@ -543,7 +597,26 @@ export function heartbeatService(db: Db) {
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
const source = opts.source ?? "on_demand";
const triggerDetail = opts.triggerDetail ?? null;
const contextSnapshot = opts.contextSnapshot ?? {};
const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) };
const reason = opts.reason ?? null;
const payload = opts.payload ?? null;
const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
contextSnapshot.wakeReason = reason;
}
if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
contextSnapshot.issueId = issueIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
contextSnapshot.taskId = issueIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["wakeSource"])) {
contextSnapshot.wakeSource = source;
}
if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
contextSnapshot.wakeTriggerDetail = triggerDetail;
}
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
@@ -560,7 +633,7 @@ export function heartbeatService(db: Db) {
source,
triggerDetail,
reason,
payload: opts.payload ?? null,
payload,
status: "skipped",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
@@ -591,8 +664,8 @@ export function heartbeatService(db: Db) {
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
reason,
payload,
status: "coalesced",
coalescedCount: 1,
requestedByActorType: opts.requestedByActorType ?? null,
@@ -611,8 +684,8 @@ export function heartbeatService(db: Db) {
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
reason,
payload,
status: "queued",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
@@ -757,6 +830,8 @@ export function heartbeatService(db: Db) {
wakeup: enqueueWakeup,
reapOrphanedRuns,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;
@@ -860,5 +935,20 @@ export function heartbeatService(db: Db) {
return runs.length;
},
getActiveRunForAgent: async (agentId: string) => {
const [run] = await db
.select()
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.agentId, agentId),
eq(heartbeatRuns.status, "running"),
),
)
.orderBy(desc(heartbeatRuns.startedAt))
.limit(1);
return run ?? null;
},
};
}

View File

@@ -1,6 +1,6 @@
import { and, asc, desc, eq, inArray, isNull, or, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import { agents, issues, issueComments } from "@paperclip/db";
import { agents, companies, issues, issueComments } from "@paperclip/db";
import { conflict, notFound, unprocessable } from "../errors.js";
const ISSUE_TRANSITIONS: Record<string, string[]> = {
@@ -9,8 +9,8 @@ const ISSUE_TRANSITIONS: Record<string, string[]> = {
in_progress: ["in_review", "blocked", "done", "cancelled"],
in_review: ["in_progress", "done", "cancelled"],
blocked: ["todo", "in_progress", "cancelled"],
done: [],
cancelled: [],
done: ["todo"],
cancelled: ["todo"],
};
function assertTransition(from: string, to: string) {
@@ -69,23 +69,38 @@ export function issueService(db: Db) {
.where(eq(issues.id, id))
.then((rows) => rows[0] ?? null),
create: (companyId: string, data: Omit<typeof issues.$inferInsert, "companyId">) => {
const values = { ...data, companyId } as typeof issues.$inferInsert;
if (values.status === "in_progress" && !values.startedAt) {
values.startedAt = new Date();
}
if (values.status === "done") {
values.completedAt = new Date();
}
if (values.status === "cancelled") {
values.cancelledAt = new Date();
}
getByIdentifier: (identifier: string) =>
db
.select()
.from(issues)
.where(eq(issues.identifier, identifier.toUpperCase()))
.then((rows) => rows[0] ?? null),
return db
.insert(issues)
.values(values)
.returning()
.then((rows) => rows[0]);
create: async (companyId: string, data: Omit<typeof issues.$inferInsert, "companyId">) => {
return db.transaction(async (tx) => {
const [company] = await tx
.update(companies)
.set({ issueCounter: sql`${companies.issueCounter} + 1` })
.where(eq(companies.id, companyId))
.returning({ issueCounter: companies.issueCounter, issuePrefix: companies.issuePrefix });
const issueNumber = company.issueCounter;
const identifier = `${company.issuePrefix}-${issueNumber}`;
const values = { ...data, companyId, issueNumber, identifier } as typeof issues.$inferInsert;
if (values.status === "in_progress" && !values.startedAt) {
values.startedAt = new Date();
}
if (values.status === "done") {
values.completedAt = new Date();
}
if (values.status === "cancelled") {
values.cancelledAt = new Date();
}
const [issue] = await tx.insert(issues).values(values).returning();
return issue;
});
},
update: async (id: string, data: Partial<typeof issues.$inferInsert>) => {
@@ -110,6 +125,12 @@ export function issueService(db: Db) {
}
applyStatusSideEffects(data.status, patch);
if (data.status && data.status !== "done") {
patch.completedAt = null;
}
if (data.status && data.status !== "cancelled") {
patch.cancelledAt = null;
}
return db
.update(issues)