diff --git a/server/src/routes/activity.ts b/server/src/routes/activity.ts index 1ff1f6d9..087aea56 100644 --- a/server/src/routes/activity.ts +++ b/server/src/routes/activity.ts @@ -82,7 +82,7 @@ export function activityRoutes(db: Db) { return; } assertCompanyAccess(req, issue.companyId); - const result = await svc.runsForIssue(id); + const result = await svc.runsForIssue(issue.companyId, id); res.json(result); }); diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index 6e99db11..5d0b9fd4 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -89,7 +89,7 @@ export function issueRoutes(db: Db, storage: StorageService) { async function assertAgentRunCheckoutOwnership( req: Request, res: Response, - issue: { id: string; status: string; assigneeAgentId: string | null }, + issue: { id: string; companyId: string; status: string; assigneeAgentId: string | null }, ) { if (req.actor.type !== "agent") return true; const actorAgentId = req.actor.agentId; @@ -102,7 +102,25 @@ export function issueRoutes(db: Db, storage: StorageService) { } const runId = requireAgentRunId(req, res); if (!runId) return false; - await svc.assertCheckoutOwner(issue.id, actorAgentId, runId); + const ownership = await svc.assertCheckoutOwner(issue.id, actorAgentId, runId); + if (ownership.adoptedFromRunId) { + const actor = getActorInfo(req); + await logActivity(db, { + companyId: issue.companyId, + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + runId: actor.runId, + action: "issue.checkout_lock_adopted", + entityType: "issue", + entityId: issue.id, + details: { + previousCheckoutRunId: ownership.adoptedFromRunId, + checkoutRunId: runId, + reason: "stale_checkout_run", + }, + }); + } return true; } @@ -239,7 +257,7 @@ export function issueRoutes(db: Db, storage: StorageService) { action: "issue.created", entityType: "issue", entityId: issue.id, - details: { title: issue.title }, + details: { title: issue.title, identifier: issue.identifier }, }); if (issue.assigneeAgentId) { @@ -297,7 +315,7 @@ export function issueRoutes(db: Db, storage: StorageService) { action: "issue.updated", entityType: "issue", entityId: issue.id, - details: { ...updateFields, _previous: Object.keys(previous).length > 0 ? previous : undefined }, + details: { ...updateFields, identifier: issue.identifier, _previous: Object.keys(previous).length > 0 ? previous : undefined }, }); let comment = null; @@ -477,6 +495,7 @@ export function issueRoutes(db: Db, storage: StorageService) { return; } assertCompanyAccess(req, existing.companyId); + if (!(await assertAgentRunCheckoutOwnership(req, res, existing))) return; const actorRunId = requireAgentRunId(req, res); if (req.actor.type === "agent" && !actorRunId) return; @@ -558,6 +577,7 @@ export function issueRoutes(db: Db, storage: StorageService) { reopened: true, reopenedFrom: reopenFromStatus, source: "comment", + identifier: currentIssue.identifier, }, }); } diff --git a/server/src/services/activity.ts b/server/src/services/activity.ts index ef34a07d..8bf235a0 100644 --- a/server/src/services/activity.ts +++ b/server/src/services/activity.ts @@ -1,4 +1,4 @@ -import { and, desc, eq, isNotNull, isNull, or, sql } from "drizzle-orm"; +import { and, desc, eq, isNull, or, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; import { activityLog, heartbeatRuns, issues } from "@paperclip/db"; @@ -60,10 +60,10 @@ export function activityService(db: Db) { ) .orderBy(desc(activityLog.createdAt)), - runsForIssue: (issueId: string) => + runsForIssue: (companyId: string, issueId: string) => db - .selectDistinctOn([activityLog.runId], { - runId: activityLog.runId, + .select({ + runId: heartbeatRuns.id, status: heartbeatRuns.status, agentId: heartbeatRuns.agentId, startedAt: heartbeatRuns.startedAt, @@ -73,19 +73,37 @@ export function activityService(db: Db) { usageJson: heartbeatRuns.usageJson, resultJson: heartbeatRuns.resultJson, }) - .from(activityLog) - .innerJoin(heartbeatRuns, eq(activityLog.runId, heartbeatRuns.id)) + .from(heartbeatRuns) .where( and( - eq(activityLog.entityType, "issue"), - eq(activityLog.entityId, issueId), - isNotNull(activityLog.runId), + eq(heartbeatRuns.companyId, companyId), + or( + sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`, + sql`exists ( + select 1 + from ${activityLog} + where ${activityLog.companyId} = ${companyId} + and ${activityLog.entityType} = 'issue' + and ${activityLog.entityId} = ${issueId} + and ${activityLog.runId} = ${heartbeatRuns.id} + )`, + ), ), ) - .orderBy(activityLog.runId, desc(heartbeatRuns.createdAt)), + .orderBy(desc(heartbeatRuns.createdAt)), - issuesForRun: (runId: string) => - db + issuesForRun: async (runId: string) => { + const run = await db + .select({ + companyId: heartbeatRuns.companyId, + contextSnapshot: heartbeatRuns.contextSnapshot, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null); + if (!run) return []; + + const fromActivity = await db .selectDistinctOn([issueIdAsText], { issueId: issues.id, identifier: issues.identifier, @@ -97,12 +115,43 @@ export function activityService(db: Db) { .innerJoin(issues, eq(activityLog.entityId, issueIdAsText)) .where( and( + eq(activityLog.companyId, run.companyId), eq(activityLog.runId, runId), eq(activityLog.entityType, "issue"), isNull(issues.hiddenAt), ), ) - .orderBy(issueIdAsText), + .orderBy(issueIdAsText); + + const context = run.contextSnapshot; + const contextIssueId = + context && typeof context === "object" && typeof (context as Record).issueId === "string" + ? ((context as Record).issueId as string) + : null; + if (!contextIssueId) return fromActivity; + if (fromActivity.some((issue) => issue.issueId === contextIssueId)) return fromActivity; + + const fromContext = await db + .select({ + issueId: issues.id, + identifier: issues.identifier, + title: issues.title, + status: issues.status, + priority: issues.priority, + }) + .from(issues) + .where( + and( + eq(issues.companyId, run.companyId), + eq(issues.id, contextIssueId), + isNull(issues.hiddenAt), + ), + ) + .then((rows) => rows[0] ?? null); + + if (!fromContext) return fromActivity; + return [fromContext, ...fromActivity]; + }, create: (data: typeof activityLog.$inferInsert) => db diff --git a/server/src/services/issues.ts b/server/src/services/issues.ts index 47bdd9cb..be7a0f5e 100644 --- a/server/src/services/issues.ts +++ b/server/src/services/issues.ts @@ -5,6 +5,7 @@ import { assets, companies, goals, + heartbeatRuns, issueAttachments, issueComments, issues, @@ -50,6 +51,8 @@ function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) { return checkoutRunId == null; } +const TERMINAL_HEARTBEAT_RUN_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]); + export function issueService(db: Db) { async function assertAssignableAgent(companyId: string, agentId: string) { const assignee = await db @@ -74,6 +77,54 @@ export function issueService(db: Db) { } } + async function isTerminalOrMissingHeartbeatRun(runId: string) { + const run = await db + .select({ status: heartbeatRuns.status }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null); + if (!run) return true; + return TERMINAL_HEARTBEAT_RUN_STATUSES.has(run.status); + } + + async function adoptStaleCheckoutRun(input: { + issueId: string; + actorAgentId: string; + actorRunId: string; + expectedCheckoutRunId: string; + }) { + const stale = await isTerminalOrMissingHeartbeatRun(input.expectedCheckoutRunId); + if (!stale) return null; + + const now = new Date(); + const adopted = await db + .update(issues) + .set({ + checkoutRunId: input.actorRunId, + executionRunId: input.actorRunId, + executionLockedAt: now, + updatedAt: now, + }) + .where( + and( + eq(issues.id, input.issueId), + eq(issues.status, "in_progress"), + eq(issues.assigneeAgentId, input.actorAgentId), + eq(issues.checkoutRunId, input.expectedCheckoutRunId), + ), + ) + .returning({ + id: issues.id, + status: issues.status, + assigneeAgentId: issues.assigneeAgentId, + checkoutRunId: issues.checkoutRunId, + executionRunId: issues.executionRunId, + }) + .then((rows) => rows[0] ?? null); + + return adopted; + } + return { list: async (companyId: string, filters?: IssueFilters) => { const conditions = [eq(issues.companyId, companyId)]; @@ -287,6 +338,22 @@ export function issueService(db: Db) { if (adopted) return adopted; } + if ( + checkoutRunId && + current.assigneeAgentId === agentId && + current.status === "in_progress" && + current.checkoutRunId && + current.checkoutRunId !== checkoutRunId + ) { + const adopted = await adoptStaleCheckoutRun({ + issueId: id, + actorAgentId: agentId, + actorRunId: checkoutRunId, + expectedCheckoutRunId: current.checkoutRunId, + }); + if (adopted) return db.select().from(issues).where(eq(issues.id, id)).then((rows) => rows[0]!); + } + // If this run already owns it and it's in_progress, return it (no self-409) if ( current.assigneeAgentId === agentId && @@ -324,7 +391,29 @@ export function issueService(db: Db) { current.assigneeAgentId === actorAgentId && sameRunLock(current.checkoutRunId, actorRunId) ) { - return current; + return { ...current, adoptedFromRunId: null as string | null }; + } + + if ( + actorRunId && + current.status === "in_progress" && + current.assigneeAgentId === actorAgentId && + current.checkoutRunId && + current.checkoutRunId !== actorRunId + ) { + const adopted = await adoptStaleCheckoutRun({ + issueId: id, + actorAgentId, + actorRunId, + expectedCheckoutRunId: current.checkoutRunId, + }); + + if (adopted) { + return { + ...adopted, + adoptedFromRunId: current.checkoutRunId, + }; + } } throw conflict("Issue run ownership conflict", {