Fix routine coalescing for idle execution issues
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
import crypto from "node:crypto";
|
||||
import { and, asc, desc, eq, inArray, isNotNull, isNull, lte, ne, or } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, inArray, isNotNull, isNull, lte, ne, or, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
agents,
|
||||
companySecrets,
|
||||
goals,
|
||||
heartbeatRuns,
|
||||
issues,
|
||||
projects,
|
||||
routineRuns,
|
||||
@@ -30,6 +31,7 @@ import { secretService } from "./secrets.js";
|
||||
import { parseCron, validateCron } from "./cron.js";
|
||||
|
||||
const OPEN_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked"];
|
||||
const LIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"];
|
||||
const TERMINAL_ISSUE_STATUSES = new Set(["done", "cancelled"]);
|
||||
const MAX_CATCH_UP_RUNS = 25;
|
||||
const WEEKDAY_INDEX: Record<string, number> = {
|
||||
@@ -119,8 +121,8 @@ function nextCronTickInTimeZone(expression: string, timeZone: string, after: Dat
|
||||
|
||||
function nextResultText(status: string, issueId?: string | null) {
|
||||
if (status === "issue_created" && issueId) return `Created execution issue ${issueId}`;
|
||||
if (status === "coalesced") return "Coalesced into an existing active execution issue";
|
||||
if (status === "skipped") return "Skipped because an active execution issue already exists";
|
||||
if (status === "coalesced") return "Coalesced into an existing live execution issue";
|
||||
if (status === "skipped") return "Skipped because a live execution issue already exists";
|
||||
if (status === "completed") return "Execution issue completed";
|
||||
if (status === "failed") return "Execution failed";
|
||||
return status;
|
||||
@@ -284,9 +286,9 @@ export function routineService(db: Db) {
|
||||
return map;
|
||||
}
|
||||
|
||||
async function listActiveIssueByRoutineIds(companyId: string, routineIds: string[]) {
|
||||
async function listLiveIssueByRoutineIds(companyId: string, routineIds: string[]) {
|
||||
if (routineIds.length === 0) return new Map<string, RoutineListItem["activeIssue"]>();
|
||||
const rows = await db
|
||||
const executionBoundRows = await db
|
||||
.selectDistinctOn([issues.originId], {
|
||||
originId: issues.originId,
|
||||
id: issues.id,
|
||||
@@ -297,6 +299,13 @@ export function routineService(db: Db) {
|
||||
updatedAt: issues.updatedAt,
|
||||
})
|
||||
.from(issues)
|
||||
.innerJoin(
|
||||
heartbeatRuns,
|
||||
and(
|
||||
eq(heartbeatRuns.id, issues.executionRunId),
|
||||
inArray(heartbeatRuns.status, LIVE_HEARTBEAT_RUN_STATUSES),
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
@@ -308,8 +317,52 @@ export function routineService(db: Db) {
|
||||
)
|
||||
.orderBy(issues.originId, desc(issues.updatedAt), desc(issues.createdAt));
|
||||
|
||||
const rowsByOriginId = new Map<string, (typeof executionBoundRows)[number]>();
|
||||
for (const row of executionBoundRows) {
|
||||
if (!row.originId) continue;
|
||||
rowsByOriginId.set(row.originId, row);
|
||||
}
|
||||
|
||||
const missingRoutineIds = routineIds.filter((routineId) => !rowsByOriginId.has(routineId));
|
||||
if (missingRoutineIds.length > 0) {
|
||||
const legacyRows = await db
|
||||
.selectDistinctOn([issues.originId], {
|
||||
originId: issues.originId,
|
||||
id: issues.id,
|
||||
identifier: issues.identifier,
|
||||
title: issues.title,
|
||||
status: issues.status,
|
||||
priority: issues.priority,
|
||||
updatedAt: issues.updatedAt,
|
||||
})
|
||||
.from(issues)
|
||||
.innerJoin(
|
||||
heartbeatRuns,
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, issues.companyId),
|
||||
inArray(heartbeatRuns.status, LIVE_HEARTBEAT_RUN_STATUSES),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = cast(${issues.id} as text)`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
eq(issues.originKind, "routine_execution"),
|
||||
inArray(issues.originId, missingRoutineIds),
|
||||
inArray(issues.status, OPEN_ISSUE_STATUSES),
|
||||
isNull(issues.hiddenAt),
|
||||
),
|
||||
)
|
||||
.orderBy(issues.originId, desc(issues.updatedAt), desc(issues.createdAt));
|
||||
|
||||
for (const row of legacyRows) {
|
||||
if (!row.originId) continue;
|
||||
rowsByOriginId.set(row.originId, row);
|
||||
}
|
||||
}
|
||||
|
||||
const map = new Map<string, RoutineListItem["activeIssue"]>();
|
||||
for (const row of rows) {
|
||||
for (const row of rowsByOriginId.values()) {
|
||||
if (!row.originId) continue;
|
||||
map.set(row.originId, {
|
||||
id: row.id,
|
||||
@@ -353,10 +406,17 @@ export function routineService(db: Db) {
|
||||
}
|
||||
}
|
||||
|
||||
async function findOpenExecutionIssue(routine: typeof routines.$inferSelect) {
|
||||
return db
|
||||
async function findLiveExecutionIssue(routine: typeof routines.$inferSelect) {
|
||||
const executionBoundIssue = await db
|
||||
.select()
|
||||
.from(issues)
|
||||
.innerJoin(
|
||||
heartbeatRuns,
|
||||
and(
|
||||
eq(heartbeatRuns.id, issues.executionRunId),
|
||||
inArray(heartbeatRuns.status, LIVE_HEARTBEAT_RUN_STATUSES),
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(issues.companyId, routine.companyId),
|
||||
@@ -368,7 +428,32 @@ export function routineService(db: Db) {
|
||||
)
|
||||
.orderBy(desc(issues.updatedAt), desc(issues.createdAt))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
.then((rows) => rows[0]?.issues ?? null);
|
||||
if (executionBoundIssue) return executionBoundIssue;
|
||||
|
||||
return db
|
||||
.select()
|
||||
.from(issues)
|
||||
.innerJoin(
|
||||
heartbeatRuns,
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, issues.companyId),
|
||||
inArray(heartbeatRuns.status, LIVE_HEARTBEAT_RUN_STATUSES),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = cast(${issues.id} as text)`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(issues.companyId, routine.companyId),
|
||||
eq(issues.originKind, "routine_execution"),
|
||||
eq(issues.originId, routine.id),
|
||||
inArray(issues.status, OPEN_ISSUE_STATUSES),
|
||||
isNull(issues.hiddenAt),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(issues.updatedAt), desc(issues.createdAt))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0]?.issues ?? null);
|
||||
}
|
||||
|
||||
async function finalizeRun(runId: string, patch: Partial<typeof routineRuns.$inferInsert>) {
|
||||
@@ -460,7 +545,7 @@ export function routineService(db: Db) {
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const activeIssue = await findOpenExecutionIssue(input.routine);
|
||||
const activeIssue = await findLiveExecutionIssue(input.routine);
|
||||
if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") {
|
||||
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
|
||||
const updated = await finalizeRun(run.id, {
|
||||
@@ -507,7 +592,7 @@ export function routineService(db: Db) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const existingIssue = await findOpenExecutionIssue(input.routine);
|
||||
const existingIssue = await findLiveExecutionIssue(input.routine);
|
||||
if (!existingIssue) throw error;
|
||||
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
|
||||
const updated = await finalizeRun(run.id, {
|
||||
@@ -572,7 +657,7 @@ export function routineService(db: Db) {
|
||||
const [triggersByRoutine, latestRunByRoutine, activeIssueByRoutine] = await Promise.all([
|
||||
listTriggersForRoutineIds(companyId, routineIds),
|
||||
listLatestRunByRoutineIds(companyId, routineIds),
|
||||
listActiveIssueByRoutineIds(companyId, routineIds),
|
||||
listLiveIssueByRoutineIds(companyId, routineIds),
|
||||
]);
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
@@ -665,7 +750,7 @@ export function routineService(db: Db) {
|
||||
: null,
|
||||
})),
|
||||
),
|
||||
findOpenExecutionIssue(row),
|
||||
findLiveExecutionIssue(row),
|
||||
]);
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user