server: make approval retries idempotent (#499)
This commit is contained in:
110
server/src/__tests__/approval-routes-idempotency.test.ts
Normal file
110
server/src/__tests__/approval-routes-idempotency.test.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
import express from "express";
|
||||
import request from "supertest";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { approvalRoutes } from "../routes/approvals.js";
|
||||
import { errorHandler } from "../middleware/index.js";
|
||||
|
||||
const mockApprovalService = vi.hoisted(() => ({
|
||||
list: vi.fn(),
|
||||
getById: vi.fn(),
|
||||
create: vi.fn(),
|
||||
approve: vi.fn(),
|
||||
reject: vi.fn(),
|
||||
requestRevision: vi.fn(),
|
||||
resubmit: vi.fn(),
|
||||
listComments: vi.fn(),
|
||||
addComment: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockHeartbeatService = vi.hoisted(() => ({
|
||||
wakeup: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockIssueApprovalService = vi.hoisted(() => ({
|
||||
listIssuesForApproval: vi.fn(),
|
||||
linkManyForApproval: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockSecretService = vi.hoisted(() => ({
|
||||
normalizeHireApprovalPayloadForPersistence: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockLogActivity = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("../services/index.js", () => ({
|
||||
approvalService: () => mockApprovalService,
|
||||
heartbeatService: () => mockHeartbeatService,
|
||||
issueApprovalService: () => mockIssueApprovalService,
|
||||
logActivity: mockLogActivity,
|
||||
secretService: () => mockSecretService,
|
||||
}));
|
||||
|
||||
function createApp() {
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
app.use((req, _res, next) => {
|
||||
(req as any).actor = {
|
||||
type: "board",
|
||||
userId: "user-1",
|
||||
companyIds: ["company-1"],
|
||||
source: "session",
|
||||
isInstanceAdmin: false,
|
||||
};
|
||||
next();
|
||||
});
|
||||
app.use("/api", approvalRoutes({} as any));
|
||||
app.use(errorHandler);
|
||||
return app;
|
||||
}
|
||||
|
||||
describe("approval routes idempotent retries", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockHeartbeatService.wakeup.mockResolvedValue({ id: "wake-1" });
|
||||
mockIssueApprovalService.listIssuesForApproval.mockResolvedValue([{ id: "issue-1" }]);
|
||||
mockLogActivity.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
it("does not emit duplicate approval side effects when approve is already resolved", async () => {
|
||||
mockApprovalService.approve.mockResolvedValue({
|
||||
approval: {
|
||||
id: "approval-1",
|
||||
companyId: "company-1",
|
||||
type: "hire_agent",
|
||||
status: "approved",
|
||||
payload: {},
|
||||
requestedByAgentId: "agent-1",
|
||||
},
|
||||
applied: false,
|
||||
});
|
||||
|
||||
const res = await request(createApp())
|
||||
.post("/api/approvals/approval-1/approve")
|
||||
.send({});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockIssueApprovalService.listIssuesForApproval).not.toHaveBeenCalled();
|
||||
expect(mockHeartbeatService.wakeup).not.toHaveBeenCalled();
|
||||
expect(mockLogActivity).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not emit duplicate rejection logs when reject is already resolved", async () => {
|
||||
mockApprovalService.reject.mockResolvedValue({
|
||||
approval: {
|
||||
id: "approval-1",
|
||||
companyId: "company-1",
|
||||
type: "hire_agent",
|
||||
status: "rejected",
|
||||
payload: {},
|
||||
},
|
||||
applied: false,
|
||||
});
|
||||
|
||||
const res = await request(createApp())
|
||||
.post("/api/approvals/approval-1/reject")
|
||||
.send({});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockLogActivity).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
110
server/src/__tests__/approvals-service.test.ts
Normal file
110
server/src/__tests__/approvals-service.test.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { approvalService } from "../services/approvals.js";
|
||||
|
||||
const mockAgentService = vi.hoisted(() => ({
|
||||
activatePendingApproval: vi.fn(),
|
||||
create: vi.fn(),
|
||||
terminate: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockNotifyHireApproved = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("../services/agents.js", () => ({
|
||||
agentService: vi.fn(() => mockAgentService),
|
||||
}));
|
||||
|
||||
vi.mock("../services/hire-hook.js", () => ({
|
||||
notifyHireApproved: mockNotifyHireApproved,
|
||||
}));
|
||||
|
||||
type ApprovalRecord = {
|
||||
id: string;
|
||||
companyId: string;
|
||||
type: string;
|
||||
status: string;
|
||||
payload: Record<string, unknown>;
|
||||
requestedByAgentId: string | null;
|
||||
};
|
||||
|
||||
function createApproval(status: string): ApprovalRecord {
|
||||
return {
|
||||
id: "approval-1",
|
||||
companyId: "company-1",
|
||||
type: "hire_agent",
|
||||
status,
|
||||
payload: { agentId: "agent-1" },
|
||||
requestedByAgentId: "requester-1",
|
||||
};
|
||||
}
|
||||
|
||||
function createDbStub(selectResults: ApprovalRecord[][], updateResults: ApprovalRecord[]) {
|
||||
const selectWhere = vi.fn();
|
||||
for (const result of selectResults) {
|
||||
selectWhere.mockResolvedValueOnce(result);
|
||||
}
|
||||
|
||||
const from = vi.fn(() => ({ where: selectWhere }));
|
||||
const select = vi.fn(() => ({ from }));
|
||||
|
||||
const returning = vi.fn().mockResolvedValue(updateResults);
|
||||
const updateWhere = vi.fn(() => ({ returning }));
|
||||
const set = vi.fn(() => ({ where: updateWhere }));
|
||||
const update = vi.fn(() => ({ set }));
|
||||
|
||||
return {
|
||||
db: { select, update },
|
||||
selectWhere,
|
||||
returning,
|
||||
};
|
||||
}
|
||||
|
||||
describe("approvalService resolution idempotency", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockAgentService.activatePendingApproval.mockResolvedValue(undefined);
|
||||
mockAgentService.create.mockResolvedValue({ id: "agent-1" });
|
||||
mockAgentService.terminate.mockResolvedValue(undefined);
|
||||
mockNotifyHireApproved.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
it("treats repeated approve retries as no-ops after another worker resolves the approval", async () => {
|
||||
const dbStub = createDbStub(
|
||||
[[createApproval("pending")], [createApproval("approved")]],
|
||||
[],
|
||||
);
|
||||
|
||||
const svc = approvalService(dbStub.db as any);
|
||||
const result = await svc.approve("approval-1", "board", "ship it");
|
||||
|
||||
expect(result.applied).toBe(false);
|
||||
expect(result.approval.status).toBe("approved");
|
||||
expect(mockAgentService.activatePendingApproval).not.toHaveBeenCalled();
|
||||
expect(mockNotifyHireApproved).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("treats repeated reject retries as no-ops after another worker resolves the approval", async () => {
|
||||
const dbStub = createDbStub(
|
||||
[[createApproval("pending")], [createApproval("rejected")]],
|
||||
[],
|
||||
);
|
||||
|
||||
const svc = approvalService(dbStub.db as any);
|
||||
const result = await svc.reject("approval-1", "board", "not now");
|
||||
|
||||
expect(result.applied).toBe(false);
|
||||
expect(result.approval.status).toBe("rejected");
|
||||
expect(mockAgentService.terminate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("still performs side effects when the resolution update is newly applied", async () => {
|
||||
const approved = createApproval("approved");
|
||||
const dbStub = createDbStub([[createApproval("pending")]], [approved]);
|
||||
|
||||
const svc = approvalService(dbStub.db as any);
|
||||
const result = await svc.approve("approval-1", "board", "ship it");
|
||||
|
||||
expect(result.applied).toBe(true);
|
||||
expect(mockAgentService.activatePendingApproval).toHaveBeenCalledWith("agent-1");
|
||||
expect(mockNotifyHireApproved).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -121,85 +121,92 @@ export function approvalRoutes(db: Db) {
|
||||
router.post("/approvals/:id/approve", validate(resolveApprovalSchema), async (req, res) => {
|
||||
assertBoard(req);
|
||||
const id = req.params.id as string;
|
||||
const approval = await svc.approve(id, req.body.decidedByUserId ?? "board", req.body.decisionNote);
|
||||
const linkedIssues = await issueApprovalsSvc.listIssuesForApproval(approval.id);
|
||||
const linkedIssueIds = linkedIssues.map((issue) => issue.id);
|
||||
const primaryIssueId = linkedIssueIds[0] ?? null;
|
||||
const { approval, applied } = await svc.approve(
|
||||
id,
|
||||
req.body.decidedByUserId ?? "board",
|
||||
req.body.decisionNote,
|
||||
);
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.approved",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
type: approval.type,
|
||||
requestedByAgentId: approval.requestedByAgentId,
|
||||
linkedIssueIds,
|
||||
},
|
||||
});
|
||||
if (applied) {
|
||||
const linkedIssues = await issueApprovalsSvc.listIssuesForApproval(approval.id);
|
||||
const linkedIssueIds = linkedIssues.map((issue) => issue.id);
|
||||
const primaryIssueId = linkedIssueIds[0] ?? null;
|
||||
|
||||
if (approval.requestedByAgentId) {
|
||||
try {
|
||||
const wakeRun = await heartbeat.wakeup(approval.requestedByAgentId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "approval_approved",
|
||||
payload: {
|
||||
approvalId: approval.id,
|
||||
approvalStatus: approval.status,
|
||||
issueId: primaryIssueId,
|
||||
issueIds: linkedIssueIds,
|
||||
},
|
||||
requestedByActorType: "user",
|
||||
requestedByActorId: req.actor.userId ?? "board",
|
||||
contextSnapshot: {
|
||||
source: "approval.approved",
|
||||
approvalId: approval.id,
|
||||
approvalStatus: approval.status,
|
||||
issueId: primaryIssueId,
|
||||
issueIds: linkedIssueIds,
|
||||
taskId: primaryIssueId,
|
||||
wakeReason: "approval_approved",
|
||||
},
|
||||
});
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.approved",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
type: approval.type,
|
||||
requestedByAgentId: approval.requestedByAgentId,
|
||||
linkedIssueIds,
|
||||
},
|
||||
});
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.requester_wakeup_queued",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
requesterAgentId: approval.requestedByAgentId,
|
||||
wakeRunId: wakeRun?.id ?? null,
|
||||
linkedIssueIds,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{
|
||||
err,
|
||||
approvalId: approval.id,
|
||||
requestedByAgentId: approval.requestedByAgentId,
|
||||
},
|
||||
"failed to queue requester wakeup after approval",
|
||||
);
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.requester_wakeup_failed",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
requesterAgentId: approval.requestedByAgentId,
|
||||
linkedIssueIds,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
},
|
||||
});
|
||||
if (approval.requestedByAgentId) {
|
||||
try {
|
||||
const wakeRun = await heartbeat.wakeup(approval.requestedByAgentId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "approval_approved",
|
||||
payload: {
|
||||
approvalId: approval.id,
|
||||
approvalStatus: approval.status,
|
||||
issueId: primaryIssueId,
|
||||
issueIds: linkedIssueIds,
|
||||
},
|
||||
requestedByActorType: "user",
|
||||
requestedByActorId: req.actor.userId ?? "board",
|
||||
contextSnapshot: {
|
||||
source: "approval.approved",
|
||||
approvalId: approval.id,
|
||||
approvalStatus: approval.status,
|
||||
issueId: primaryIssueId,
|
||||
issueIds: linkedIssueIds,
|
||||
taskId: primaryIssueId,
|
||||
wakeReason: "approval_approved",
|
||||
},
|
||||
});
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.requester_wakeup_queued",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
requesterAgentId: approval.requestedByAgentId,
|
||||
wakeRunId: wakeRun?.id ?? null,
|
||||
linkedIssueIds,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{
|
||||
err,
|
||||
approvalId: approval.id,
|
||||
requestedByAgentId: approval.requestedByAgentId,
|
||||
},
|
||||
"failed to queue requester wakeup after approval",
|
||||
);
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.requester_wakeup_failed",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: {
|
||||
requesterAgentId: approval.requestedByAgentId,
|
||||
linkedIssueIds,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,17 +216,23 @@ export function approvalRoutes(db: Db) {
|
||||
router.post("/approvals/:id/reject", validate(resolveApprovalSchema), async (req, res) => {
|
||||
assertBoard(req);
|
||||
const id = req.params.id as string;
|
||||
const approval = await svc.reject(id, req.body.decidedByUserId ?? "board", req.body.decisionNote);
|
||||
const { approval, applied } = await svc.reject(
|
||||
id,
|
||||
req.body.decidedByUserId ?? "board",
|
||||
req.body.decisionNote,
|
||||
);
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.rejected",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: { type: approval.type },
|
||||
});
|
||||
if (applied) {
|
||||
await logActivity(db, {
|
||||
companyId: approval.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "approval.rejected",
|
||||
entityType: "approval",
|
||||
entityId: approval.id,
|
||||
details: { type: approval.type },
|
||||
});
|
||||
}
|
||||
|
||||
res.json(redactApprovalPayload(approval));
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { and, asc, eq } from "drizzle-orm";
|
||||
import { and, asc, eq, inArray } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { approvalComments, approvals } from "@paperclipai/db";
|
||||
import { notFound, unprocessable } from "../errors.js";
|
||||
@@ -8,6 +8,9 @@ import { notifyHireApproved } from "./hire-hook.js";
|
||||
export function approvalService(db: Db) {
|
||||
const agentsSvc = agentService(db);
|
||||
const canResolveStatuses = new Set(["pending", "revision_requested"]);
|
||||
const resolvableStatuses = Array.from(canResolveStatuses);
|
||||
type ApprovalRecord = typeof approvals.$inferSelect;
|
||||
type ResolutionResult = { approval: ApprovalRecord; applied: boolean };
|
||||
|
||||
async function getExistingApproval(id: string) {
|
||||
const existing = await db
|
||||
@@ -19,6 +22,50 @@ export function approvalService(db: Db) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
async function resolveApproval(
|
||||
id: string,
|
||||
targetStatus: "approved" | "rejected",
|
||||
decidedByUserId: string,
|
||||
decisionNote: string | null | undefined,
|
||||
): Promise<ResolutionResult> {
|
||||
const existing = await getExistingApproval(id);
|
||||
if (!canResolveStatuses.has(existing.status)) {
|
||||
if (existing.status === targetStatus) {
|
||||
return { approval: existing, applied: false };
|
||||
}
|
||||
throw unprocessable(
|
||||
`Only pending or revision requested approvals can be ${targetStatus === "approved" ? "approved" : "rejected"}`,
|
||||
);
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const updated = await db
|
||||
.update(approvals)
|
||||
.set({
|
||||
status: targetStatus,
|
||||
decidedByUserId,
|
||||
decisionNote: decisionNote ?? null,
|
||||
decidedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(approvals.id, id), inArray(approvals.status, resolvableStatuses)))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (updated) {
|
||||
return { approval: updated, applied: true };
|
||||
}
|
||||
|
||||
const latest = await getExistingApproval(id);
|
||||
if (latest.status === targetStatus) {
|
||||
return { approval: latest, applied: false };
|
||||
}
|
||||
|
||||
throw unprocessable(
|
||||
`Only pending or revision requested approvals can be ${targetStatus === "approved" ? "approved" : "rejected"}`,
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
list: (companyId: string, status?: string) => {
|
||||
const conditions = [eq(approvals.companyId, companyId)];
|
||||
@@ -41,27 +88,16 @@ export function approvalService(db: Db) {
|
||||
.then((rows) => rows[0]),
|
||||
|
||||
approve: async (id: string, decidedByUserId: string, decisionNote?: string | null) => {
|
||||
const existing = await getExistingApproval(id);
|
||||
if (!canResolveStatuses.has(existing.status)) {
|
||||
throw unprocessable("Only pending or revision requested approvals can be approved");
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const updated = await db
|
||||
.update(approvals)
|
||||
.set({
|
||||
status: "approved",
|
||||
decidedByUserId,
|
||||
decisionNote: decisionNote ?? null,
|
||||
decidedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(approvals.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
const { approval: updated, applied } = await resolveApproval(
|
||||
id,
|
||||
"approved",
|
||||
decidedByUserId,
|
||||
decisionNote,
|
||||
);
|
||||
|
||||
let hireApprovedAgentId: string | null = null;
|
||||
if (updated.type === "hire_agent") {
|
||||
const now = new Date();
|
||||
if (applied && updated.type === "hire_agent") {
|
||||
const payload = updated.payload as Record<string, unknown>;
|
||||
const payloadAgentId = typeof payload.agentId === "string" ? payload.agentId : null;
|
||||
if (payloadAgentId) {
|
||||
@@ -103,30 +139,18 @@ export function approvalService(db: Db) {
|
||||
}
|
||||
}
|
||||
|
||||
return updated;
|
||||
return { approval: updated, applied };
|
||||
},
|
||||
|
||||
reject: async (id: string, decidedByUserId: string, decisionNote?: string | null) => {
|
||||
const existing = await getExistingApproval(id);
|
||||
if (!canResolveStatuses.has(existing.status)) {
|
||||
throw unprocessable("Only pending or revision requested approvals can be rejected");
|
||||
}
|
||||
const { approval: updated, applied } = await resolveApproval(
|
||||
id,
|
||||
"rejected",
|
||||
decidedByUserId,
|
||||
decisionNote,
|
||||
);
|
||||
|
||||
const now = new Date();
|
||||
const updated = await db
|
||||
.update(approvals)
|
||||
.set({
|
||||
status: "rejected",
|
||||
decidedByUserId,
|
||||
decisionNote: decisionNote ?? null,
|
||||
decidedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(approvals.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
if (updated.type === "hire_agent") {
|
||||
if (applied && updated.type === "hire_agent") {
|
||||
const payload = updated.payload as Record<string, unknown>;
|
||||
const payloadAgentId = typeof payload.agentId === "string" ? payload.agentId : null;
|
||||
if (payloadAgentId) {
|
||||
@@ -134,7 +158,7 @@ export function approvalService(db: Db) {
|
||||
}
|
||||
}
|
||||
|
||||
return updated;
|
||||
return { approval: updated, applied };
|
||||
},
|
||||
|
||||
requestRevision: async (id: string, decidedByUserId: string, decisionNote?: string | null) => {
|
||||
|
||||
Reference in New Issue
Block a user