import { and, asc, desc, eq, inArray, isNull, or, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; import { agents, assets, companies, companyMemberships, goals, heartbeatRuns, issueAttachments, issueComments, issues, projects, } from "@paperclip/db"; import { conflict, notFound, unprocessable } from "../errors.js"; const ALL_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked", "done", "cancelled"]; function assertTransition(from: string, to: string) { if (from === to) return; if (!ALL_ISSUE_STATUSES.includes(to)) { throw conflict(`Unknown issue status: ${to}`); } } function applyStatusSideEffects( status: string | undefined, patch: Partial, ): Partial { if (!status) return patch; if (status === "in_progress" && !patch.startedAt) { patch.startedAt = new Date(); } if (status === "done") { patch.completedAt = new Date(); } if (status === "cancelled") { patch.cancelledAt = new Date(); } return patch; } export interface IssueFilters { status?: string; assigneeAgentId?: string; projectId?: string; } function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) { if (actorRunId) return checkoutRunId === actorRunId; return checkoutRunId == null; } const TERMINAL_HEARTBEAT_RUN_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]); export function issueService(db: Db) { async function assertAssignableAgent(companyId: string, agentId: string) { const assignee = await db .select({ id: agents.id, companyId: agents.companyId, status: agents.status, }) .from(agents) .where(eq(agents.id, agentId)) .then((rows) => rows[0] ?? null); if (!assignee) throw notFound("Assignee agent not found"); if (assignee.companyId !== companyId) { throw unprocessable("Assignee must belong to same company"); } if (assignee.status === "pending_approval") { throw conflict("Cannot assign work to pending approval agents"); } if (assignee.status === "terminated") { throw conflict("Cannot assign work to terminated agents"); } } async function assertAssignableUser(companyId: string, userId: string) { const membership = await db .select({ id: companyMemberships.id }) .from(companyMemberships) .where( and( eq(companyMemberships.companyId, companyId), eq(companyMemberships.principalType, "user"), eq(companyMemberships.principalId, userId), eq(companyMemberships.status, "active"), ), ) .then((rows) => rows[0] ?? null); if (!membership) { throw notFound("Assignee user not found"); } } async function isTerminalOrMissingHeartbeatRun(runId: string) { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); if (!run) return true; return TERMINAL_HEARTBEAT_RUN_STATUSES.has(run.status); } async function adoptStaleCheckoutRun(input: { issueId: string; actorAgentId: string; actorRunId: string; expectedCheckoutRunId: string; }) { const stale = await isTerminalOrMissingHeartbeatRun(input.expectedCheckoutRunId); if (!stale) return null; const now = new Date(); const adopted = await db .update(issues) .set({ checkoutRunId: input.actorRunId, executionRunId: input.actorRunId, executionLockedAt: now, updatedAt: now, }) .where( and( eq(issues.id, input.issueId), eq(issues.status, "in_progress"), eq(issues.assigneeAgentId, input.actorAgentId), eq(issues.checkoutRunId, input.expectedCheckoutRunId), ), ) .returning({ id: issues.id, status: issues.status, assigneeAgentId: issues.assigneeAgentId, checkoutRunId: issues.checkoutRunId, executionRunId: issues.executionRunId, }) .then((rows) => rows[0] ?? null); return adopted; } return { list: async (companyId: string, filters?: IssueFilters) => { const conditions = [eq(issues.companyId, companyId)]; if (filters?.status) { const statuses = filters.status.split(",").map((s) => s.trim()); conditions.push(statuses.length === 1 ? eq(issues.status, statuses[0]) : inArray(issues.status, statuses)); } if (filters?.assigneeAgentId) { conditions.push(eq(issues.assigneeAgentId, filters.assigneeAgentId)); } if (filters?.projectId) conditions.push(eq(issues.projectId, filters.projectId)); conditions.push(isNull(issues.hiddenAt)); const priorityOrder = sql`CASE ${issues.priority} WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END`; return db.select().from(issues).where(and(...conditions)).orderBy(asc(priorityOrder), desc(issues.updatedAt)); }, getById: (id: string) => db .select() .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null), getByIdentifier: (identifier: string) => db .select() .from(issues) .where(eq(issues.identifier, identifier.toUpperCase())) .then((rows) => rows[0] ?? null), create: async (companyId: string, data: Omit) => { if (data.assigneeAgentId && data.assigneeUserId) { throw unprocessable("Issue can only have one assignee"); } if (data.assigneeAgentId) { await assertAssignableAgent(companyId, data.assigneeAgentId); } if (data.assigneeUserId) { await assertAssignableUser(companyId, data.assigneeUserId); } if (data.status === "in_progress" && !data.assigneeAgentId && !data.assigneeUserId) { throw unprocessable("in_progress issues require an assignee"); } return db.transaction(async (tx) => { const [company] = await tx .update(companies) .set({ issueCounter: sql`${companies.issueCounter} + 1` }) .where(eq(companies.id, companyId)) .returning({ issueCounter: companies.issueCounter, issuePrefix: companies.issuePrefix }); const issueNumber = company.issueCounter; const identifier = `${company.issuePrefix}-${issueNumber}`; const values = { ...data, companyId, issueNumber, identifier } as typeof issues.$inferInsert; if (values.status === "in_progress" && !values.startedAt) { values.startedAt = new Date(); } if (values.status === "done") { values.completedAt = new Date(); } if (values.status === "cancelled") { values.cancelledAt = new Date(); } const [issue] = await tx.insert(issues).values(values).returning(); return issue; }); }, update: async (id: string, data: Partial) => { const existing = await db .select() .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null); if (!existing) return null; if (data.status) { assertTransition(existing.status, data.status); } const patch: Partial = { ...data, updatedAt: new Date(), }; const nextAssigneeAgentId = data.assigneeAgentId !== undefined ? data.assigneeAgentId : existing.assigneeAgentId; const nextAssigneeUserId = data.assigneeUserId !== undefined ? data.assigneeUserId : existing.assigneeUserId; if (nextAssigneeAgentId && nextAssigneeUserId) { throw unprocessable("Issue can only have one assignee"); } if (patch.status === "in_progress" && !nextAssigneeAgentId && !nextAssigneeUserId) { throw unprocessable("in_progress issues require an assignee"); } if (data.assigneeAgentId) { await assertAssignableAgent(existing.companyId, data.assigneeAgentId); } if (data.assigneeUserId) { await assertAssignableUser(existing.companyId, data.assigneeUserId); } applyStatusSideEffects(data.status, patch); if (data.status && data.status !== "done") { patch.completedAt = null; } if (data.status && data.status !== "cancelled") { patch.cancelledAt = null; } if (data.status && data.status !== "in_progress") { patch.checkoutRunId = null; } if ( (data.assigneeAgentId !== undefined && data.assigneeAgentId !== existing.assigneeAgentId) || (data.assigneeUserId !== undefined && data.assigneeUserId !== existing.assigneeUserId) ) { patch.checkoutRunId = null; } return db .update(issues) .set(patch) .where(eq(issues.id, id)) .returning() .then((rows) => rows[0] ?? null); }, remove: (id: string) => db.transaction(async (tx) => { const attachmentAssetIds = await tx .select({ assetId: issueAttachments.assetId }) .from(issueAttachments) .where(eq(issueAttachments.issueId, id)); const removedIssue = await tx .delete(issues) .where(eq(issues.id, id)) .returning() .then((rows) => rows[0] ?? null); if (removedIssue && attachmentAssetIds.length > 0) { await tx .delete(assets) .where(inArray(assets.id, attachmentAssetIds.map((row) => row.assetId))); } return removedIssue; }), checkout: async (id: string, agentId: string, expectedStatuses: string[], checkoutRunId: string | null) => { const issueCompany = await db .select({ companyId: issues.companyId }) .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null); if (!issueCompany) throw notFound("Issue not found"); await assertAssignableAgent(issueCompany.companyId, agentId); const now = new Date(); const sameRunAssigneeCondition = checkoutRunId ? and( eq(issues.assigneeAgentId, agentId), or(isNull(issues.checkoutRunId), eq(issues.checkoutRunId, checkoutRunId)), ) : and(eq(issues.assigneeAgentId, agentId), isNull(issues.checkoutRunId)); const executionLockCondition = checkoutRunId ? or(isNull(issues.executionRunId), eq(issues.executionRunId, checkoutRunId)) : isNull(issues.executionRunId); const updated = await db .update(issues) .set({ assigneeAgentId: agentId, assigneeUserId: null, checkoutRunId, executionRunId: checkoutRunId, status: "in_progress", startedAt: now, updatedAt: now, }) .where( and( eq(issues.id, id), inArray(issues.status, expectedStatuses), or(isNull(issues.assigneeAgentId), sameRunAssigneeCondition), executionLockCondition, ), ) .returning() .then((rows) => rows[0] ?? null); if (updated) return updated; const current = await db .select({ id: issues.id, status: issues.status, assigneeAgentId: issues.assigneeAgentId, checkoutRunId: issues.checkoutRunId, executionRunId: issues.executionRunId, }) .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null); if (!current) throw notFound("Issue not found"); if ( current.assigneeAgentId === agentId && current.status === "in_progress" && current.checkoutRunId == null && (current.executionRunId == null || current.executionRunId === checkoutRunId) && checkoutRunId ) { const adopted = await db .update(issues) .set({ checkoutRunId, executionRunId: checkoutRunId, updatedAt: new Date(), }) .where( and( eq(issues.id, id), eq(issues.status, "in_progress"), eq(issues.assigneeAgentId, agentId), isNull(issues.checkoutRunId), or(isNull(issues.executionRunId), eq(issues.executionRunId, checkoutRunId)), ), ) .returning() .then((rows) => rows[0] ?? null); if (adopted) return adopted; } if ( checkoutRunId && current.assigneeAgentId === agentId && current.status === "in_progress" && current.checkoutRunId && current.checkoutRunId !== checkoutRunId ) { const adopted = await adoptStaleCheckoutRun({ issueId: id, actorAgentId: agentId, actorRunId: checkoutRunId, expectedCheckoutRunId: current.checkoutRunId, }); if (adopted) return db.select().from(issues).where(eq(issues.id, id)).then((rows) => rows[0]!); } // If this run already owns it and it's in_progress, return it (no self-409) if ( current.assigneeAgentId === agentId && current.status === "in_progress" && sameRunLock(current.checkoutRunId, checkoutRunId) ) { return db.select().from(issues).where(eq(issues.id, id)).then((rows) => rows[0]!); } throw conflict("Issue checkout conflict", { issueId: current.id, status: current.status, assigneeAgentId: current.assigneeAgentId, checkoutRunId: current.checkoutRunId, executionRunId: current.executionRunId, }); }, assertCheckoutOwner: async (id: string, actorAgentId: string, actorRunId: string | null) => { const current = await db .select({ id: issues.id, status: issues.status, assigneeAgentId: issues.assigneeAgentId, checkoutRunId: issues.checkoutRunId, }) .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null); if (!current) throw notFound("Issue not found"); if ( current.status === "in_progress" && current.assigneeAgentId === actorAgentId && sameRunLock(current.checkoutRunId, actorRunId) ) { return { ...current, adoptedFromRunId: null as string | null }; } if ( actorRunId && current.status === "in_progress" && current.assigneeAgentId === actorAgentId && current.checkoutRunId && current.checkoutRunId !== actorRunId ) { const adopted = await adoptStaleCheckoutRun({ issueId: id, actorAgentId, actorRunId, expectedCheckoutRunId: current.checkoutRunId, }); if (adopted) { return { ...adopted, adoptedFromRunId: current.checkoutRunId, }; } } throw conflict("Issue run ownership conflict", { issueId: current.id, status: current.status, assigneeAgentId: current.assigneeAgentId, checkoutRunId: current.checkoutRunId, actorAgentId, actorRunId, }); }, release: async (id: string, actorAgentId?: string, actorRunId?: string | null) => { const existing = await db .select() .from(issues) .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null); if (!existing) return null; if (actorAgentId && existing.assigneeAgentId && existing.assigneeAgentId !== actorAgentId) { throw conflict("Only assignee can release issue"); } if ( actorAgentId && existing.status === "in_progress" && existing.assigneeAgentId === actorAgentId && existing.checkoutRunId && !sameRunLock(existing.checkoutRunId, actorRunId ?? null) ) { throw conflict("Only checkout run can release issue", { issueId: existing.id, assigneeAgentId: existing.assigneeAgentId, checkoutRunId: existing.checkoutRunId, actorRunId: actorRunId ?? null, }); } return db .update(issues) .set({ status: "todo", assigneeAgentId: null, checkoutRunId: null, updatedAt: new Date(), }) .where(eq(issues.id, id)) .returning() .then((rows) => rows[0] ?? null); }, listComments: (issueId: string) => db .select() .from(issueComments) .where(eq(issueComments.issueId, issueId)) .orderBy(desc(issueComments.createdAt)), addComment: async (issueId: string, body: string, actor: { agentId?: string; userId?: string }) => { const issue = await db .select({ companyId: issues.companyId }) .from(issues) .where(eq(issues.id, issueId)) .then((rows) => rows[0] ?? null); if (!issue) throw notFound("Issue not found"); return db .insert(issueComments) .values({ companyId: issue.companyId, issueId, authorAgentId: actor.agentId ?? null, authorUserId: actor.userId ?? null, body, }) .returning() .then((rows) => rows[0]); }, createAttachment: async (input: { issueId: string; issueCommentId?: string | null; provider: string; objectKey: string; contentType: string; byteSize: number; sha256: string; originalFilename?: string | null; createdByAgentId?: string | null; createdByUserId?: string | null; }) => { const issue = await db .select({ id: issues.id, companyId: issues.companyId }) .from(issues) .where(eq(issues.id, input.issueId)) .then((rows) => rows[0] ?? null); if (!issue) throw notFound("Issue not found"); if (input.issueCommentId) { const comment = await db .select({ id: issueComments.id, companyId: issueComments.companyId, issueId: issueComments.issueId }) .from(issueComments) .where(eq(issueComments.id, input.issueCommentId)) .then((rows) => rows[0] ?? null); if (!comment) throw notFound("Issue comment not found"); if (comment.companyId !== issue.companyId || comment.issueId !== issue.id) { throw unprocessable("Attachment comment must belong to same issue and company"); } } return db.transaction(async (tx) => { const [asset] = await tx .insert(assets) .values({ companyId: issue.companyId, provider: input.provider, objectKey: input.objectKey, contentType: input.contentType, byteSize: input.byteSize, sha256: input.sha256, originalFilename: input.originalFilename ?? null, createdByAgentId: input.createdByAgentId ?? null, createdByUserId: input.createdByUserId ?? null, }) .returning(); const [attachment] = await tx .insert(issueAttachments) .values({ companyId: issue.companyId, issueId: issue.id, assetId: asset.id, issueCommentId: input.issueCommentId ?? null, }) .returning(); return { id: attachment.id, companyId: attachment.companyId, issueId: attachment.issueId, issueCommentId: attachment.issueCommentId, assetId: attachment.assetId, provider: asset.provider, objectKey: asset.objectKey, contentType: asset.contentType, byteSize: asset.byteSize, sha256: asset.sha256, originalFilename: asset.originalFilename, createdByAgentId: asset.createdByAgentId, createdByUserId: asset.createdByUserId, createdAt: attachment.createdAt, updatedAt: attachment.updatedAt, }; }); }, listAttachments: async (issueId: string) => db .select({ id: issueAttachments.id, companyId: issueAttachments.companyId, issueId: issueAttachments.issueId, issueCommentId: issueAttachments.issueCommentId, assetId: issueAttachments.assetId, provider: assets.provider, objectKey: assets.objectKey, contentType: assets.contentType, byteSize: assets.byteSize, sha256: assets.sha256, originalFilename: assets.originalFilename, createdByAgentId: assets.createdByAgentId, createdByUserId: assets.createdByUserId, createdAt: issueAttachments.createdAt, updatedAt: issueAttachments.updatedAt, }) .from(issueAttachments) .innerJoin(assets, eq(issueAttachments.assetId, assets.id)) .where(eq(issueAttachments.issueId, issueId)) .orderBy(desc(issueAttachments.createdAt)), getAttachmentById: async (id: string) => db .select({ id: issueAttachments.id, companyId: issueAttachments.companyId, issueId: issueAttachments.issueId, issueCommentId: issueAttachments.issueCommentId, assetId: issueAttachments.assetId, provider: assets.provider, objectKey: assets.objectKey, contentType: assets.contentType, byteSize: assets.byteSize, sha256: assets.sha256, originalFilename: assets.originalFilename, createdByAgentId: assets.createdByAgentId, createdByUserId: assets.createdByUserId, createdAt: issueAttachments.createdAt, updatedAt: issueAttachments.updatedAt, }) .from(issueAttachments) .innerJoin(assets, eq(issueAttachments.assetId, assets.id)) .where(eq(issueAttachments.id, id)) .then((rows) => rows[0] ?? null), removeAttachment: async (id: string) => db.transaction(async (tx) => { const existing = await tx .select({ id: issueAttachments.id, companyId: issueAttachments.companyId, issueId: issueAttachments.issueId, issueCommentId: issueAttachments.issueCommentId, assetId: issueAttachments.assetId, provider: assets.provider, objectKey: assets.objectKey, contentType: assets.contentType, byteSize: assets.byteSize, sha256: assets.sha256, originalFilename: assets.originalFilename, createdByAgentId: assets.createdByAgentId, createdByUserId: assets.createdByUserId, createdAt: issueAttachments.createdAt, updatedAt: issueAttachments.updatedAt, }) .from(issueAttachments) .innerJoin(assets, eq(issueAttachments.assetId, assets.id)) .where(eq(issueAttachments.id, id)) .then((rows) => rows[0] ?? null); if (!existing) return null; await tx.delete(issueAttachments).where(eq(issueAttachments.id, id)); await tx.delete(assets).where(eq(assets.id, existing.assetId)); return existing; }), findMentionedAgents: async (companyId: string, body: string) => { const re = /\B@([^\s@,!?.]+)/g; const tokens = new Set(); let m: RegExpExecArray | null; while ((m = re.exec(body)) !== null) tokens.add(m[1].toLowerCase()); if (tokens.size === 0) return []; const rows = await db.select({ id: agents.id, name: agents.name }) .from(agents).where(eq(agents.companyId, companyId)); return rows.filter(a => tokens.has(a.name.toLowerCase())).map(a => a.id); }, getAncestors: async (issueId: string) => { const raw: Array<{ id: string; identifier: string | null; title: string; description: string | null; status: string; priority: string; assigneeAgentId: string | null; projectId: string | null; goalId: string | null; }> = []; const visited = new Set([issueId]); const start = await db.select().from(issues).where(eq(issues.id, issueId)).then(r => r[0] ?? null); let currentId = start?.parentId ?? null; while (currentId && !visited.has(currentId) && raw.length < 50) { visited.add(currentId); const parent = await db.select({ id: issues.id, identifier: issues.identifier, title: issues.title, description: issues.description, status: issues.status, priority: issues.priority, assigneeAgentId: issues.assigneeAgentId, projectId: issues.projectId, goalId: issues.goalId, parentId: issues.parentId, }).from(issues).where(eq(issues.id, currentId)).then(r => r[0] ?? null); if (!parent) break; raw.push({ id: parent.id, identifier: parent.identifier ?? null, title: parent.title, description: parent.description ?? null, status: parent.status, priority: parent.priority, assigneeAgentId: parent.assigneeAgentId ?? null, projectId: parent.projectId ?? null, goalId: parent.goalId ?? null, }); currentId = parent.parentId ?? null; } // Batch-fetch referenced projects and goals const projectIds = [...new Set(raw.map(a => a.projectId).filter((id): id is string => id != null))]; const goalIds = [...new Set(raw.map(a => a.goalId).filter((id): id is string => id != null))]; const projectMap = new Map(); const goalMap = new Map(); if (projectIds.length > 0) { const rows = await db.select({ id: projects.id, name: projects.name, description: projects.description, status: projects.status, goalId: projects.goalId, }).from(projects).where(inArray(projects.id, projectIds)); for (const r of rows) { projectMap.set(r.id, r); // Also collect goalIds from projects if (r.goalId && !goalIds.includes(r.goalId)) goalIds.push(r.goalId); } } if (goalIds.length > 0) { const rows = await db.select({ id: goals.id, title: goals.title, description: goals.description, level: goals.level, status: goals.status, }).from(goals).where(inArray(goals.id, goalIds)); for (const r of rows) goalMap.set(r.id, r); } return raw.map(a => ({ ...a, project: a.projectId ? projectMap.get(a.projectId) ?? null : null, goal: a.goalId ? goalMap.get(a.goalId) ?? null : null, })); }, staleCount: async (companyId: string, minutes = 60) => { const cutoff = new Date(Date.now() - minutes * 60 * 1000); const result = await db .select({ count: sql`count(*)` }) .from(issues) .where( and( eq(issues.companyId, companyId), eq(issues.status, "in_progress"), isNull(issues.hiddenAt), sql`${issues.startedAt} < ${cutoff.toISOString()}`, ), ) .then((rows) => rows[0]); return Number(result?.count ?? 0); }, }; }