From 99eb31760086c31bcd60a0fa012ee362b4ed519b Mon Sep 17 00:00:00 2001 From: dotta Date: Fri, 20 Mar 2026 16:15:32 -0500 Subject: [PATCH] fix: harden routine dispatch and permissions --- server/src/__tests__/routines-routes.test.ts | 156 +++++++++ server/src/__tests__/routines-service.test.ts | 73 ++++- server/src/routes/routines.ts | 20 +- server/src/services/routines.ts | 307 ++++++++++-------- 4 files changed, 417 insertions(+), 139 deletions(-) create mode 100644 server/src/__tests__/routines-routes.test.ts diff --git a/server/src/__tests__/routines-routes.test.ts b/server/src/__tests__/routines-routes.test.ts new file mode 100644 index 00000000..b6495571 --- /dev/null +++ b/server/src/__tests__/routines-routes.test.ts @@ -0,0 +1,156 @@ +import express from "express"; +import request from "supertest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { routineRoutes } from "../routes/routines.js"; +import { errorHandler } from "../middleware/index.js"; + +const companyId = "22222222-2222-4222-8222-222222222222"; +const agentId = "11111111-1111-4111-8111-111111111111"; +const routineId = "33333333-3333-4333-8333-333333333333"; +const projectId = "44444444-4444-4444-8444-444444444444"; +const otherAgentId = "55555555-5555-4555-8555-555555555555"; + +const routine = { + id: routineId, + companyId, + projectId, + goalId: null, + parentIssueId: null, + title: "Daily routine", + description: null, + assigneeAgentId: agentId, + priority: "medium", + status: "active", + concurrencyPolicy: "coalesce_if_active", + catchUpPolicy: "skip_missed", + createdByAgentId: null, + createdByUserId: null, + updatedByAgentId: null, + updatedByUserId: null, + lastTriggeredAt: null, + lastEnqueuedAt: null, + createdAt: new Date("2026-03-20T00:00:00.000Z"), + updatedAt: new Date("2026-03-20T00:00:00.000Z"), +}; + +const mockRoutineService = vi.hoisted(() => ({ + list: vi.fn(), + get: vi.fn(), + getDetail: vi.fn(), + update: vi.fn(), + create: vi.fn(), + listRuns: vi.fn(), + createTrigger: vi.fn(), + getTrigger: vi.fn(), + updateTrigger: vi.fn(), + deleteTrigger: vi.fn(), + rotateTriggerSecret: vi.fn(), + runRoutine: vi.fn(), + firePublicTrigger: vi.fn(), +})); + +const mockAccessService = vi.hoisted(() => ({ + canUser: vi.fn(), +})); + +const mockLogActivity = vi.hoisted(() => vi.fn()); + +vi.mock("../services/index.js", () => ({ + accessService: () => mockAccessService, + logActivity: mockLogActivity, + routineService: () => mockRoutineService, +})); + +function createApp(actor: Record) { + const app = express(); + app.use(express.json()); + app.use((req, _res, next) => { + (req as any).actor = actor; + next(); + }); + app.use("/api", routineRoutes({} as any)); + app.use(errorHandler); + return app; +} + +describe("routine routes", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockRoutineService.create.mockResolvedValue(routine); + mockRoutineService.get.mockResolvedValue(routine); + mockRoutineService.update.mockResolvedValue({ ...routine, assigneeAgentId: otherAgentId }); + mockAccessService.canUser.mockResolvedValue(false); + mockLogActivity.mockResolvedValue(undefined); + }); + + it("requires tasks:assign permission for non-admin board routine creation", async () => { + const app = createApp({ + type: "board", + userId: "board-user", + source: "session", + isInstanceAdmin: false, + companyIds: [companyId], + }); + + const res = await request(app) + .post(`/api/companies/${companyId}/routines`) + .send({ + projectId, + title: "Daily routine", + assigneeAgentId: agentId, + }); + + expect(res.status).toBe(403); + expect(res.body.error).toContain("tasks:assign"); + expect(mockRoutineService.create).not.toHaveBeenCalled(); + }); + + it("requires tasks:assign permission to retarget a routine assignee", async () => { + const app = createApp({ + type: "board", + userId: "board-user", + source: "session", + isInstanceAdmin: false, + companyIds: [companyId], + }); + + const res = await request(app) + .patch(`/api/routines/${routineId}`) + .send({ + assigneeAgentId: otherAgentId, + }); + + expect(res.status).toBe(403); + expect(res.body.error).toContain("tasks:assign"); + expect(mockRoutineService.update).not.toHaveBeenCalled(); + }); + + it("allows routine creation when the board user has tasks:assign", async () => { + mockAccessService.canUser.mockResolvedValue(true); + const app = createApp({ + type: "board", + userId: "board-user", + source: "session", + isInstanceAdmin: false, + companyIds: [companyId], + }); + + const res = await request(app) + .post(`/api/companies/${companyId}/routines`) + .send({ + projectId, + title: "Daily routine", + assigneeAgentId: agentId, + }); + + expect(res.status).toBe(201); + expect(mockRoutineService.create).toHaveBeenCalledWith(companyId, expect.objectContaining({ + projectId, + title: "Daily routine", + assigneeAgentId: agentId, + }), { + agentId: null, + userId: "board-user", + }); + }); +}); diff --git a/server/src/__tests__/routines-service.test.ts b/server/src/__tests__/routines-service.test.ts index 2d07d63a..31e487ac 100644 --- a/server/src/__tests__/routines-service.test.ts +++ b/server/src/__tests__/routines-service.test.ts @@ -176,7 +176,30 @@ describe("routine service live-execution coalescing", () => { heartbeat: { wakeup: async (wakeupAgentId, wakeupOpts) => { wakeups.push({ agentId: wakeupAgentId, opts: wakeupOpts }); - return opts?.wakeup ? opts.wakeup(wakeupAgentId, wakeupOpts) : null; + if (opts?.wakeup) return opts.wakeup(wakeupAgentId, wakeupOpts); + const issueId = + (typeof wakeupOpts.payload?.issueId === "string" && wakeupOpts.payload.issueId) || + (typeof wakeupOpts.contextSnapshot?.issueId === "string" && wakeupOpts.contextSnapshot.issueId) || + null; + if (!issueId) return null; + const queuedRunId = randomUUID(); + await db.insert(heartbeatRuns).values({ + id: queuedRunId, + companyId, + agentId: wakeupAgentId, + invocationSource: wakeupOpts.source ?? "assignment", + triggerDetail: wakeupOpts.triggerDetail ?? null, + status: "queued", + contextSnapshot: { ...(wakeupOpts.contextSnapshot ?? {}), issueId }, + }); + await db + .update(issues) + .set({ + executionRunId: queuedRunId, + executionLockedAt: new Date(), + }) + .where(eq(issues.id, issueId)); + return { id: queuedRunId }; }, }, }); @@ -350,4 +373,52 @@ describe("routine service live-execution coalescing", () => { expect(routineIssues).toHaveLength(1); expect(routineIssues[0]?.id).toBe(previousIssue.id); }); + + it("serializes concurrent dispatches until the first execution issue is linked to a queued run", async () => { + const { routine, svc } = await seedFixture({ + wakeup: async (wakeupAgentId, wakeupOpts) => { + const issueId = + (typeof wakeupOpts.payload?.issueId === "string" && wakeupOpts.payload.issueId) || + (typeof wakeupOpts.contextSnapshot?.issueId === "string" && wakeupOpts.contextSnapshot.issueId) || + null; + await new Promise((resolve) => setTimeout(resolve, 25)); + if (!issueId) return null; + const queuedRunId = randomUUID(); + await db.insert(heartbeatRuns).values({ + id: queuedRunId, + companyId: routine.companyId, + agentId: wakeupAgentId, + invocationSource: wakeupOpts.source ?? "assignment", + triggerDetail: wakeupOpts.triggerDetail ?? null, + status: "queued", + contextSnapshot: { ...(wakeupOpts.contextSnapshot ?? {}), issueId }, + }); + await db + .update(issues) + .set({ + executionRunId: queuedRunId, + executionLockedAt: new Date(), + }) + .where(eq(issues.id, issueId)); + return { id: queuedRunId }; + }, + }); + + const [first, second] = await Promise.all([ + svc.runRoutine(routine.id, { source: "manual" }), + svc.runRoutine(routine.id, { source: "manual" }), + ]); + + expect([first.status, second.status].sort()).toEqual(["coalesced", "issue_created"]); + expect(first.linkedIssueId).toBeTruthy(); + expect(second.linkedIssueId).toBeTruthy(); + expect(first.linkedIssueId).toBe(second.linkedIssueId); + + const routineIssues = await db + .select({ id: issues.id }) + .from(issues) + .where(eq(issues.originId, routine.id)); + + expect(routineIssues).toHaveLength(1); + }); }); diff --git a/server/src/routes/routines.ts b/server/src/routes/routines.ts index 41452420..5560fbb6 100644 --- a/server/src/routes/routines.ts +++ b/server/src/routes/routines.ts @@ -9,13 +9,24 @@ import { updateRoutineTriggerSchema, } from "@paperclipai/shared"; import { validate } from "../middleware/validate.js"; -import { logActivity, routineService } from "../services/index.js"; +import { accessService, logActivity, routineService } from "../services/index.js"; import { assertCompanyAccess, getActorInfo } from "./authz.js"; import { forbidden, unauthorized } from "../errors.js"; export function routineRoutes(db: Db) { const router = Router(); const svc = routineService(db); + const access = accessService(db); + + async function assertBoardCanAssignTasks(req: Request, companyId: string) { + assertCompanyAccess(req, companyId); + if (req.actor.type !== "board") return; + if (req.actor.source === "local_implicit" || req.actor.isInstanceAdmin) return; + const allowed = await access.canUser(companyId, req.actor.userId, "tasks:assign"); + if (!allowed) { + throw forbidden("Missing permission: tasks:assign"); + } + } function assertCanManageCompanyRoutine(req: Request, companyId: string, assigneeAgentId?: string | null) { assertCompanyAccess(req, companyId); @@ -47,6 +58,7 @@ export function routineRoutes(db: Db) { router.post("/companies/:companyId/routines", validate(createRoutineSchema), async (req, res) => { const companyId = req.params.companyId as string; + await assertBoardCanAssignTasks(req, companyId); assertCanManageCompanyRoutine(req, companyId, req.body.assigneeAgentId); const created = await svc.create(companyId, req.body, { agentId: req.actor.type === "agent" ? req.actor.agentId : null, @@ -83,6 +95,12 @@ export function routineRoutes(db: Db) { res.status(404).json({ error: "Routine not found" }); return; } + const assigneeWillChange = + req.body.assigneeAgentId !== undefined && + req.body.assigneeAgentId !== routine.assigneeAgentId; + if (assigneeWillChange) { + await assertBoardCanAssignTasks(req, routine.companyId); + } if (req.actor.type === "agent" && req.body.assigneeAgentId && req.body.assigneeAgentId !== req.actor.agentId) { throw forbidden("Agents can only assign routines to themselves"); } diff --git a/server/src/services/routines.ts b/server/src/services/routines.ts index 06adcaa8..2243c1b1 100644 --- a/server/src/services/routines.ts +++ b/server/src/services/routines.ts @@ -26,11 +26,13 @@ import type { UpdateRoutineTrigger, } from "@paperclipai/shared"; import { conflict, forbidden, notFound, unauthorized, unprocessable } from "../errors.js"; +import { logger } from "../middleware/logger.js"; import { issueService } from "./issues.js"; import { secretService } from "./secrets.js"; import { parseCron, validateCron } from "./cron.js"; import { heartbeatService } from "./heartbeat.js"; import { queueIssueAssignmentWakeup, type IssueAssignmentWakeupDeps } from "./issue-assignment-wakeup.js"; +import { logActivity } from "./activity-log.js"; const OPEN_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked"]; const LIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"]; @@ -386,8 +388,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup status: string; issueId?: string | null; nextRunAt?: Date | null; - }) { - await db + }, executor: Db = db) { + await executor .update(routines) .set({ lastTriggeredAt: input.triggeredAt, @@ -397,7 +399,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup .where(eq(routines.id, input.routineId)); if (input.triggerId) { - await db + await executor .update(routineTriggers) .set({ lastFiredAt: input.triggeredAt, @@ -409,8 +411,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup } } - async function findLiveExecutionIssue(routine: typeof routines.$inferSelect) { - const executionBoundIssue = await db + async function findLiveExecutionIssue(routine: typeof routines.$inferSelect, executor: Db = db) { + const executionBoundIssue = await executor .select() .from(issues) .innerJoin( @@ -434,7 +436,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup .then((rows) => rows[0]?.issues ?? null); if (executionBoundIssue) return executionBoundIssue; - return db + return executor .select() .from(issues) .innerJoin( @@ -459,8 +461,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup .then((rows) => rows[0]?.issues ?? null); } - async function finalizeRun(runId: string, patch: Partial) { - return db + async function finalizeRun(runId: string, patch: Partial, executor: Db = db) { + return executor .update(routineRuns) .set({ ...patch, @@ -509,150 +511,181 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup payload?: Record | null; idempotencyKey?: string | null; }) { - if (input.idempotencyKey) { - const existing = await db - .select() - .from(routineRuns) - .where( - and( - eq(routineRuns.companyId, input.routine.companyId), - eq(routineRuns.routineId, input.routine.id), - eq(routineRuns.source, input.source), - eq(routineRuns.idempotencyKey, input.idempotencyKey), - input.trigger ? eq(routineRuns.triggerId, input.trigger.id) : isNull(routineRuns.triggerId), - ), - ) - .orderBy(desc(routineRuns.createdAt)) - .limit(1) - .then((rows) => rows[0] ?? null); - if (existing) return existing; - } + const run = await db.transaction(async (tx) => { + const txDb = tx as unknown as Db; + await tx.execute( + sql`select id from ${routines} where ${routines.id} = ${input.routine.id} and ${routines.companyId} = ${input.routine.companyId} for update`, + ); - const triggeredAt = new Date(); - const [run] = await db - .insert(routineRuns) - .values({ - companyId: input.routine.companyId, - routineId: input.routine.id, - triggerId: input.trigger?.id ?? null, - source: input.source, - status: "received", - triggeredAt, - idempotencyKey: input.idempotencyKey ?? null, - triggerPayload: input.payload ?? null, - }) - .returning(); - - const nextRunAt = input.trigger?.kind === "schedule" && input.trigger.cronExpression && input.trigger.timezone - ? nextCronTickInTimeZone(input.trigger.cronExpression, input.trigger.timezone, triggeredAt) - : undefined; - - try { - const activeIssue = await findLiveExecutionIssue(input.routine); - if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") { - const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; - const updated = await finalizeRun(run.id, { - status, - linkedIssueId: activeIssue.id, - coalescedIntoRunId: activeIssue.originRunId, - completedAt: triggeredAt, - }); - await updateRoutineTouchedState({ - routineId: input.routine.id, - triggerId: input.trigger?.id ?? null, - triggeredAt, - status, - issueId: activeIssue.id, - nextRunAt, - }); - return updated ?? run; + if (input.idempotencyKey) { + const existing = await txDb + .select() + .from(routineRuns) + .where( + and( + eq(routineRuns.companyId, input.routine.companyId), + eq(routineRuns.routineId, input.routine.id), + eq(routineRuns.source, input.source), + eq(routineRuns.idempotencyKey, input.idempotencyKey), + input.trigger ? eq(routineRuns.triggerId, input.trigger.id) : isNull(routineRuns.triggerId), + ), + ) + .orderBy(desc(routineRuns.createdAt)) + .limit(1) + .then((rows) => rows[0] ?? null); + if (existing) return existing; } - let createdIssue; + const triggeredAt = new Date(); + const [createdRun] = await txDb + .insert(routineRuns) + .values({ + companyId: input.routine.companyId, + routineId: input.routine.id, + triggerId: input.trigger?.id ?? null, + source: input.source, + status: "received", + triggeredAt, + idempotencyKey: input.idempotencyKey ?? null, + triggerPayload: input.payload ?? null, + }) + .returning(); + + const nextRunAt = input.trigger?.kind === "schedule" && input.trigger.cronExpression && input.trigger.timezone + ? nextCronTickInTimeZone(input.trigger.cronExpression, input.trigger.timezone, triggeredAt) + : undefined; + try { - createdIssue = await issueSvc.create(input.routine.companyId, { - projectId: input.routine.projectId, - goalId: input.routine.goalId, - parentId: input.routine.parentIssueId, - title: input.routine.title, - description: input.routine.description, - status: "todo", - priority: input.routine.priority, - assigneeAgentId: input.routine.assigneeAgentId, - originKind: "routine_execution", - originId: input.routine.id, - originRunId: run.id, - }); - } catch (error) { - const isOpenExecutionConflict = - !!error && - typeof error === "object" && - "code" in error && - (error as { code?: string }).code === "23505" && - "constraint" in error && - (error as { constraint?: string }).constraint === "issues_open_routine_execution_uq"; - if (!isOpenExecutionConflict || input.routine.concurrencyPolicy === "always_enqueue") { - throw error; + const activeIssue = await findLiveExecutionIssue(input.routine, txDb); + if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") { + const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; + const updated = await finalizeRun(createdRun.id, { + status, + linkedIssueId: activeIssue.id, + coalescedIntoRunId: activeIssue.originRunId, + completedAt: triggeredAt, + }, txDb); + await updateRoutineTouchedState({ + routineId: input.routine.id, + triggerId: input.trigger?.id ?? null, + triggeredAt, + status, + issueId: activeIssue.id, + nextRunAt, + }, txDb); + return updated ?? createdRun; } - const existingIssue = await findLiveExecutionIssue(input.routine); - if (!existingIssue) throw error; - const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; - const updated = await finalizeRun(run.id, { - status, - linkedIssueId: existingIssue.id, - coalescedIntoRunId: existingIssue.originRunId, - completedAt: triggeredAt, + let createdIssue; + try { + createdIssue = await issueSvc.create(input.routine.companyId, { + projectId: input.routine.projectId, + goalId: input.routine.goalId, + parentId: input.routine.parentIssueId, + title: input.routine.title, + description: input.routine.description, + status: "todo", + priority: input.routine.priority, + assigneeAgentId: input.routine.assigneeAgentId, + originKind: "routine_execution", + originId: input.routine.id, + originRunId: createdRun.id, + }); + } catch (error) { + const isOpenExecutionConflict = + !!error && + typeof error === "object" && + "code" in error && + (error as { code?: string }).code === "23505" && + "constraint" in error && + (error as { constraint?: string }).constraint === "issues_open_routine_execution_uq"; + if (!isOpenExecutionConflict || input.routine.concurrencyPolicy === "always_enqueue") { + throw error; + } + + const existingIssue = await findLiveExecutionIssue(input.routine, txDb); + if (!existingIssue) throw error; + const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; + const updated = await finalizeRun(createdRun.id, { + status, + linkedIssueId: existingIssue.id, + coalescedIntoRunId: existingIssue.originRunId, + completedAt: triggeredAt, + }, txDb); + await updateRoutineTouchedState({ + routineId: input.routine.id, + triggerId: input.trigger?.id ?? null, + triggeredAt, + status, + issueId: existingIssue.id, + nextRunAt, + }, txDb); + return updated ?? createdRun; + } + + // Keep the dispatch lock until the issue is linked to a queued heartbeat run. + await queueIssueAssignmentWakeup({ + heartbeat, + issue: createdIssue, + reason: "issue_assigned", + mutation: "create", + contextSource: "routine.dispatch", + requestedByActorType: input.source === "schedule" ? "system" : undefined, }); + const updated = await finalizeRun(createdRun.id, { + status: "issue_created", + linkedIssueId: createdIssue.id, + }, txDb); await updateRoutineTouchedState({ routineId: input.routine.id, triggerId: input.trigger?.id ?? null, triggeredAt, - status, - issueId: existingIssue.id, + status: "issue_created", + issueId: createdIssue.id, nextRunAt, - }); - return updated ?? run; + }, txDb); + return updated ?? createdRun; + } catch (error) { + const failureReason = error instanceof Error ? error.message : String(error); + const failed = await finalizeRun(createdRun.id, { + status: "failed", + failureReason, + completedAt: new Date(), + }, txDb); + await updateRoutineTouchedState({ + routineId: input.routine.id, + triggerId: input.trigger?.id ?? null, + triggeredAt, + status: "failed", + nextRunAt, + }, txDb); + return failed ?? createdRun; } + }); - const updated = await finalizeRun(run.id, { - status: "issue_created", - linkedIssueId: createdIssue.id, - }); - // Ensure the wake request is durably queued before reporting the routine run as created. - await queueIssueAssignmentWakeup({ - heartbeat, - issue: createdIssue, - reason: "issue_assigned", - mutation: "create", - contextSource: "routine.dispatch", - requestedByActorType: input.source === "schedule" ? "system" : undefined, - }); - await updateRoutineTouchedState({ - routineId: input.routine.id, - triggerId: input.trigger?.id ?? null, - triggeredAt, - status: "issue_created", - issueId: createdIssue.id, - nextRunAt, - }); - return updated ?? run; - } catch (error) { - const failureReason = error instanceof Error ? error.message : String(error); - const failed = await finalizeRun(run.id, { - status: "failed", - failureReason, - completedAt: new Date(), - }); - await updateRoutineTouchedState({ - routineId: input.routine.id, - triggerId: input.trigger?.id ?? null, - triggeredAt, - status: "failed", - nextRunAt, - }); - return failed ?? run; + if (input.source === "schedule" || input.source === "webhook") { + const actorId = input.source === "schedule" ? "routine-scheduler" : "routine-webhook"; + try { + await logActivity(db, { + companyId: input.routine.companyId, + actorType: "system", + actorId, + action: "routine.run_triggered", + entityType: "routine_run", + entityId: run.id, + details: { + routineId: input.routine.id, + triggerId: input.trigger?.id ?? null, + source: run.source, + status: run.status, + }, + }); + } catch (err) { + logger.warn({ err, routineId: input.routine.id, runId: run.id }, "failed to log automated routine run"); + } } + + return run; } return {