- Add trash icon button to delete triggers (full stack: service, route, API client, UI)
- Fix pause/unpause bug where saving a routine could revert its status; the save payload now excludes status (status is managed via the dedicated pause/resume buttons)
- Add toast feedback for run, pause, and resume actions
- Auto-switch to the Runs tab after triggering a manual run
- Add live-update invalidation for routine/trigger/run activity events

Co-Authored-By: Paperclip <noreply@paperclip.ing>
1122 lines
41 KiB
TypeScript
1122 lines
41 KiB
TypeScript
import crypto from "node:crypto";
|
|
import { and, asc, desc, eq, inArray, isNotNull, isNull, lte, ne, or } from "drizzle-orm";
|
|
import type { Db } from "@paperclipai/db";
|
|
import {
|
|
agents,
|
|
companySecrets,
|
|
goals,
|
|
issues,
|
|
projects,
|
|
routineRuns,
|
|
routines,
|
|
routineTriggers,
|
|
} from "@paperclipai/db";
|
|
import type {
|
|
CreateRoutine,
|
|
CreateRoutineTrigger,
|
|
Routine,
|
|
RoutineDetail,
|
|
RoutineListItem,
|
|
RoutineRunSummary,
|
|
RoutineTrigger,
|
|
RoutineTriggerSecretMaterial,
|
|
RunRoutine,
|
|
UpdateRoutine,
|
|
UpdateRoutineTrigger,
|
|
} from "@paperclipai/shared";
|
|
import { conflict, forbidden, notFound, unauthorized, unprocessable } from "../errors.js";
|
|
import { issueService } from "./issues.js";
|
|
import { secretService } from "./secrets.js";
|
|
import { parseCron, validateCron } from "./cron.js";
|
|
|
|
/** Issue statuses treated as "still open" when looking for an active routine execution issue. */
const OPEN_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked"];

/** Issue statuses named as terminal; the consumer is outside the visible part of this file. */
const TERMINAL_ISSUE_STATUSES = new Set(["done", "cancelled"]);

/** Numeric cap named for catch-up runs; the consumer is outside the visible part of this file. */
const MAX_CATCH_UP_RUNS = 25;

/** Maps the short English weekday names produced by `Intl` ("Sun".."Sat") to 0..6, Sunday first. */
const WEEKDAY_INDEX: Record<string, number> = Object.fromEntries(
  ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"].map((name, index) => [name, index] as const),
);
|
|
|
|
/** Who performed an action: an agent id, a user id, both, or neither (each may be absent or null). */
type Actor = {
  agentId?: string | null;
  userId?: string | null;
};
|
|
|
|
/**
 * Validates a time zone identifier by asking `Intl.DateTimeFormat` to format the
 * current instant with it.
 *
 * @param timeZone Time zone identifier to probe.
 * @throws The error built by `unprocessable("Invalid timezone: …")` when `Intl` rejects the zone.
 */
function assertTimeZone(timeZone: string) {
  let accepted = true;
  try {
    const probe = new Intl.DateTimeFormat("en-US", { timeZone });
    probe.format(new Date());
  } catch {
    accepted = false;
  }
  if (!accepted) {
    throw unprocessable(`Invalid timezone: ${timeZone}`);
  }
}
|
|
|
|
/**
 * Returns a new `Date` truncated to the start of its minute (seconds and
 * milliseconds zeroed). The argument is not mutated.
 *
 * @param date Instant to truncate.
 * @returns A fresh `Date` at `hh:mm:00.000` of the same minute.
 */
function floorToMinute(date: Date) {
  const MINUTE_MS = 60000;
  const epochMs = date.getTime();
  // Double modulo keeps the remainder non-negative for pre-1970 instants,
  // so truncation always moves toward the start of the minute.
  const intoMinute = ((epochMs % MINUTE_MS) + MINUTE_MS) % MINUTE_MS;
  return new Date(epochMs - intoMinute);
}
|
|
|
|
/**
 * Breaks an instant into wall-clock calendar fields as observed in `timeZone`.
 *
 * @param date Instant to convert.
 * @param timeZone Time zone identifier passed to `Intl.DateTimeFormat`.
 * @returns `{ year, month, day, hour, minute, weekday }` where `month` is 1-12 as
 *   formatted by `Intl`, `hour` is 0-23 and `weekday` is 0 (Sun) to 6 (Sat).
 * @throws Error when the formatted weekday name is not a key of `WEEKDAY_INDEX`.
 */
function getZonedMinuteParts(date: Date, timeZone: string) {
  const formatter = new Intl.DateTimeFormat("en-US", {
    timeZone,
    // `hour12: false` lets some ICU/V8 versions pick the h24 cycle for en-US and
    // report midnight as "24"; pinning h23 keeps hours in 0-23 so hour-0 cron
    // entries can match.
    hourCycle: "h23",
    year: "numeric",
    month: "numeric",
    day: "numeric",
    hour: "numeric",
    minute: "numeric",
    weekday: "short",
  });
  const parts = formatter.formatToParts(date);
  const map = Object.fromEntries(parts.map((part) => [part.type, part.value]));
  const weekday = WEEKDAY_INDEX[map.weekday ?? ""];
  if (weekday == null) {
    throw new Error(`Unable to resolve weekday for timezone ${timeZone}`);
  }
  return {
    year: Number(map.year),
    month: Number(map.month),
    day: Number(map.day),
    // Defensive: fold a stray "24" back to 0 should a runtime ignore hourCycle.
    hour: Number(map.hour) % 24,
    minute: Number(map.minute),
    weekday,
  };
}
|
|
|
|
/**
 * Tells whether the minute containing `date`, viewed in `timeZone`, satisfies
 * every field of the cron expression.
 *
 * @param expression Cron expression handed to `parseCron`.
 * @param timeZone Time zone in which the wall-clock fields are evaluated.
 * @param date Instant to test.
 * @returns `true` only when minute, hour, day-of-month, month and day-of-week all match.
 */
function matchesCronMinute(expression: string, timeZone: string, date: Date) {
  const schedule = parseCron(expression);
  const zoned = getZonedMinuteParts(date, timeZone);

  // All five fields must match; bail out on the first mismatch.
  if (!schedule.minutes.includes(zoned.minute)) return false;
  if (!schedule.hours.includes(zoned.hour)) return false;
  if (!schedule.daysOfMonth.includes(zoned.day)) return false;
  if (!schedule.months.includes(zoned.month)) return false;
  return schedule.daysOfWeek.includes(zoned.weekday);
}
|
|
|
|
/**
 * Finds the first whole minute strictly after `after` whose wall-clock time in
 * `timeZone` matches the cron expression.
 *
 * @param expression Cron expression; surrounding whitespace is trimmed before use.
 * @param timeZone Time zone in which the expression is evaluated.
 * @param after Search starts at the minute following the one containing this instant.
 * @returns The matching instant, or `null` when nothing matches within
 *   `366 * 24 * 60 * 5` probed minutes.
 * @throws The `unprocessable(...)` error for an invalid time zone or cron expression.
 */
function nextCronTickInTimeZone(expression: string, timeZone: string, after: Date) {
  const cron = expression.trim();
  assertTimeZone(timeZone);
  const problem = validateCron(cron);
  if (problem) {
    throw unprocessable(problem);
  }

  const MINUTE_MS = 60000;
  const maxProbes = 366 * 24 * 60 * 5;
  // Begin one minute past the (floored) reference so the result is strictly later.
  let candidateMs = floorToMinute(after).getTime() + MINUTE_MS;
  for (let probes = 0; probes < maxProbes; probes += 1, candidateMs += MINUTE_MS) {
    if (matchesCronMinute(cron, timeZone, new Date(candidateMs))) {
      return new Date(candidateMs);
    }
  }
  return null;
}
|
|
|
|
/**
 * Produces the human-readable result line for a run outcome.
 *
 * @param status Run status; known values get a fixed sentence.
 * @param issueId Linked issue id; only used for the `"issue_created"` message.
 * @returns The descriptive text, or `status` itself when no sentence applies
 *   (including `"issue_created"` without an issue id).
 */
function nextResultText(status: string, issueId?: string | null) {
  switch (status) {
    case "issue_created":
      return issueId ? `Created execution issue ${issueId}` : status;
    case "coalesced":
      return "Coalesced into an existing active execution issue";
    case "skipped":
      return "Skipped because an active execution issue already exists";
    case "completed":
      return "Execution issue completed";
    case "failed":
      return "Execution failed";
    default:
      return status;
  }
}
|
|
|
|
export function routineService(db: Db) {
|
|
const issueSvc = issueService(db);
|
|
const secretsSvc = secretService(db);
|
|
|
|
/**
 * Loads a single routine row by primary key.
 *
 * @param id Routine id.
 * @returns The routine row, or `null` when no row has that id.
 */
async function getRoutineById(id: string) {
  const matches = await db.select().from(routines).where(eq(routines.id, id));
  return matches[0] ?? null;
}
|
|
|
|
/**
 * Loads a single routine trigger row by primary key.
 *
 * @param id Trigger id.
 * @returns The trigger row, or `null` when no row has that id.
 */
async function getTriggerById(id: string) {
  const matches = await db.select().from(routineTriggers).where(eq(routineTriggers.id, id));
  return matches[0] ?? null;
}
|
|
|
|
/**
 * Fetches a routine and verifies it is owned by `companyId`.
 *
 * @param companyId Company the caller is acting for.
 * @param routineId Routine to look up.
 * @returns The routine row.
 * @throws `notFound(...)` when the routine is missing, `forbidden(...)` when it
 *   belongs to a different company.
 */
async function assertRoutineAccess(companyId: string, routineId: string) {
  const found = await getRoutineById(routineId);
  if (!found) {
    throw notFound("Routine not found");
  }
  const sameCompany = found.companyId === companyId;
  if (!sameCompany) {
    throw forbidden("Routine must belong to same company");
  }
  return found;
}
|
|
|
|
/**
 * Verifies that an agent may be assigned routines for `companyId`.
 *
 * @param companyId Company that owns the routine.
 * @param agentId Candidate assignee.
 * @throws `notFound(...)` when the agent is missing, `unprocessable(...)` when it
 *   belongs to another company, `conflict(...)` when its status is
 *   `"pending_approval"` or `"terminated"`.
 */
async function assertAssignableAgent(companyId: string, agentId: string) {
  const [candidate] = await db
    .select({ id: agents.id, companyId: agents.companyId, status: agents.status })
    .from(agents)
    .where(eq(agents.id, agentId));

  if (!candidate) throw notFound("Assignee agent not found");
  if (candidate.companyId !== companyId) throw unprocessable("Assignee must belong to same company");

  switch (candidate.status) {
    case "pending_approval":
      throw conflict("Cannot assign routines to pending approval agents");
    case "terminated":
      throw conflict("Cannot assign routines to terminated agents");
    default:
      return;
  }
}
|
|
|
|
/**
 * Verifies that a project exists and is owned by `companyId`.
 *
 * @throws `notFound(...)` when the project is missing, `unprocessable(...)` when
 *   it belongs to another company.
 */
async function assertProject(companyId: string, projectId: string) {
  const [owner] = await db
    .select({ id: projects.id, companyId: projects.companyId })
    .from(projects)
    .where(eq(projects.id, projectId));

  if (!owner) {
    throw notFound("Project not found");
  }
  if (owner.companyId !== companyId) {
    throw unprocessable("Project must belong to same company");
  }
}
|
|
|
|
/**
 * Verifies that a goal exists and is owned by `companyId`.
 *
 * @throws `notFound(...)` when the goal is missing, `unprocessable(...)` when it
 *   belongs to another company.
 */
async function assertGoal(companyId: string, goalId: string) {
  const [owner] = await db
    .select({ id: goals.id, companyId: goals.companyId })
    .from(goals)
    .where(eq(goals.id, goalId));

  if (!owner) {
    throw notFound("Goal not found");
  }
  if (owner.companyId !== companyId) {
    throw unprocessable("Goal must belong to same company");
  }
}
|
|
|
|
/**
 * Verifies that a prospective parent issue exists and is owned by `companyId`.
 *
 * @throws `notFound(...)` when the issue is missing, `unprocessable(...)` when it
 *   belongs to another company.
 */
async function assertParentIssue(companyId: string, issueId: string) {
  const [owner] = await db
    .select({ id: issues.id, companyId: issues.companyId })
    .from(issues)
    .where(eq(issues.id, issueId));

  if (!owner) {
    throw notFound("Parent issue not found");
  }
  if (owner.companyId !== companyId) {
    throw unprocessable("Parent issue must belong to same company");
  }
}
|
|
|
|
/**
 * Loads every trigger of the given routines and groups them by routine id.
 *
 * @param companyId Only triggers of this company are returned.
 * @param routineIds Routines to load triggers for; an empty list skips the query.
 * @returns Map from routine id to its triggers, ordered by `createdAt` then `id`.
 *   Routines without triggers have no entry.
 */
async function listTriggersForRoutineIds(companyId: string, routineIds: string[]) {
  const grouped = new Map<string, RoutineTrigger[]>();
  if (routineIds.length === 0) return grouped;

  const belongsToRequestedRoutines = and(
    eq(routineTriggers.companyId, companyId),
    inArray(routineTriggers.routineId, routineIds),
  );
  const triggerRows = await db
    .select()
    .from(routineTriggers)
    .where(belongsToRequestedRoutines)
    .orderBy(asc(routineTriggers.createdAt), asc(routineTriggers.id));

  for (const triggerRow of triggerRows) {
    const bucket = grouped.get(triggerRow.routineId);
    if (bucket) {
      bucket.push(triggerRow);
    } else {
      grouped.set(triggerRow.routineId, [triggerRow]);
    }
  }
  return grouped;
}
|
|
|
|
/**
 * Loads the most recent run of each requested routine, with summary fields of
 * its trigger and linked issue joined in.
 *
 * "Most recent" is the first row per routine when ordering by `createdAt`
 * descending, then `id` descending (DISTINCT ON the routine id).
 *
 * @param companyId Only runs of this company are considered.
 * @param routineIds Routines to look up; an empty list skips the query.
 * @returns Map from routine id to its latest run summary. Routines with no runs
 *   have no entry.
 */
async function listLatestRunByRoutineIds(companyId: string, routineIds: string[]) {
  if (routineIds.length === 0) return new Map<string, RoutineRunSummary>();

  const runColumns = {
    id: routineRuns.id,
    companyId: routineRuns.companyId,
    routineId: routineRuns.routineId,
    triggerId: routineRuns.triggerId,
    source: routineRuns.source,
    status: routineRuns.status,
    triggeredAt: routineRuns.triggeredAt,
    idempotencyKey: routineRuns.idempotencyKey,
    triggerPayload: routineRuns.triggerPayload,
    linkedIssueId: routineRuns.linkedIssueId,
    coalescedIntoRunId: routineRuns.coalescedIntoRunId,
    failureReason: routineRuns.failureReason,
    completedAt: routineRuns.completedAt,
    createdAt: routineRuns.createdAt,
    updatedAt: routineRuns.updatedAt,
    triggerKind: routineTriggers.kind,
    triggerLabel: routineTriggers.label,
    issueIdentifier: issues.identifier,
    issueTitle: issues.title,
    issueStatus: issues.status,
    issuePriority: issues.priority,
    issueUpdatedAt: issues.updatedAt,
  };

  const newestRuns = await db
    .selectDistinctOn([routineRuns.routineId], runColumns)
    .from(routineRuns)
    .leftJoin(routineTriggers, eq(routineRuns.triggerId, routineTriggers.id))
    .leftJoin(issues, eq(routineRuns.linkedIssueId, issues.id))
    .where(and(eq(routineRuns.companyId, companyId), inArray(routineRuns.routineId, routineIds)))
    .orderBy(routineRuns.routineId, desc(routineRuns.createdAt), desc(routineRuns.id));

  return new Map<string, RoutineRunSummary>(
    newestRuns.map((run): [string, RoutineRunSummary] => [
      run.routineId,
      {
        id: run.id,
        companyId: run.companyId,
        routineId: run.routineId,
        triggerId: run.triggerId,
        source: run.source as RoutineRunSummary["source"],
        status: run.status as RoutineRunSummary["status"],
        triggeredAt: run.triggeredAt,
        idempotencyKey: run.idempotencyKey,
        triggerPayload: run.triggerPayload as Record<string, unknown> | null,
        linkedIssueId: run.linkedIssueId,
        coalescedIntoRunId: run.coalescedIntoRunId,
        failureReason: run.failureReason,
        completedAt: run.completedAt,
        createdAt: run.createdAt,
        updatedAt: run.updatedAt,
        // The issue columns come from a LEFT JOIN, so each falls back to a
        // placeholder when the joined value is null.
        linkedIssue: run.linkedIssueId
          ? {
              id: run.linkedIssueId,
              identifier: run.issueIdentifier,
              title: run.issueTitle ?? "Routine execution",
              status: run.issueStatus ?? "todo",
              priority: run.issuePriority ?? "medium",
              updatedAt: run.issueUpdatedAt ?? run.updatedAt,
            }
          : null,
        trigger: run.triggerId
          ? {
              id: run.triggerId,
              kind: run.triggerKind as NonNullable<RoutineRunSummary["trigger"]>["kind"],
              label: run.triggerLabel,
            }
          : null,
      },
    ]),
  );
}
|
|
|
|
/**
 * Finds, for each requested routine, the most recently updated open and
 * non-hidden execution issue that originated from it.
 *
 * @param companyId Only issues of this company are considered.
 * @param routineIds Routines to look up; an empty list skips the query.
 * @returns Map from routine id to a summary of its active issue. Routines
 *   without an open execution issue have no entry.
 */
async function listActiveIssueByRoutineIds(companyId: string, routineIds: string[]) {
  const activeByRoutine = new Map<string, RoutineListItem["activeIssue"]>();
  if (routineIds.length === 0) return activeByRoutine;

  const isOpenExecutionOfRequestedRoutine = and(
    eq(issues.companyId, companyId),
    eq(issues.originKind, "routine_execution"),
    inArray(issues.originId, routineIds),
    inArray(issues.status, OPEN_ISSUE_STATUSES),
    isNull(issues.hiddenAt),
  );

  const openIssues = await db
    .selectDistinctOn([issues.originId], {
      originId: issues.originId,
      id: issues.id,
      identifier: issues.identifier,
      title: issues.title,
      status: issues.status,
      priority: issues.priority,
      updatedAt: issues.updatedAt,
    })
    .from(issues)
    .where(isOpenExecutionOfRequestedRoutine)
    .orderBy(issues.originId, desc(issues.updatedAt), desc(issues.createdAt));

  // `originId` is only the map key; the remaining selected columns form the summary.
  for (const { originId, ...summary } of openIssues) {
    if (!originId) continue;
    activeByRoutine.set(originId, summary);
  }
  return activeByRoutine;
}
|
|
|
|
/**
 * Records that a routine (and optionally one of its triggers) just fired.
 *
 * Always stamps `lastTriggeredAt` on the routine; `lastEnqueuedAt` is supplied
 * only when an issue id is given. When `triggerId` is set, the trigger's
 * `lastFiredAt`, `lastResult` and `nextRunAt` are written as well.
 *
 * NOTE(review): fields set to `undefined` are presumably left untouched by the
 * query builder; confirm against drizzle's `set()` semantics.
 *
 * @param input.status Run status; converted to text by `nextResultText`.
 * @param input.nextRunAt New next-run time for the trigger; `undefined` is passed through as-is.
 */
async function updateRoutineTouchedState(input: {
  routineId: string;
  triggerId?: string | null;
  triggeredAt: Date;
  status: string;
  issueId?: string | null;
  nextRunAt?: Date | null;
}) {
  const { routineId, triggerId, triggeredAt, status, issueId, nextRunAt } = input;

  await db
    .update(routines)
    .set({
      lastTriggeredAt: triggeredAt,
      lastEnqueuedAt: issueId ? triggeredAt : undefined,
      updatedAt: new Date(),
    })
    .where(eq(routines.id, routineId));

  // Runs without a trigger only touch the routine row.
  if (!triggerId) return;

  await db
    .update(routineTriggers)
    .set({
      lastFiredAt: triggeredAt,
      lastResult: nextResultText(status, issueId),
      nextRunAt,
      updatedAt: new Date(),
    })
    .where(eq(routineTriggers.id, triggerId));
}
|
|
|
|
/**
 * Looks up the routine's currently open execution issue, if any.
 *
 * An issue qualifies when it belongs to the routine's company, has
 * `originKind = "routine_execution"` and `originId = routine.id`, is in one of
 * `OPEN_ISSUE_STATUSES`, and is not hidden.
 *
 * @returns The most recently updated qualifying issue row, or `null`.
 */
async function findOpenExecutionIssue(routine: typeof routines.$inferSelect) {
  const isOpenExecutionOfRoutine = and(
    eq(issues.companyId, routine.companyId),
    eq(issues.originKind, "routine_execution"),
    eq(issues.originId, routine.id),
    inArray(issues.status, OPEN_ISSUE_STATUSES),
    isNull(issues.hiddenAt),
  );

  const [mostRecent] = await db
    .select()
    .from(issues)
    .where(isOpenExecutionOfRoutine)
    .orderBy(desc(issues.updatedAt), desc(issues.createdAt))
    .limit(1);

  return mostRecent ?? null;
}
|
|
|
|
/**
 * Applies `patch` to a run row and refreshes its `updatedAt`.
 *
 * @param runId Run to update.
 * @param patch Columns to overwrite; `updatedAt` is always replaced with the current time.
 * @returns The updated row, or `null` when no row matched `runId`.
 */
async function finalizeRun(runId: string, patch: Partial<typeof routineRuns.$inferInsert>) {
  const changes = { ...patch, updatedAt: new Date() };
  const [finalized] = await db
    .update(routineRuns)
    .set(changes)
    .where(eq(routineRuns.id, runId))
    .returning();
  return finalized ?? null;
}
|
|
|
|
/**
 * Generates a random webhook secret and stores it through the secret service.
 *
 * @param companyId Company that will own the secret.
 * @param routineId Routine the secret is for; embedded in the secret's name and description.
 * @param actor Identity forwarded to the secret service.
 * @returns The stored secret record plus the plaintext value (48 hex characters).
 */
async function createWebhookSecret(
  companyId: string,
  routineId: string,
  actor: Actor,
) {
  const secretValue = crypto.randomBytes(24).toString("hex");
  // A random suffix is appended to the routine id to form the secret's name.
  const nameSuffix = crypto.randomBytes(6).toString("hex");

  const secret = await secretsSvc.create(
    companyId,
    {
      name: `routine-${routineId}-${nameSuffix}`,
      provider: "local_encrypted",
      value: secretValue,
      description: `Webhook auth for routine ${routineId}`,
    },
    actor,
  );

  return { secret, secretValue };
}
|
|
|
|
/**
 * Resolves the latest plaintext value of the secret attached to a trigger.
 *
 * @param trigger Trigger row whose `secretId` is resolved.
 * @param companyId Company the secret must belong to.
 * @returns Whatever `secretsSvc.resolveSecretValue` yields for the `"latest"` version.
 * @throws `notFound(...)` when the trigger has no secret, the secret row is
 *   missing, or the secret belongs to another company.
 */
async function resolveTriggerSecret(trigger: typeof routineTriggers.$inferSelect, companyId: string) {
  const { secretId } = trigger;
  if (!secretId) throw notFound("Routine trigger secret not found");

  const [secretRow] = await db
    .select()
    .from(companySecrets)
    .where(eq(companySecrets.id, secretId));

  const ownedByCompany = Boolean(secretRow) && secretRow.companyId === companyId;
  if (!ownedByCompany) throw notFound("Routine trigger secret not found");

  return secretsSvc.resolveSecretValue(companyId, secretId, "latest");
}
|
|
|
|
/**
 * Records a routine run and turns it into an execution issue, honouring the
 * routine's concurrency policy.
 *
 * Flow:
 *  1. With an idempotency key, an existing run for the same routine, source,
 *     key and trigger is returned unchanged.
 *  2. A run row is inserted with status `"received"`.
 *  3. If an open execution issue exists and the policy is not
 *     `"always_enqueue"`, the run is finalized as `"skipped"`
 *     (`"skip_if_active"`) or `"coalesced"` (any other policy) and linked to it.
 *  4. Otherwise an issue is created. A unique-violation on
 *     `issues_open_routine_execution_uq` is treated like step 3.
 *  5. Any other error finalizes the run as `"failed"` with the error message.
 *
 * @returns The run row in its final state (or the pre-existing run for step 1).
 */
async function dispatchRoutineRun(input: {
  routine: typeof routines.$inferSelect;
  trigger: typeof routineTriggers.$inferSelect | null;
  source: "schedule" | "manual" | "api" | "webhook";
  payload?: Record<string, unknown> | null;
  idempotencyKey?: string | null;
}) {
  const { routine, trigger } = input;
  const triggerId = trigger?.id ?? null;

  if (input.idempotencyKey) {
    const [priorRun] = await db
      .select()
      .from(routineRuns)
      .where(
        and(
          eq(routineRuns.companyId, routine.companyId),
          eq(routineRuns.routineId, routine.id),
          eq(routineRuns.source, input.source),
          eq(routineRuns.idempotencyKey, input.idempotencyKey),
          trigger ? eq(routineRuns.triggerId, trigger.id) : isNull(routineRuns.triggerId),
        ),
      )
      .orderBy(desc(routineRuns.createdAt))
      .limit(1);
    if (priorRun) return priorRun;
  }

  const triggeredAt = new Date();
  const [run] = await db
    .insert(routineRuns)
    .values({
      companyId: routine.companyId,
      routineId: routine.id,
      triggerId,
      source: input.source,
      status: "received",
      triggeredAt,
      idempotencyKey: input.idempotencyKey ?? null,
      triggerPayload: input.payload ?? null,
    })
    .returning();

  // Only schedule triggers with a cron expression and timezone get a recomputed
  // next-run time; for everything else the value stays undefined.
  let nextRunAt: Date | null | undefined;
  if (trigger?.kind === "schedule" && trigger.cronExpression && trigger.timezone) {
    nextRunAt = nextCronTickInTimeZone(trigger.cronExpression, trigger.timezone, triggeredAt);
  }

  // Finalizes this run against an already-open execution issue instead of
  // creating a new one. Only reached when the policy is not "always_enqueue".
  const settleOnOpenIssue = async (openIssue: typeof issues.$inferSelect) => {
    const status = routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
    const settled = await finalizeRun(run.id, {
      status,
      linkedIssueId: openIssue.id,
      coalescedIntoRunId: openIssue.originRunId,
      completedAt: triggeredAt,
    });
    await updateRoutineTouchedState({
      routineId: routine.id,
      triggerId,
      triggeredAt,
      status,
      issueId: openIssue.id,
      nextRunAt,
    });
    return settled ?? run;
  };

  try {
    const alreadyOpen = await findOpenExecutionIssue(routine);
    if (alreadyOpen && routine.concurrencyPolicy !== "always_enqueue") {
      return await settleOnOpenIssue(alreadyOpen);
    }

    let createdIssue;
    try {
      createdIssue = await issueSvc.create(routine.companyId, {
        projectId: routine.projectId,
        goalId: routine.goalId,
        parentId: routine.parentIssueId,
        title: routine.title,
        description: routine.description,
        status: "todo",
        priority: routine.priority,
        assigneeAgentId: routine.assigneeAgentId,
        originKind: "routine_execution",
        originId: routine.id,
        originRunId: run.id,
      });
    } catch (error) {
      // 23505 is matched together with the constraint name: a unique violation
      // on the open-execution index means an open issue appeared after the
      // lookup above.
      const details =
        typeof error === "object" && error !== null
          ? (error as { code?: string; constraint?: string })
          : null;
      const isOpenExecutionConflict =
        details !== null &&
        details.code === "23505" &&
        details.constraint === "issues_open_routine_execution_uq";
      if (!isOpenExecutionConflict || routine.concurrencyPolicy === "always_enqueue") {
        throw error;
      }

      const existingIssue = await findOpenExecutionIssue(routine);
      if (!existingIssue) throw error;
      return await settleOnOpenIssue(existingIssue);
    }

    const finalized = await finalizeRun(run.id, {
      status: "issue_created",
      linkedIssueId: createdIssue.id,
    });
    await updateRoutineTouchedState({
      routineId: routine.id,
      triggerId,
      triggeredAt,
      status: "issue_created",
      issueId: createdIssue.id,
      nextRunAt,
    });
    return finalized ?? run;
  } catch (error) {
    const failureReason = error instanceof Error ? error.message : String(error);
    const failedRun = await finalizeRun(run.id, {
      status: "failed",
      failureReason,
      completedAt: new Date(),
    });
    await updateRoutineTouchedState({
      routineId: routine.id,
      triggerId,
      triggeredAt,
      status: "failed",
      nextRunAt,
    });
    return failedRun ?? run;
  }
}
|
|
|
|
return {
|
|
get: getRoutineById,
|
|
getTrigger: getTriggerById,
|
|
|
|
/**
 * Lists a company's routines, most recently updated first (ties by title),
 * each decorated with its trigger summaries, latest run and active issue.
 */
list: async (companyId: string): Promise<RoutineListItem[]> => {
  const routineRows = await db
    .select()
    .from(routines)
    .where(eq(routines.companyId, companyId))
    .orderBy(desc(routines.updatedAt), asc(routines.title));

  const ids = routineRows.map(({ id }) => id);
  // The three lookups are independent of each other, so they run concurrently.
  const [triggerIndex, lastRunIndex, activeIssueIndex] = await Promise.all([
    listTriggersForRoutineIds(companyId, ids),
    listLatestRunByRoutineIds(companyId, ids),
    listActiveIssueByRoutineIds(companyId, ids),
  ]);

  const items: RoutineListItem[] = [];
  for (const routine of routineRows) {
    const triggerRows = triggerIndex.get(routine.id) ?? [];
    items.push({
      ...routine,
      triggers: triggerRows.map((trigger) => ({
        id: trigger.id,
        kind: trigger.kind as RoutineListItem["triggers"][number]["kind"],
        label: trigger.label,
        enabled: trigger.enabled,
        nextRunAt: trigger.nextRunAt,
        lastFiredAt: trigger.lastFiredAt,
        lastResult: trigger.lastResult,
      })),
      lastRun: lastRunIndex.get(routine.id) ?? null,
      activeIssue: activeIssueIndex.get(routine.id) ?? null,
    });
  }
  return items;
},
|
|
|
|
/**
 * Loads one routine together with its project, assignee, parent issue,
 * triggers (oldest first), the 25 most recent runs and the active execution issue.
 *
 * @returns The assembled detail object, or `null` when the routine does not exist.
 */
getDetail: async (id: string): Promise<RoutineDetail | null> => {
  const routine = await getRoutineById(id);
  if (!routine) return null;

  const runColumns = {
    id: routineRuns.id,
    companyId: routineRuns.companyId,
    routineId: routineRuns.routineId,
    triggerId: routineRuns.triggerId,
    source: routineRuns.source,
    status: routineRuns.status,
    triggeredAt: routineRuns.triggeredAt,
    idempotencyKey: routineRuns.idempotencyKey,
    triggerPayload: routineRuns.triggerPayload,
    linkedIssueId: routineRuns.linkedIssueId,
    coalescedIntoRunId: routineRuns.coalescedIntoRunId,
    failureReason: routineRuns.failureReason,
    completedAt: routineRuns.completedAt,
    createdAt: routineRuns.createdAt,
    updatedAt: routineRuns.updatedAt,
    triggerKind: routineTriggers.kind,
    triggerLabel: routineTriggers.label,
    issueIdentifier: issues.identifier,
    issueTitle: issues.title,
    issueStatus: issues.status,
    issuePriority: issues.priority,
    issueUpdatedAt: issues.updatedAt,
  };

  // All related lookups are independent, so they are awaited together.
  const [project, assignee, parentIssue, triggers, runRows, activeIssue] = await Promise.all([
    db.select().from(projects).where(eq(projects.id, routine.projectId)).then((rows) => rows[0] ?? null),
    db.select().from(agents).where(eq(agents.id, routine.assigneeAgentId)).then((rows) => rows[0] ?? null),
    routine.parentIssueId ? issueSvc.getById(routine.parentIssueId) : null,
    db.select().from(routineTriggers).where(eq(routineTriggers.routineId, routine.id)).orderBy(asc(routineTriggers.createdAt)),
    db
      .select(runColumns)
      .from(routineRuns)
      .leftJoin(routineTriggers, eq(routineRuns.triggerId, routineTriggers.id))
      .leftJoin(issues, eq(routineRuns.linkedIssueId, issues.id))
      .where(eq(routineRuns.routineId, routine.id))
      .orderBy(desc(routineRuns.createdAt))
      .limit(25),
    findOpenExecutionIssue(routine),
  ]);

  const recentRuns = runRows.map((run) => ({
    id: run.id,
    companyId: run.companyId,
    routineId: run.routineId,
    triggerId: run.triggerId,
    source: run.source as RoutineRunSummary["source"],
    status: run.status as RoutineRunSummary["status"],
    triggeredAt: run.triggeredAt,
    idempotencyKey: run.idempotencyKey,
    triggerPayload: run.triggerPayload as Record<string, unknown> | null,
    linkedIssueId: run.linkedIssueId,
    coalescedIntoRunId: run.coalescedIntoRunId,
    failureReason: run.failureReason,
    completedAt: run.completedAt,
    createdAt: run.createdAt,
    updatedAt: run.updatedAt,
    // Issue columns come from a LEFT JOIN; null values fall back to placeholders.
    linkedIssue: run.linkedIssueId
      ? {
          id: run.linkedIssueId,
          identifier: run.issueIdentifier,
          title: run.issueTitle ?? "Routine execution",
          status: run.issueStatus ?? "todo",
          priority: run.issuePriority ?? "medium",
          updatedAt: run.issueUpdatedAt ?? run.updatedAt,
        }
      : null,
    trigger: run.triggerId
      ? {
          id: run.triggerId,
          kind: run.triggerKind as NonNullable<RoutineRunSummary["trigger"]>["kind"],
          label: run.triggerLabel,
        }
      : null,
  }));

  return {
    ...routine,
    project,
    assignee,
    parentIssue,
    triggers: triggers as RoutineTrigger[],
    recentRuns,
    activeIssue,
  };
},
|
|
|
|
/**
 * Creates a routine after validating that the referenced project, assignee,
 * goal and parent issue (the latter two only when provided) belong to `companyId`.
 *
 * @param actor Recorded as both creator and last updater.
 * @returns The inserted routine row.
 */
create: async (companyId: string, input: CreateRoutine, actor: Actor): Promise<Routine> => {
  await assertProject(companyId, input.projectId);
  await assertAssignableAgent(companyId, input.assigneeAgentId);
  if (input.goalId) {
    await assertGoal(companyId, input.goalId);
  }
  if (input.parentIssueId) {
    await assertParentIssue(companyId, input.parentIssueId);
  }

  const actorAgentId = actor.agentId ?? null;
  const actorUserId = actor.userId ?? null;

  const inserted = await db
    .insert(routines)
    .values({
      companyId,
      projectId: input.projectId,
      goalId: input.goalId ?? null,
      parentIssueId: input.parentIssueId ?? null,
      title: input.title,
      description: input.description ?? null,
      assigneeAgentId: input.assigneeAgentId,
      priority: input.priority,
      status: input.status,
      concurrencyPolicy: input.concurrencyPolicy,
      catchUpPolicy: input.catchUpPolicy,
      createdByAgentId: actorAgentId,
      createdByUserId: actorUserId,
      updatedByAgentId: actorAgentId,
      updatedByUserId: actorUserId,
    })
    .returning();
  return inserted[0];
},
|
|
|
|
/**
 * Applies a partial update to a routine.
 *
 * Fields absent from `patch` keep their stored value. `goalId`, `parentIssueId`
 * and `description` may be explicitly set to `null`; for the remaining fields a
 * `null`/`undefined` patch value means "keep existing". References named in the
 * patch are validated against the routine's company before writing.
 *
 * @returns The updated row, or `null` when the routine does not exist.
 */
update: async (id: string, patch: UpdateRoutine, actor: Actor): Promise<Routine | null> => {
  const current = await getRoutineById(id);
  if (!current) return null;

  const { companyId } = current;
  const projectId = patch.projectId ?? current.projectId;
  const assigneeAgentId = patch.assigneeAgentId ?? current.assigneeAgentId;

  if (patch.projectId) await assertProject(companyId, projectId);
  if (patch.assigneeAgentId) await assertAssignableAgent(companyId, assigneeAgentId);
  if (patch.goalId) await assertGoal(companyId, patch.goalId);
  if (patch.parentIssueId) await assertParentIssue(companyId, patch.parentIssueId);

  const nextValues = {
    projectId,
    // Nullable columns: only `undefined` means "not provided"; `null` clears the value.
    goalId: patch.goalId !== undefined ? patch.goalId : current.goalId,
    parentIssueId: patch.parentIssueId !== undefined ? patch.parentIssueId : current.parentIssueId,
    title: patch.title ?? current.title,
    description: patch.description !== undefined ? patch.description : current.description,
    assigneeAgentId,
    priority: patch.priority ?? current.priority,
    status: patch.status ?? current.status,
    concurrencyPolicy: patch.concurrencyPolicy ?? current.concurrencyPolicy,
    catchUpPolicy: patch.catchUpPolicy ?? current.catchUpPolicy,
    updatedByAgentId: actor.agentId ?? null,
    updatedByUserId: actor.userId ?? null,
    updatedAt: new Date(),
  };

  const saved = await db
    .update(routines)
    .set(nextValues)
    .where(eq(routines.id, id))
    .returning();
  return saved[0] ?? null;
},
|
|
|
|
/**
 * Creates a trigger for a routine.
 *
 * - `schedule` triggers: the timezone (default `"UTC"`) and cron expression are
 *   validated and the first `nextRunAt` is computed from the current time.
 * - `webhook` triggers: a random public id and a fresh secret are generated;
 *   the plaintext secret and the public fire URL are returned once in
 *   `secretMaterial`.
 *
 * The webhook URL is prefixed with `PAPERCLIP_API_URL` (trailing slashes
 * removed). When the variable is unset the URL is returned as a server-relative
 * path rather than being prefixed with the literal text "undefined".
 *
 * @param actor Recorded as creator/updater and forwarded to the secret service.
 * @returns The inserted trigger and, for webhook triggers, the one-time secret material.
 * @throws `notFound(...)` when the routine is missing; `unprocessable(...)` for an
 *   invalid timezone or cron expression.
 */
createTrigger: async (
  routineId: string,
  input: CreateRoutineTrigger,
  actor: Actor,
): Promise<{ trigger: RoutineTrigger; secretMaterial: RoutineTriggerSecretMaterial | null }> => {
  const routine = await getRoutineById(routineId);
  if (!routine) throw notFound("Routine not found");

  let secretMaterial: RoutineTriggerSecretMaterial | null = null;
  let secretId: string | null = null;
  let publicId: string | null = null;
  let nextRunAt: Date | null = null;

  if (input.kind === "schedule") {
    const timeZone = input.timezone || "UTC";
    assertTimeZone(timeZone);
    const error = validateCron(input.cronExpression);
    if (error) throw unprocessable(error);
    nextRunAt = nextCronTickInTimeZone(input.cronExpression, timeZone, new Date());
  }

  if (input.kind === "webhook") {
    publicId = crypto.randomBytes(12).toString("hex");
    const created = await createWebhookSecret(routine.companyId, routine.id, actor);
    secretId = created.secret.id;
    // Guard against an unset env var (would render as "undefined/...") and
    // against a trailing slash (would render as "//api/...").
    const apiBaseUrl = (process.env.PAPERCLIP_API_URL ?? "").replace(/\/+$/, "");
    secretMaterial = {
      webhookUrl: `${apiBaseUrl}/api/routine-triggers/public/${publicId}/fire`,
      webhookSecret: created.secretValue,
    };
  }

  const [trigger] = await db
    .insert(routineTriggers)
    .values({
      companyId: routine.companyId,
      routineId: routine.id,
      kind: input.kind,
      label: input.label ?? null,
      enabled: input.enabled ?? true,
      cronExpression: input.kind === "schedule" ? input.cronExpression : null,
      timezone: input.kind === "schedule" ? (input.timezone || "UTC") : null,
      nextRunAt,
      publicId,
      secretId,
      signingMode: input.kind === "webhook" ? input.signingMode : null,
      replayWindowSec: input.kind === "webhook" ? input.replayWindowSec : null,
      lastRotatedAt: input.kind === "webhook" ? new Date() : null,
      createdByAgentId: actor.agentId ?? null,
      createdByUserId: actor.userId ?? null,
      updatedByAgentId: actor.agentId ?? null,
      updatedByUserId: actor.userId ?? null,
    })
    .returning();

  return {
    trigger: trigger as RoutineTrigger,
    secretMaterial,
  };
},
|
|
|
|
/**
 * Applies a partial update to a trigger.
 *
 * For `schedule` triggers a patched cron expression / timezone is validated
 * (neither may be set to `null`), and `nextRunAt` is recomputed from the current
 * time whenever both a cron expression and a timezone are present. For other
 * kinds the schedule fields are written back unchanged.
 *
 * @returns The updated trigger, or `null` when it does not exist.
 * @throws `unprocessable(...)` for a null/invalid cron expression or timezone.
 */
updateTrigger: async (id: string, patch: UpdateRoutineTrigger, actor: Actor): Promise<RoutineTrigger | null> => {
  const current = await getTriggerById(id);
  if (!current) return null;

  let { nextRunAt, cronExpression, timezone } = current;

  if (current.kind === "schedule") {
    if (patch.cronExpression !== undefined) {
      if (patch.cronExpression == null) {
        throw unprocessable("Scheduled triggers require cronExpression");
      }
      const cronProblem = validateCron(patch.cronExpression);
      if (cronProblem) throw unprocessable(cronProblem);
      cronExpression = patch.cronExpression;
    }

    if (patch.timezone !== undefined) {
      if (patch.timezone == null) {
        throw unprocessable("Scheduled triggers require timezone");
      }
      assertTimeZone(patch.timezone);
      timezone = patch.timezone;
    }

    if (cronExpression && timezone) {
      nextRunAt = nextCronTickInTimeZone(cronExpression, timezone, new Date());
    }
  }

  const saved = await db
    .update(routineTriggers)
    .set({
      // Nullable fields: only `undefined` means "not provided".
      label: patch.label !== undefined ? patch.label : current.label,
      enabled: patch.enabled ?? current.enabled,
      cronExpression,
      timezone,
      nextRunAt,
      signingMode: patch.signingMode !== undefined ? patch.signingMode : current.signingMode,
      replayWindowSec: patch.replayWindowSec !== undefined ? patch.replayWindowSec : current.replayWindowSec,
      updatedByAgentId: actor.agentId ?? null,
      updatedByUserId: actor.userId ?? null,
      updatedAt: new Date(),
    })
    .where(eq(routineTriggers.id, id))
    .returning();

  return (saved[0] as RoutineTrigger | undefined) ?? null;
},
|
|
|
|
/**
 * Deletes a trigger row by id.
 *
 * @returns `false` when no trigger has that id, `true` after the delete was issued.
 */
deleteTrigger: async (id: string): Promise<boolean> => {
  const target = await getTriggerById(id);
  if (target) {
    await db.delete(routineTriggers).where(eq(routineTriggers.id, id));
    return true;
  }
  return false;
},
|
|
|
|
/**
 * Replaces a webhook trigger's secret with a freshly generated one.
 *
 * The new plaintext secret is returned once in `secretMaterial` together with
 * the trigger's public fire URL. The URL is prefixed with `PAPERCLIP_API_URL`
 * (trailing slashes removed); when the variable is unset the URL is returned as
 * a server-relative path rather than being prefixed with the literal text
 * "undefined".
 *
 * @param actor Recorded as last updater and forwarded to the secret service.
 * @throws `notFound(...)` when the trigger is missing; `unprocessable(...)` when
 *   it is not a webhook trigger or lacks a public id / secret.
 */
rotateTriggerSecret: async (
  id: string,
  actor: Actor,
): Promise<{ trigger: RoutineTrigger; secretMaterial: RoutineTriggerSecretMaterial }> => {
  const existing = await getTriggerById(id);
  if (!existing) throw notFound("Routine trigger not found");
  if (existing.kind !== "webhook" || !existing.publicId || !existing.secretId) {
    throw unprocessable("Only webhook triggers can rotate secrets");
  }

  const secretValue = crypto.randomBytes(24).toString("hex");
  await secretsSvc.rotate(existing.secretId, { value: secretValue }, actor);
  const [updated] = await db
    .update(routineTriggers)
    .set({
      lastRotatedAt: new Date(),
      updatedByAgentId: actor.agentId ?? null,
      updatedByUserId: actor.userId ?? null,
      updatedAt: new Date(),
    })
    .where(eq(routineTriggers.id, id))
    .returning();

  // Guard against an unset env var (would render as "undefined/...") and
  // against a trailing slash (would render as "//api/...").
  const apiBaseUrl = (process.env.PAPERCLIP_API_URL ?? "").replace(/\/+$/, "");

  return {
    trigger: updated as RoutineTrigger,
    secretMaterial: {
      webhookUrl: `${apiBaseUrl}/api/routine-triggers/public/${existing.publicId}/fire`,
      webhookSecret: secretValue,
    },
  };
},
|
|
|
|
/**
 * Dispatches a run of an active routine, optionally attributed to one of its triggers.
 *
 * When `input.triggerId` is given but no such trigger exists, the run is
 * dispatched without a trigger.
 *
 * @throws `notFound(...)` when the routine is missing; `conflict(...)` when the
 *   routine is not active or the trigger is disabled; `forbidden(...)` when the
 *   trigger belongs to a different routine.
 */
runRoutine: async (id: string, input: RunRoutine) => {
  const routine = await getRoutineById(id);
  if (!routine) throw notFound("Routine not found");
  if (routine.status !== "active") throw conflict("Routine is not active");

  const trigger = input.triggerId ? await getTriggerById(input.triggerId) : null;
  if (trigger !== null) {
    if (trigger.routineId !== routine.id) throw forbidden("Trigger does not belong to routine");
    if (!trigger.enabled) throw conflict("Routine trigger is not active");
  }

  const { source, idempotencyKey } = input;
  return dispatchRoutineRun({
    routine,
    trigger,
    source,
    payload: input.payload as Record<string, unknown> | null | undefined,
    idempotencyKey,
  });
},
|
|
|
|
/**
 * Handles an unauthenticated webhook call addressed by a trigger's public id.
 *
 * Authentication depends on the trigger's `signingMode`:
 * - `"bearer"`: the `Authorization` header must equal `Bearer <secret>`.
 * - anything else: the signature header must be the hex HMAC-SHA256 of
 *   `"<timestamp>." + rawBody` keyed with the secret (an optional `sha256=`
 *   prefix is accepted), and the timestamp header, read as epoch milliseconds,
 *   must be within `replayWindowSec` (default 300) of the current time.
 *
 * Both comparisons are constant-time and never throw on malformed input; any
 * mismatch results in `unauthorized()`.
 *
 * @param publicId Public id from the webhook URL.
 * @param input Request headers, raw body and parsed payload. When `rawBody` is
 *   absent the JSON-serialised payload (or `{}`) is signed instead.
 * @returns The run produced by `dispatchRoutineRun` with source `"webhook"`.
 * @throws `notFound(...)` when trigger or routine is missing; `conflict(...)` when
 *   the trigger is disabled or the routine is not active; `unauthorized()` on any
 *   authentication failure.
 */
firePublicTrigger: async (publicId: string, input: {
  authorizationHeader?: string | null;
  signatureHeader?: string | null;
  timestampHeader?: string | null;
  idempotencyKey?: string | null;
  rawBody?: Buffer | null;
  payload?: Record<string, unknown> | null;
}) => {
  const trigger = await db
    .select()
    .from(routineTriggers)
    .where(and(eq(routineTriggers.publicId, publicId), eq(routineTriggers.kind, "webhook")))
    .then((rows) => rows[0] ?? null);
  if (!trigger) throw notFound("Routine trigger not found");
  const routine = await getRoutineById(trigger.routineId);
  if (!routine) throw notFound("Routine not found");
  if (!trigger.enabled || routine.status !== "active") throw conflict("Routine trigger is not active");

  // Constant-time equality on the UTF-8 bytes. Lengths are compared in bytes,
  // not characters, because timingSafeEqual throws a RangeError when the two
  // buffers differ in byte length.
  const safeEqual = (provided: string, expected: string): boolean => {
    const providedBytes = Buffer.from(provided);
    const expectedBytes = Buffer.from(expected);
    return (
      providedBytes.length === expectedBytes.length &&
      crypto.timingSafeEqual(providedBytes, expectedBytes)
    );
  };

  const secretValue = await resolveTriggerSecret(trigger, routine.companyId);
  if (trigger.signingMode === "bearer") {
    const providedAuthorization = input.authorizationHeader?.trim() ?? "";
    if (!providedAuthorization || !safeEqual(providedAuthorization, `Bearer ${secretValue}`)) {
      throw unauthorized();
    }
  } else {
    const rawBody = input.rawBody ?? Buffer.from(JSON.stringify(input.payload ?? {}));
    const providedSignature = input.signatureHeader?.trim() ?? "";
    const providedTimestamp = input.timestampHeader?.trim() ?? "";
    if (!providedSignature || !providedTimestamp) throw unauthorized();
    const tsMillis = Number(providedTimestamp);
    if (!Number.isFinite(tsMillis)) throw unauthorized();
    const replayWindowSec = trigger.replayWindowSec ?? 300;
    // Reject timestamps too far in the past or future to limit replay.
    if (Math.abs(Date.now() - tsMillis) > replayWindowSec * 1000) {
      throw unauthorized();
    }
    const expectedHmac = crypto
      .createHmac("sha256", secretValue)
      .update(`${providedTimestamp}.`)
      .update(rawBody)
      .digest("hex");
    const normalizedSignature = providedSignature.replace(/^sha256=/, "");
    if (!safeEqual(normalizedSignature, expectedHmac)) throw unauthorized();
  }

  return dispatchRoutineRun({
    routine,
    trigger,
    source: "webhook",
    payload: input.payload,
    idempotencyKey: input.idempotencyKey,
  });
},
|
|
|
|
/**
 * Lists the runs of a routine, newest first by creation time, each with a summary of the
 * trigger that started it and of the issue linked to it.
 *
 * @param routineId - Identifier of the routine whose runs are listed.
 * @param limit - Maximum number of runs to return; coerced to an integer between 1 and 200.
 * @returns Run summaries ordered by `createdAt` descending.
 */
listRuns: async (routineId: string, limit = 50): Promise<RoutineRunSummary[]> => {
  // Math.min/Math.max propagate NaN and keep fractions, either of which would reach the SQL
  // LIMIT clause unchanged. Fall back to the default for NaN and drop any fractional part.
  const requestedLimit = Number.isNaN(limit) ? 50 : Math.trunc(limit);
  const cappedLimit = Math.max(1, Math.min(requestedLimit, 200));

  const rows = await db
    .select({
      id: routineRuns.id,
      companyId: routineRuns.companyId,
      routineId: routineRuns.routineId,
      triggerId: routineRuns.triggerId,
      source: routineRuns.source,
      status: routineRuns.status,
      triggeredAt: routineRuns.triggeredAt,
      idempotencyKey: routineRuns.idempotencyKey,
      triggerPayload: routineRuns.triggerPayload,
      linkedIssueId: routineRuns.linkedIssueId,
      coalescedIntoRunId: routineRuns.coalescedIntoRunId,
      failureReason: routineRuns.failureReason,
      completedAt: routineRuns.completedAt,
      createdAt: routineRuns.createdAt,
      updatedAt: routineRuns.updatedAt,
      triggerKind: routineTriggers.kind,
      triggerLabel: routineTriggers.label,
      issueIdentifier: issues.identifier,
      issueTitle: issues.title,
      issueStatus: issues.status,
      issuePriority: issues.priority,
      issueUpdatedAt: issues.updatedAt,
    })
    .from(routineRuns)
    // Left joins: a run is still listed when its trigger or linked issue row is absent.
    .leftJoin(routineTriggers, eq(routineRuns.triggerId, routineTriggers.id))
    .leftJoin(issues, eq(routineRuns.linkedIssueId, issues.id))
    .where(eq(routineRuns.routineId, routineId))
    .orderBy(desc(routineRuns.createdAt))
    .limit(cappedLimit);

  return rows.map((row) => ({
    id: row.id,
    companyId: row.companyId,
    routineId: row.routineId,
    triggerId: row.triggerId,
    source: row.source as RoutineRunSummary["source"],
    status: row.status as RoutineRunSummary["status"],
    triggeredAt: row.triggeredAt,
    idempotencyKey: row.idempotencyKey,
    triggerPayload: row.triggerPayload as Record<string, unknown> | null,
    linkedIssueId: row.linkedIssueId,
    coalescedIntoRunId: row.coalescedIntoRunId,
    failureReason: row.failureReason,
    completedAt: row.completedAt,
    createdAt: row.createdAt,
    updatedAt: row.updatedAt,
    // The issue columns are null when the join found no row, hence the placeholder values.
    linkedIssue: row.linkedIssueId
      ? {
          id: row.linkedIssueId,
          identifier: row.issueIdentifier,
          title: row.issueTitle ?? "Routine execution",
          status: row.issueStatus ?? "todo",
          priority: row.issuePriority ?? "medium",
          updatedAt: row.issueUpdatedAt ?? row.updatedAt,
        }
      : null,
    trigger: row.triggerId
      ? {
          id: row.triggerId,
          kind: row.triggerKind as NonNullable<RoutineRunSummary["trigger"]>["kind"],
          label: row.triggerLabel,
        }
      : null,
  }));
},
|
|
|
|
/**
 * Dispatches runs for every enabled schedule trigger of an active routine whose
 * `nextRunAt` is at or before `now`, and moves each trigger's `nextRunAt` forward.
 *
 * @param now - Reference instant for deciding what is due; defaults to the current time.
 * @returns The number of runs dispatched during this tick.
 */
tickScheduledTriggers: async (now: Date = new Date()) => {
  const dueFilter = and(
    eq(routineTriggers.kind, "schedule"),
    eq(routineTriggers.enabled, true),
    eq(routines.status, "active"),
    isNotNull(routineTriggers.nextRunAt),
    lte(routineTriggers.nextRunAt, now),
  );
  const dueRows = await db
    .select({
      trigger: routineTriggers,
      routine: routines,
    })
    .from(routineTriggers)
    .innerJoin(routines, eq(routineTriggers.routineId, routines.id))
    .where(dueFilter)
    .orderBy(asc(routineTriggers.nextRunAt), asc(routineTriggers.createdAt));

  let triggered = 0;

  for (const { trigger, routine } of dueRows) {
    const { nextRunAt: dueAt, cronExpression, timezone } = trigger;
    if (!dueAt || !cronExpression || !timezone) continue;

    // Default: a single run, with the schedule resuming at the first tick after `now`.
    let upcomingRunAt = nextCronTickInTimeZone(cronExpression, timezone, now);
    let pendingRuns = 1;

    if (routine.catchUpPolicy === "enqueue_missed_with_cap") {
      // Walk the schedule from the stored due time, counting one run per slot that is not
      // after `now`, and stop once MAX_CATCH_UP_RUNS slots have been counted.
      pendingRuns = 0;
      for (
        let slot: Date | null = dueAt;
        slot && slot <= now && pendingRuns < MAX_CATCH_UP_RUNS;
        slot = upcomingRunAt
      ) {
        pendingRuns += 1;
        upcomingRunAt = nextCronTickInTimeZone(cronExpression, timezone, slot);
      }
    }

    // Claim the trigger before dispatching: the update only matches while `nextRunAt` still
    // holds the value read above, so a row already advanced elsewhere yields no result.
    const claimedRows = await db
      .update(routineTriggers)
      .set({
        nextRunAt: upcomingRunAt,
        updatedAt: new Date(),
      })
      .where(
        and(
          eq(routineTriggers.id, trigger.id),
          eq(routineTriggers.enabled, true),
          eq(routineTriggers.nextRunAt, dueAt),
        ),
      )
      .returning({ id: routineTriggers.id });
    if (!claimedRows[0]) continue;

    for (let remaining = pendingRuns; remaining > 0; remaining -= 1) {
      await dispatchRoutineRun({
        routine,
        trigger,
        source: "schedule",
      });
      triggered += 1;
    }
  }

  return { triggered };
},
|
|
|
|
/**
 * Mirrors the status of a routine-execution issue onto the run that created it.
 *
 * @param issueId - Identifier of the issue whose status changed.
 * @returns The result of `finalizeRun` when the run was completed or failed; `null` when
 *   the issue is unknown, was not created by a routine run, or is in any other status.
 */
syncRunStatusForIssue: async (issueId: string) => {
  const [issue] = await db
    .select({
      id: issues.id,
      status: issues.status,
      originKind: issues.originKind,
      originRunId: issues.originRunId,
    })
    .from(issues)
    .where(eq(issues.id, issueId));

  if (!issue) return null;
  // Only issues that originate from a routine run have a run to update.
  if (issue.originKind !== "routine_execution" || !issue.originRunId) return null;

  switch (issue.status) {
    case "done":
      return finalizeRun(issue.originRunId, {
        status: "completed",
        completedAt: new Date(),
      });
    case "blocked":
    case "cancelled":
      return finalizeRun(issue.originRunId, {
        status: "failed",
        failureReason: `Execution issue moved to ${issue.status}`,
        completedAt: new Date(),
      });
    default:
      return null;
  }
},
|
|
};
|
|
}
|