feat(core): merge backup core changes with post-split functionality
This commit is contained in:
@@ -11,6 +11,7 @@ import {
|
||||
heartbeatRunEvents,
|
||||
heartbeatRuns,
|
||||
} from "@paperclip/db";
|
||||
import { isUuidLike, normalizeAgentUrlKey } from "@paperclip/shared";
|
||||
import { conflict, notFound, unprocessable } from "../errors.js";
|
||||
import { normalizeAgentPermissions } from "./agent-permissions.js";
|
||||
import { REDACTED_EVENT_VALUE, sanitizeRecord } from "../redaction.js";
|
||||
@@ -140,13 +141,20 @@ function configPatchFromSnapshot(snapshot: unknown): Partial<typeof agents.$infe
|
||||
}
|
||||
|
||||
export function agentService(db: Db) {
|
||||
function normalizeAgentRow(row: typeof agents.$inferSelect) {
|
||||
function withUrlKey<T extends { id: string; name: string }>(row: T) {
|
||||
return {
|
||||
...row,
|
||||
permissions: normalizeAgentPermissions(row.permissions, row.role),
|
||||
urlKey: normalizeAgentUrlKey(row.name) ?? row.id,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeAgentRow(row: typeof agents.$inferSelect) {
|
||||
return withUrlKey({
|
||||
...row,
|
||||
permissions: normalizeAgentPermissions(row.permissions, row.role),
|
||||
});
|
||||
}
|
||||
|
||||
async function getById(id: string) {
|
||||
const row = await db
|
||||
.select()
|
||||
@@ -502,5 +510,37 @@ export function agentService(db: Db) {
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))),
|
||||
|
||||
resolveByReference: async (companyId: string, reference: string) => {
|
||||
const raw = reference.trim();
|
||||
if (raw.length === 0) {
|
||||
return { agent: null, ambiguous: false } as const;
|
||||
}
|
||||
|
||||
if (isUuidLike(raw)) {
|
||||
const byId = await getById(raw);
|
||||
if (!byId || byId.companyId !== companyId) {
|
||||
return { agent: null, ambiguous: false } as const;
|
||||
}
|
||||
return { agent: byId, ambiguous: false } as const;
|
||||
}
|
||||
|
||||
const urlKey = normalizeAgentUrlKey(raw);
|
||||
if (!urlKey) {
|
||||
return { agent: null, ambiguous: false } as const;
|
||||
}
|
||||
|
||||
const rows = await db.select().from(agents).where(eq(agents.companyId, companyId));
|
||||
const matches = rows
|
||||
.map(normalizeAgentRow)
|
||||
.filter((agent) => agent.urlKey === urlKey && agent.status !== "terminated");
|
||||
if (matches.length === 1) {
|
||||
return { agent: matches[0] ?? null, ambiguous: false } as const;
|
||||
}
|
||||
if (matches.length > 1) {
|
||||
return { agent: null, ambiguous: true } as const;
|
||||
}
|
||||
return { agent: null, ambiguous: false } as const;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1606,8 +1606,14 @@ export function heartbeatService(db: Db) {
|
||||
const executionAgentNameKey =
|
||||
normalizeAgentNameKey(issue.executionAgentNameKey) ??
|
||||
normalizeAgentNameKey(executionAgent?.name);
|
||||
const isSameExecutionAgent =
|
||||
Boolean(executionAgentNameKey) && executionAgentNameKey === agentNameKey;
|
||||
const shouldQueueFollowupForCommentWake =
|
||||
Boolean(wakeCommentId) &&
|
||||
activeExecutionRun.status === "running" &&
|
||||
isSameExecutionAgent;
|
||||
|
||||
if (executionAgentNameKey && executionAgentNameKey === agentNameKey) {
|
||||
if (isSameExecutionAgent && !shouldQueueFollowupForCommentWake) {
|
||||
const mergedContextSnapshot = mergeCoalescedContextSnapshot(
|
||||
activeExecutionRun.contextSnapshot,
|
||||
enrichedContextSnapshot,
|
||||
@@ -1647,6 +1653,47 @@ export function heartbeatService(db: Db) {
|
||||
[DEFERRED_WAKE_CONTEXT_KEY]: enrichedContextSnapshot,
|
||||
};
|
||||
|
||||
const existingDeferred = await tx
|
||||
.select()
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, agent.companyId),
|
||||
eq(agentWakeupRequests.agentId, agentId),
|
||||
eq(agentWakeupRequests.status, "deferred_issue_execution"),
|
||||
sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issue.id}`,
|
||||
),
|
||||
)
|
||||
.orderBy(asc(agentWakeupRequests.requestedAt))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (existingDeferred) {
|
||||
const existingDeferredPayload = parseObject(existingDeferred.payload);
|
||||
const existingDeferredContext = parseObject(existingDeferredPayload[DEFERRED_WAKE_CONTEXT_KEY]);
|
||||
const mergedDeferredContext = mergeCoalescedContextSnapshot(
|
||||
existingDeferredContext,
|
||||
enrichedContextSnapshot,
|
||||
);
|
||||
const mergedDeferredPayload = {
|
||||
...existingDeferredPayload,
|
||||
...(payload ?? {}),
|
||||
issueId,
|
||||
[DEFERRED_WAKE_CONTEXT_KEY]: mergedDeferredContext,
|
||||
};
|
||||
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
payload: mergedDeferredPayload,
|
||||
coalescedCount: (existingDeferred.coalescedCount ?? 0) + 1,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, existingDeferred.id));
|
||||
|
||||
return { kind: "deferred" as const };
|
||||
}
|
||||
|
||||
await tx.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
|
||||
@@ -56,7 +56,18 @@ export interface IssueFilters {
|
||||
|
||||
type IssueRow = typeof issues.$inferSelect;
|
||||
type IssueLabelRow = typeof labels.$inferSelect;
|
||||
type IssueActiveRunRow = {
|
||||
id: string;
|
||||
status: string;
|
||||
agentId: string;
|
||||
invocationSource: string;
|
||||
triggerDetail: string | null;
|
||||
startedAt: Date | null;
|
||||
finishedAt: Date | null;
|
||||
createdAt: Date;
|
||||
};
|
||||
type IssueWithLabels = IssueRow & { labels: IssueLabelRow[]; labelIds: string[] };
|
||||
type IssueWithLabelsAndRun = IssueWithLabels & { activeRun: IssueActiveRunRow | null };
|
||||
|
||||
function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) {
|
||||
if (actorRunId) return checkoutRunId === actorRunId;
|
||||
@@ -103,6 +114,53 @@ async function withIssueLabels(dbOrTx: any, rows: IssueRow[]): Promise<IssueWith
|
||||
});
|
||||
}
|
||||
|
||||
const ACTIVE_RUN_STATUSES = ["queued", "running"];
|
||||
|
||||
async function activeRunMapForIssues(
|
||||
dbOrTx: any,
|
||||
issueRows: IssueWithLabels[],
|
||||
): Promise<Map<string, IssueActiveRunRow>> {
|
||||
const map = new Map<string, IssueActiveRunRow>();
|
||||
const runIds = issueRows
|
||||
.map((row) => row.executionRunId)
|
||||
.filter((id): id is string => id != null);
|
||||
if (runIds.length === 0) return map;
|
||||
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
id: heartbeatRuns.id,
|
||||
status: heartbeatRuns.status,
|
||||
agentId: heartbeatRuns.agentId,
|
||||
invocationSource: heartbeatRuns.invocationSource,
|
||||
triggerDetail: heartbeatRuns.triggerDetail,
|
||||
startedAt: heartbeatRuns.startedAt,
|
||||
finishedAt: heartbeatRuns.finishedAt,
|
||||
createdAt: heartbeatRuns.createdAt,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
inArray(heartbeatRuns.id, runIds),
|
||||
inArray(heartbeatRuns.status, ACTIVE_RUN_STATUSES),
|
||||
),
|
||||
);
|
||||
|
||||
for (const row of rows) {
|
||||
map.set(row.id, row);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
function withActiveRuns(
|
||||
issueRows: IssueWithLabels[],
|
||||
runMap: Map<string, IssueActiveRunRow>,
|
||||
): IssueWithLabelsAndRun[] {
|
||||
return issueRows.map((row) => ({
|
||||
...row,
|
||||
activeRun: row.executionRunId ? (runMap.get(row.executionRunId) ?? null) : null,
|
||||
}));
|
||||
}
|
||||
|
||||
export function issueService(db: Db) {
|
||||
async function assertAssignableAgent(companyId: string, agentId: string) {
|
||||
const assignee = await db
|
||||
@@ -293,7 +351,9 @@ export function issueService(db: Db) {
|
||||
.from(issues)
|
||||
.where(and(...conditions))
|
||||
.orderBy(hasSearch ? asc(searchOrder) : asc(priorityOrder), asc(priorityOrder), desc(issues.updatedAt));
|
||||
return withIssueLabels(db, rows);
|
||||
const withLabels = await withIssueLabels(db, rows);
|
||||
const runMap = await activeRunMapForIssues(db, withLabels);
|
||||
return withActiveRuns(withLabels, runMap);
|
||||
},
|
||||
|
||||
getById: async (id: string) => {
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
import { and, asc, desc, eq, inArray } from "drizzle-orm";
|
||||
import type { Db } from "@paperclip/db";
|
||||
import { projects, projectGoals, goals, projectWorkspaces } from "@paperclip/db";
|
||||
import { PROJECT_COLORS, type ProjectGoalRef, type ProjectWorkspace } from "@paperclip/shared";
|
||||
import {
|
||||
PROJECT_COLORS,
|
||||
deriveProjectUrlKey,
|
||||
isUuidLike,
|
||||
normalizeProjectUrlKey,
|
||||
type ProjectGoalRef,
|
||||
type ProjectWorkspace,
|
||||
} from "@paperclip/shared";
|
||||
|
||||
type ProjectRow = typeof projects.$inferSelect;
|
||||
type ProjectWorkspaceRow = typeof projectWorkspaces.$inferSelect;
|
||||
@@ -17,6 +24,7 @@ type CreateWorkspaceInput = {
|
||||
type UpdateWorkspaceInput = Partial<CreateWorkspaceInput>;
|
||||
|
||||
interface ProjectWithGoals extends ProjectRow {
|
||||
urlKey: string;
|
||||
goalIds: string[];
|
||||
goals: ProjectGoalRef[];
|
||||
workspaces: ProjectWorkspace[];
|
||||
@@ -52,7 +60,12 @@ async function attachGoals(db: Db, rows: ProjectRow[]): Promise<ProjectWithGoals
|
||||
|
||||
return rows.map((r) => {
|
||||
const g = map.get(r.id) ?? [];
|
||||
return { ...r, goalIds: g.map((x) => x.id), goals: g } as ProjectWithGoals;
|
||||
return {
|
||||
...r,
|
||||
urlKey: deriveProjectUrlKey(r.name, r.id),
|
||||
goalIds: g.map((x) => x.id),
|
||||
goals: g,
|
||||
} as ProjectWithGoals;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -314,7 +327,11 @@ export function projectService(db: Db) {
|
||||
.delete(projects)
|
||||
.where(eq(projects.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null),
|
||||
.then((rows) => {
|
||||
const row = rows[0] ?? null;
|
||||
if (!row) return null;
|
||||
return { ...row, urlKey: deriveProjectUrlKey(row.name, row.id) };
|
||||
}),
|
||||
|
||||
listWorkspaces: async (projectId: string): Promise<ProjectWorkspace[]> => {
|
||||
const rows = await db
|
||||
@@ -555,5 +572,47 @@ export function projectService(db: Db) {
|
||||
|
||||
return removed ? toWorkspace(removed) : null;
|
||||
},
|
||||
|
||||
resolveByReference: async (companyId: string, reference: string) => {
|
||||
const raw = reference.trim();
|
||||
if (raw.length === 0) {
|
||||
return { project: null, ambiguous: false } as const;
|
||||
}
|
||||
|
||||
if (isUuidLike(raw)) {
|
||||
const row = await db
|
||||
.select({ id: projects.id, companyId: projects.companyId, name: projects.name })
|
||||
.from(projects)
|
||||
.where(and(eq(projects.id, raw), eq(projects.companyId, companyId)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) return { project: null, ambiguous: false } as const;
|
||||
return {
|
||||
project: { id: row.id, companyId: row.companyId, urlKey: deriveProjectUrlKey(row.name, row.id) },
|
||||
ambiguous: false,
|
||||
} as const;
|
||||
}
|
||||
|
||||
const urlKey = normalizeProjectUrlKey(raw);
|
||||
if (!urlKey) {
|
||||
return { project: null, ambiguous: false } as const;
|
||||
}
|
||||
|
||||
const rows = await db
|
||||
.select({ id: projects.id, companyId: projects.companyId, name: projects.name })
|
||||
.from(projects)
|
||||
.where(eq(projects.companyId, companyId));
|
||||
const matches = rows.filter((row) => deriveProjectUrlKey(row.name, row.id) === urlKey);
|
||||
if (matches.length === 1) {
|
||||
const match = matches[0]!;
|
||||
return {
|
||||
project: { id: match.id, companyId: match.companyId, urlKey: deriveProjectUrlKey(match.name, match.id) },
|
||||
ambiguous: false,
|
||||
} as const;
|
||||
}
|
||||
if (matches.length > 1) {
|
||||
return { project: null, ambiguous: true } as const;
|
||||
}
|
||||
return { project: null, ambiguous: false } as const;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user