Merge remote-tracking branch 'public-gh/master' into paperclip-subissues
* public-gh/master: Fix budget incident resolution edge cases Fix agent budget tab routing Fix budget auth and monthly spend rollups Harden budget enforcement and migration startup Add budget tabs and sidebar budget indicators feat(costs): add billing, quota, and budget control plane refactor(quota): move provider quota logic into adapter layer, add unit tests fix(costs): replace non-null map assertions with nullish coalescing, clarify weekData guard fix(costs): guard byProject against duplicate null keys, memoize ProviderQuotaCard row aggregations fix(costs): align byAgent run filter to startedAt, tighten providerTabItems memo deps, stabilize byProject row keys feat(costs): add agent model breakdown, harden date validation, sync CostByProject type, fix quota threshold and tab-gated queries fix(costs): harden company auth check, fix frozen date memo, hide empty quota rows fix(costs): guard routes, fix DST ranges, sync provider state, wire live updates feat(costs): consolidate /usage into /costs with Spend + Providers tabs feat(usage): add subscription quota windows per provider on /usage page address greptile review: per-provider deficit notch, startedAt filter, weekRange refresh, deduplicate providerDisplayName feat(ui): add resource and usage dashboard (/usage route) # Conflicts: # packages/db/src/migration-runtime.ts # packages/db/src/migrations/meta/0031_snapshot.json # packages/db/src/migrations/meta/_journal.json
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { createHash, randomBytes } from "node:crypto";
|
||||
import { and, desc, eq, inArray, ne } from "drizzle-orm";
|
||||
import { and, desc, eq, gte, inArray, lt, ne, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
agents,
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
agentRuntimeState,
|
||||
agentTaskSessions,
|
||||
agentWakeupRequests,
|
||||
costEvents,
|
||||
heartbeatRunEvents,
|
||||
heartbeatRuns,
|
||||
} from "@paperclipai/db";
|
||||
@@ -182,6 +183,15 @@ export function deduplicateAgentName(
|
||||
}
|
||||
|
||||
export function agentService(db: Db) {
|
||||
function currentUtcMonthWindow(now = new Date()) {
|
||||
const year = now.getUTCFullYear();
|
||||
const month = now.getUTCMonth();
|
||||
return {
|
||||
start: new Date(Date.UTC(year, month, 1, 0, 0, 0, 0)),
|
||||
end: new Date(Date.UTC(year, month + 1, 1, 0, 0, 0, 0)),
|
||||
};
|
||||
}
|
||||
|
||||
function withUrlKey<T extends { id: string; name: string }>(row: T) {
|
||||
return {
|
||||
...row,
|
||||
@@ -196,13 +206,47 @@ export function agentService(db: Db) {
|
||||
});
|
||||
}
|
||||
|
||||
async function getMonthlySpendByAgentIds(companyId: string, agentIds: string[]) {
|
||||
if (agentIds.length === 0) return new Map<string, number>();
|
||||
const { start, end } = currentUtcMonthWindow();
|
||||
const rows = await db
|
||||
.select({
|
||||
agentId: costEvents.agentId,
|
||||
spentMonthlyCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(
|
||||
and(
|
||||
eq(costEvents.companyId, companyId),
|
||||
inArray(costEvents.agentId, agentIds),
|
||||
gte(costEvents.occurredAt, start),
|
||||
lt(costEvents.occurredAt, end),
|
||||
),
|
||||
)
|
||||
.groupBy(costEvents.agentId);
|
||||
return new Map(rows.map((row) => [row.agentId, Number(row.spentMonthlyCents ?? 0)]));
|
||||
}
|
||||
|
||||
async function hydrateAgentSpend<T extends { id: string; companyId: string; spentMonthlyCents: number }>(rows: T[]) {
|
||||
const agentIds = rows.map((row) => row.id);
|
||||
const companyId = rows[0]?.companyId;
|
||||
if (!companyId || agentIds.length === 0) return rows;
|
||||
const spendByAgentId = await getMonthlySpendByAgentIds(companyId, agentIds);
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
spentMonthlyCents: spendByAgentId.get(row.id) ?? 0,
|
||||
}));
|
||||
}
|
||||
|
||||
async function getById(id: string) {
|
||||
const row = await db
|
||||
.select()
|
||||
.from(agents)
|
||||
.where(eq(agents.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? normalizeAgentRow(row) : null;
|
||||
if (!row) return null;
|
||||
const [hydrated] = await hydrateAgentSpend([row]);
|
||||
return normalizeAgentRow(hydrated);
|
||||
}
|
||||
|
||||
async function ensureManager(companyId: string, managerId: string) {
|
||||
@@ -331,7 +375,8 @@ export function agentService(db: Db) {
|
||||
conditions.push(ne(agents.status, "terminated"));
|
||||
}
|
||||
const rows = await db.select().from(agents).where(and(...conditions));
|
||||
return rows.map(normalizeAgentRow);
|
||||
const hydrated = await hydrateAgentSpend(rows);
|
||||
return hydrated.map(normalizeAgentRow);
|
||||
},
|
||||
|
||||
getById,
|
||||
@@ -360,14 +405,19 @@ export function agentService(db: Db) {
|
||||
|
||||
update: updateAgent,
|
||||
|
||||
pause: async (id: string) => {
|
||||
pause: async (id: string, reason: "manual" | "budget" | "system" = "manual") => {
|
||||
const existing = await getById(id);
|
||||
if (!existing) return null;
|
||||
if (existing.status === "terminated") throw conflict("Cannot pause terminated agent");
|
||||
|
||||
const updated = await db
|
||||
.update(agents)
|
||||
.set({ status: "paused", updatedAt: new Date() })
|
||||
.set({
|
||||
status: "paused",
|
||||
pauseReason: reason,
|
||||
pausedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agents.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
@@ -384,7 +434,12 @@ export function agentService(db: Db) {
|
||||
|
||||
const updated = await db
|
||||
.update(agents)
|
||||
.set({ status: "idle", updatedAt: new Date() })
|
||||
.set({
|
||||
status: "idle",
|
||||
pauseReason: null,
|
||||
pausedAt: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agents.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
@@ -397,7 +452,12 @@ export function agentService(db: Db) {
|
||||
|
||||
await db
|
||||
.update(agents)
|
||||
.set({ status: "terminated", updatedAt: new Date() })
|
||||
.set({
|
||||
status: "terminated",
|
||||
pauseReason: null,
|
||||
pausedAt: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agents.id, id));
|
||||
|
||||
await db
|
||||
|
||||
@@ -4,6 +4,7 @@ import { approvalComments, approvals } from "@paperclipai/db";
|
||||
import { notFound, unprocessable } from "../errors.js";
|
||||
import { redactCurrentUserText } from "../log-redaction.js";
|
||||
import { agentService } from "./agents.js";
|
||||
import { budgetService } from "./budgets.js";
|
||||
import { notifyHireApproved } from "./hire-hook.js";
|
||||
|
||||
function redactApprovalComment<T extends { body: string }>(comment: T): T {
|
||||
@@ -15,6 +16,7 @@ function redactApprovalComment<T extends { body: string }>(comment: T): T {
|
||||
|
||||
export function approvalService(db: Db) {
|
||||
const agentsSvc = agentService(db);
|
||||
const budgets = budgetService(db);
|
||||
const canResolveStatuses = new Set(["pending", "revision_requested"]);
|
||||
const resolvableStatuses = Array.from(canResolveStatuses);
|
||||
type ApprovalRecord = typeof approvals.$inferSelect;
|
||||
@@ -137,6 +139,20 @@ export function approvalService(db: Db) {
|
||||
hireApprovedAgentId = created?.id ?? null;
|
||||
}
|
||||
if (hireApprovedAgentId) {
|
||||
const budgetMonthlyCents =
|
||||
typeof payload.budgetMonthlyCents === "number" ? payload.budgetMonthlyCents : 0;
|
||||
if (budgetMonthlyCents > 0) {
|
||||
await budgets.upsertPolicy(
|
||||
updated.companyId,
|
||||
{
|
||||
scopeType: "agent",
|
||||
scopeId: hireApprovedAgentId,
|
||||
amount: budgetMonthlyCents,
|
||||
windowKind: "calendar_month_utc",
|
||||
},
|
||||
decidedByUserId,
|
||||
);
|
||||
}
|
||||
void notifyHireApproved(db, {
|
||||
companyId: updated.companyId,
|
||||
agentId: hireApprovedAgentId,
|
||||
|
||||
958
server/src/services/budgets.ts
Normal file
958
server/src/services/budgets.ts
Normal file
@@ -0,0 +1,958 @@
|
||||
import { and, desc, eq, gte, inArray, lt, ne, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
agents,
|
||||
approvals,
|
||||
budgetIncidents,
|
||||
budgetPolicies,
|
||||
companies,
|
||||
costEvents,
|
||||
projects,
|
||||
} from "@paperclipai/db";
|
||||
import type {
|
||||
BudgetIncident,
|
||||
BudgetIncidentResolutionInput,
|
||||
BudgetMetric,
|
||||
BudgetOverview,
|
||||
BudgetPolicy,
|
||||
BudgetPolicySummary,
|
||||
BudgetPolicyUpsertInput,
|
||||
BudgetScopeType,
|
||||
BudgetThresholdType,
|
||||
BudgetWindowKind,
|
||||
} from "@paperclipai/shared";
|
||||
import { notFound, unprocessable } from "../errors.js";
|
||||
import { logActivity } from "./activity-log.js";
|
||||
|
||||
type ScopeRecord = {
|
||||
companyId: string;
|
||||
name: string;
|
||||
paused: boolean;
|
||||
pauseReason: "manual" | "budget" | "system" | null;
|
||||
};
|
||||
|
||||
type PolicyRow = typeof budgetPolicies.$inferSelect;
|
||||
type IncidentRow = typeof budgetIncidents.$inferSelect;
|
||||
|
||||
export type BudgetEnforcementScope = {
|
||||
companyId: string;
|
||||
scopeType: BudgetScopeType;
|
||||
scopeId: string;
|
||||
};
|
||||
|
||||
export type BudgetServiceHooks = {
|
||||
cancelWorkForScope?: (scope: BudgetEnforcementScope) => Promise<void>;
|
||||
};
|
||||
|
||||
function currentUtcMonthWindow(now = new Date()) {
|
||||
const year = now.getUTCFullYear();
|
||||
const month = now.getUTCMonth();
|
||||
const start = new Date(Date.UTC(year, month, 1, 0, 0, 0, 0));
|
||||
const end = new Date(Date.UTC(year, month + 1, 1, 0, 0, 0, 0));
|
||||
return { start, end };
|
||||
}
|
||||
|
||||
function resolveWindow(windowKind: BudgetWindowKind, now = new Date()) {
|
||||
if (windowKind === "lifetime") {
|
||||
return {
|
||||
start: new Date(Date.UTC(1970, 0, 1, 0, 0, 0, 0)),
|
||||
end: new Date(Date.UTC(9999, 0, 1, 0, 0, 0, 0)),
|
||||
};
|
||||
}
|
||||
return currentUtcMonthWindow(now);
|
||||
}
|
||||
|
||||
function budgetStatusFromObserved(
|
||||
observedAmount: number,
|
||||
amount: number,
|
||||
warnPercent: number,
|
||||
): BudgetPolicySummary["status"] {
|
||||
if (amount <= 0) return "ok";
|
||||
if (observedAmount >= amount) return "hard_stop";
|
||||
if (observedAmount >= Math.ceil((amount * warnPercent) / 100)) return "warning";
|
||||
return "ok";
|
||||
}
|
||||
|
||||
function normalizeScopeName(scopeType: BudgetScopeType, name: string) {
|
||||
if (scopeType === "company") return name;
|
||||
return name.trim().length > 0 ? name : scopeType;
|
||||
}
|
||||
|
||||
async function resolveScopeRecord(db: Db, scopeType: BudgetScopeType, scopeId: string): Promise<ScopeRecord> {
|
||||
if (scopeType === "company") {
|
||||
const row = await db
|
||||
.select({
|
||||
companyId: companies.id,
|
||||
name: companies.name,
|
||||
status: companies.status,
|
||||
pauseReason: companies.pauseReason,
|
||||
pausedAt: companies.pausedAt,
|
||||
})
|
||||
.from(companies)
|
||||
.where(eq(companies.id, scopeId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) throw notFound("Company not found");
|
||||
return {
|
||||
companyId: row.companyId,
|
||||
name: row.name,
|
||||
paused: row.status === "paused" || Boolean(row.pausedAt),
|
||||
pauseReason: (row.pauseReason as ScopeRecord["pauseReason"]) ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
if (scopeType === "agent") {
|
||||
const row = await db
|
||||
.select({
|
||||
companyId: agents.companyId,
|
||||
name: agents.name,
|
||||
status: agents.status,
|
||||
pauseReason: agents.pauseReason,
|
||||
})
|
||||
.from(agents)
|
||||
.where(eq(agents.id, scopeId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) throw notFound("Agent not found");
|
||||
return {
|
||||
companyId: row.companyId,
|
||||
name: row.name,
|
||||
paused: row.status === "paused",
|
||||
pauseReason: (row.pauseReason as ScopeRecord["pauseReason"]) ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
const row = await db
|
||||
.select({
|
||||
companyId: projects.companyId,
|
||||
name: projects.name,
|
||||
pauseReason: projects.pauseReason,
|
||||
pausedAt: projects.pausedAt,
|
||||
})
|
||||
.from(projects)
|
||||
.where(eq(projects.id, scopeId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) throw notFound("Project not found");
|
||||
return {
|
||||
companyId: row.companyId,
|
||||
name: row.name,
|
||||
paused: Boolean(row.pausedAt),
|
||||
pauseReason: (row.pauseReason as ScopeRecord["pauseReason"]) ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
async function computeObservedAmount(
|
||||
db: Db,
|
||||
policy: Pick<PolicyRow, "companyId" | "scopeType" | "scopeId" | "windowKind" | "metric">,
|
||||
) {
|
||||
if (policy.metric !== "billed_cents") return 0;
|
||||
|
||||
const conditions = [eq(costEvents.companyId, policy.companyId)];
|
||||
if (policy.scopeType === "agent") conditions.push(eq(costEvents.agentId, policy.scopeId));
|
||||
if (policy.scopeType === "project") conditions.push(eq(costEvents.projectId, policy.scopeId));
|
||||
const { start, end } = resolveWindow(policy.windowKind as BudgetWindowKind);
|
||||
if (policy.windowKind === "calendar_month_utc") {
|
||||
conditions.push(gte(costEvents.occurredAt, start));
|
||||
conditions.push(lt(costEvents.occurredAt, end));
|
||||
}
|
||||
|
||||
const [row] = await db
|
||||
.select({
|
||||
total: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(and(...conditions));
|
||||
|
||||
return Number(row?.total ?? 0);
|
||||
}
|
||||
|
||||
function buildApprovalPayload(input: {
|
||||
policy: PolicyRow;
|
||||
scopeName: string;
|
||||
thresholdType: BudgetThresholdType;
|
||||
amountObserved: number;
|
||||
windowStart: Date;
|
||||
windowEnd: Date;
|
||||
}) {
|
||||
return {
|
||||
scopeType: input.policy.scopeType,
|
||||
scopeId: input.policy.scopeId,
|
||||
scopeName: input.scopeName,
|
||||
metric: input.policy.metric,
|
||||
windowKind: input.policy.windowKind,
|
||||
thresholdType: input.thresholdType,
|
||||
budgetAmount: input.policy.amount,
|
||||
observedAmount: input.amountObserved,
|
||||
warnPercent: input.policy.warnPercent,
|
||||
windowStart: input.windowStart.toISOString(),
|
||||
windowEnd: input.windowEnd.toISOString(),
|
||||
policyId: input.policy.id,
|
||||
guidance: "Raise the budget and resume the scope, or keep the scope paused.",
|
||||
};
|
||||
}
|
||||
|
||||
async function markApprovalStatus(
|
||||
db: Db,
|
||||
approvalId: string | null,
|
||||
status: "approved" | "rejected",
|
||||
decisionNote: string | null | undefined,
|
||||
decidedByUserId: string,
|
||||
) {
|
||||
if (!approvalId) return;
|
||||
await db
|
||||
.update(approvals)
|
||||
.set({
|
||||
status,
|
||||
decisionNote: decisionNote ?? null,
|
||||
decidedByUserId,
|
||||
decidedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(approvals.id, approvalId));
|
||||
}
|
||||
|
||||
export function budgetService(db: Db, hooks: BudgetServiceHooks = {}) {
|
||||
async function pauseScopeForBudget(policy: PolicyRow) {
|
||||
const now = new Date();
|
||||
if (policy.scopeType === "agent") {
|
||||
await db
|
||||
.update(agents)
|
||||
.set({
|
||||
status: "paused",
|
||||
pauseReason: "budget",
|
||||
pausedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(agents.id, policy.scopeId), inArray(agents.status, ["active", "idle", "running", "error"])));
|
||||
return;
|
||||
}
|
||||
|
||||
if (policy.scopeType === "project") {
|
||||
await db
|
||||
.update(projects)
|
||||
.set({
|
||||
pauseReason: "budget",
|
||||
pausedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(projects.id, policy.scopeId));
|
||||
return;
|
||||
}
|
||||
|
||||
await db
|
||||
.update(companies)
|
||||
.set({
|
||||
status: "paused",
|
||||
pauseReason: "budget",
|
||||
pausedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(companies.id, policy.scopeId));
|
||||
}
|
||||
|
||||
async function pauseAndCancelScopeForBudget(policy: PolicyRow) {
|
||||
await pauseScopeForBudget(policy);
|
||||
await hooks.cancelWorkForScope?.({
|
||||
companyId: policy.companyId,
|
||||
scopeType: policy.scopeType as BudgetScopeType,
|
||||
scopeId: policy.scopeId,
|
||||
});
|
||||
}
|
||||
|
||||
async function resumeScopeFromBudget(policy: PolicyRow) {
|
||||
const now = new Date();
|
||||
if (policy.scopeType === "agent") {
|
||||
await db
|
||||
.update(agents)
|
||||
.set({
|
||||
status: "idle",
|
||||
pauseReason: null,
|
||||
pausedAt: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(agents.id, policy.scopeId), eq(agents.pauseReason, "budget")));
|
||||
return;
|
||||
}
|
||||
|
||||
if (policy.scopeType === "project") {
|
||||
await db
|
||||
.update(projects)
|
||||
.set({
|
||||
pauseReason: null,
|
||||
pausedAt: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(projects.id, policy.scopeId), eq(projects.pauseReason, "budget")));
|
||||
return;
|
||||
}
|
||||
|
||||
await db
|
||||
.update(companies)
|
||||
.set({
|
||||
status: "active",
|
||||
pauseReason: null,
|
||||
pausedAt: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(companies.id, policy.scopeId), eq(companies.pauseReason, "budget")));
|
||||
}
|
||||
|
||||
async function getPolicyRow(policyId: string) {
|
||||
const policy = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(eq(budgetPolicies.id, policyId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!policy) throw notFound("Budget policy not found");
|
||||
return policy;
|
||||
}
|
||||
|
||||
async function listPolicyRows(companyId: string) {
|
||||
return db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(eq(budgetPolicies.companyId, companyId))
|
||||
.orderBy(desc(budgetPolicies.updatedAt));
|
||||
}
|
||||
|
||||
async function buildPolicySummary(policy: PolicyRow): Promise<BudgetPolicySummary> {
|
||||
const scope = await resolveScopeRecord(db, policy.scopeType as BudgetScopeType, policy.scopeId);
|
||||
const observedAmount = await computeObservedAmount(db, policy);
|
||||
const { start, end } = resolveWindow(policy.windowKind as BudgetWindowKind);
|
||||
const amount = policy.isActive ? policy.amount : 0;
|
||||
const utilizationPercent =
|
||||
amount > 0 ? Number(((observedAmount / amount) * 100).toFixed(2)) : 0;
|
||||
return {
|
||||
policyId: policy.id,
|
||||
companyId: policy.companyId,
|
||||
scopeType: policy.scopeType as BudgetScopeType,
|
||||
scopeId: policy.scopeId,
|
||||
scopeName: normalizeScopeName(policy.scopeType as BudgetScopeType, scope.name),
|
||||
metric: policy.metric as BudgetMetric,
|
||||
windowKind: policy.windowKind as BudgetWindowKind,
|
||||
amount,
|
||||
observedAmount,
|
||||
remainingAmount: amount > 0 ? Math.max(0, amount - observedAmount) : 0,
|
||||
utilizationPercent,
|
||||
warnPercent: policy.warnPercent,
|
||||
hardStopEnabled: policy.hardStopEnabled,
|
||||
notifyEnabled: policy.notifyEnabled,
|
||||
isActive: policy.isActive,
|
||||
status: policy.isActive
|
||||
? budgetStatusFromObserved(observedAmount, amount, policy.warnPercent)
|
||||
: "ok",
|
||||
paused: scope.paused,
|
||||
pauseReason: scope.pauseReason,
|
||||
windowStart: start,
|
||||
windowEnd: end,
|
||||
};
|
||||
}
|
||||
|
||||
async function createIncidentIfNeeded(
|
||||
policy: PolicyRow,
|
||||
thresholdType: BudgetThresholdType,
|
||||
amountObserved: number,
|
||||
) {
|
||||
const { start, end } = resolveWindow(policy.windowKind as BudgetWindowKind);
|
||||
const existing = await db
|
||||
.select()
|
||||
.from(budgetIncidents)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetIncidents.policyId, policy.id),
|
||||
eq(budgetIncidents.windowStart, start),
|
||||
eq(budgetIncidents.thresholdType, thresholdType),
|
||||
ne(budgetIncidents.status, "dismissed"),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (existing) return existing;
|
||||
|
||||
const scope = await resolveScopeRecord(db, policy.scopeType as BudgetScopeType, policy.scopeId);
|
||||
const payload = buildApprovalPayload({
|
||||
policy,
|
||||
scopeName: normalizeScopeName(policy.scopeType as BudgetScopeType, scope.name),
|
||||
thresholdType,
|
||||
amountObserved,
|
||||
windowStart: start,
|
||||
windowEnd: end,
|
||||
});
|
||||
|
||||
const approval = thresholdType === "hard"
|
||||
? await db
|
||||
.insert(approvals)
|
||||
.values({
|
||||
companyId: policy.companyId,
|
||||
type: "budget_override_required",
|
||||
requestedByUserId: null,
|
||||
requestedByAgentId: null,
|
||||
status: "pending",
|
||||
payload,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null)
|
||||
: null;
|
||||
|
||||
return db
|
||||
.insert(budgetIncidents)
|
||||
.values({
|
||||
companyId: policy.companyId,
|
||||
policyId: policy.id,
|
||||
scopeType: policy.scopeType,
|
||||
scopeId: policy.scopeId,
|
||||
metric: policy.metric,
|
||||
windowKind: policy.windowKind,
|
||||
windowStart: start,
|
||||
windowEnd: end,
|
||||
thresholdType,
|
||||
amountLimit: policy.amount,
|
||||
amountObserved,
|
||||
status: "open",
|
||||
approvalId: approval?.id ?? null,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
}
|
||||
|
||||
async function resolveOpenSoftIncidents(policyId: string) {
|
||||
await db
|
||||
.update(budgetIncidents)
|
||||
.set({
|
||||
status: "resolved",
|
||||
resolvedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(budgetIncidents.policyId, policyId),
|
||||
eq(budgetIncidents.thresholdType, "soft"),
|
||||
eq(budgetIncidents.status, "open"),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async function resolveOpenIncidentsForPolicy(
|
||||
policyId: string,
|
||||
approvalStatus: "approved" | "rejected" | null,
|
||||
decidedByUserId: string | null,
|
||||
) {
|
||||
const openRows = await db
|
||||
.select()
|
||||
.from(budgetIncidents)
|
||||
.where(and(eq(budgetIncidents.policyId, policyId), eq(budgetIncidents.status, "open")));
|
||||
|
||||
await db
|
||||
.update(budgetIncidents)
|
||||
.set({
|
||||
status: "resolved",
|
||||
resolvedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(and(eq(budgetIncidents.policyId, policyId), eq(budgetIncidents.status, "open")));
|
||||
|
||||
if (!approvalStatus || !decidedByUserId) return;
|
||||
for (const row of openRows) {
|
||||
await markApprovalStatus(db, row.approvalId ?? null, approvalStatus, "Resolved via budget update", decidedByUserId);
|
||||
}
|
||||
}
|
||||
|
||||
async function hydrateIncidentRows(rows: IncidentRow[]): Promise<BudgetIncident[]> {
|
||||
const approvalIds = rows.map((row) => row.approvalId).filter((value): value is string => Boolean(value));
|
||||
const approvalRows = approvalIds.length > 0
|
||||
? await db
|
||||
.select({ id: approvals.id, status: approvals.status })
|
||||
.from(approvals)
|
||||
.where(inArray(approvals.id, approvalIds))
|
||||
: [];
|
||||
const approvalStatusById = new Map(approvalRows.map((row) => [row.id, row.status]));
|
||||
|
||||
return Promise.all(
|
||||
rows.map(async (row) => {
|
||||
const scope = await resolveScopeRecord(db, row.scopeType as BudgetScopeType, row.scopeId);
|
||||
return {
|
||||
id: row.id,
|
||||
companyId: row.companyId,
|
||||
policyId: row.policyId,
|
||||
scopeType: row.scopeType as BudgetScopeType,
|
||||
scopeId: row.scopeId,
|
||||
scopeName: normalizeScopeName(row.scopeType as BudgetScopeType, scope.name),
|
||||
metric: row.metric as BudgetMetric,
|
||||
windowKind: row.windowKind as BudgetWindowKind,
|
||||
windowStart: row.windowStart,
|
||||
windowEnd: row.windowEnd,
|
||||
thresholdType: row.thresholdType as BudgetThresholdType,
|
||||
amountLimit: row.amountLimit,
|
||||
amountObserved: row.amountObserved,
|
||||
status: row.status as BudgetIncident["status"],
|
||||
approvalId: row.approvalId ?? null,
|
||||
approvalStatus: row.approvalId ? approvalStatusById.get(row.approvalId) ?? null : null,
|
||||
resolvedAt: row.resolvedAt ?? null,
|
||||
createdAt: row.createdAt,
|
||||
updatedAt: row.updatedAt,
|
||||
};
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
listPolicies: async (companyId: string): Promise<BudgetPolicy[]> => {
|
||||
const rows = await listPolicyRows(companyId);
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
scopeType: row.scopeType as BudgetScopeType,
|
||||
metric: row.metric as BudgetMetric,
|
||||
windowKind: row.windowKind as BudgetWindowKind,
|
||||
}));
|
||||
},
|
||||
|
||||
upsertPolicy: async (
|
||||
companyId: string,
|
||||
input: BudgetPolicyUpsertInput,
|
||||
actorUserId: string | null,
|
||||
): Promise<BudgetPolicySummary> => {
|
||||
const scope = await resolveScopeRecord(db, input.scopeType, input.scopeId);
|
||||
if (scope.companyId !== companyId) {
|
||||
throw unprocessable("Budget scope does not belong to company");
|
||||
}
|
||||
|
||||
const metric = input.metric ?? "billed_cents";
|
||||
const windowKind = input.windowKind ?? (input.scopeType === "project" ? "lifetime" : "calendar_month_utc");
|
||||
const amount = Math.max(0, Math.floor(input.amount));
|
||||
const nextIsActive = amount > 0 && (input.isActive ?? true);
|
||||
const existing = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetPolicies.companyId, companyId),
|
||||
eq(budgetPolicies.scopeType, input.scopeType),
|
||||
eq(budgetPolicies.scopeId, input.scopeId),
|
||||
eq(budgetPolicies.metric, metric),
|
||||
eq(budgetPolicies.windowKind, windowKind),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
const now = new Date();
|
||||
const row = existing
|
||||
? await db
|
||||
.update(budgetPolicies)
|
||||
.set({
|
||||
amount,
|
||||
warnPercent: input.warnPercent ?? existing.warnPercent,
|
||||
hardStopEnabled: input.hardStopEnabled ?? existing.hardStopEnabled,
|
||||
notifyEnabled: input.notifyEnabled ?? existing.notifyEnabled,
|
||||
isActive: nextIsActive,
|
||||
updatedByUserId: actorUserId,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(budgetPolicies.id, existing.id))
|
||||
.returning()
|
||||
.then((rows) => rows[0])
|
||||
: await db
|
||||
.insert(budgetPolicies)
|
||||
.values({
|
||||
companyId,
|
||||
scopeType: input.scopeType,
|
||||
scopeId: input.scopeId,
|
||||
metric,
|
||||
windowKind,
|
||||
amount,
|
||||
warnPercent: input.warnPercent ?? 80,
|
||||
hardStopEnabled: input.hardStopEnabled ?? true,
|
||||
notifyEnabled: input.notifyEnabled ?? true,
|
||||
isActive: nextIsActive,
|
||||
createdByUserId: actorUserId,
|
||||
updatedByUserId: actorUserId,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
if (input.scopeType === "company" && windowKind === "calendar_month_utc") {
|
||||
await db
|
||||
.update(companies)
|
||||
.set({
|
||||
budgetMonthlyCents: amount,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(companies.id, input.scopeId));
|
||||
}
|
||||
|
||||
if (input.scopeType === "agent" && windowKind === "calendar_month_utc") {
|
||||
await db
|
||||
.update(agents)
|
||||
.set({
|
||||
budgetMonthlyCents: amount,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(agents.id, input.scopeId));
|
||||
}
|
||||
|
||||
if (amount > 0) {
|
||||
const observedAmount = await computeObservedAmount(db, row);
|
||||
if (observedAmount < amount) {
|
||||
await resumeScopeFromBudget(row);
|
||||
await resolveOpenIncidentsForPolicy(row.id, actorUserId ? "approved" : null, actorUserId);
|
||||
} else {
|
||||
const softThreshold = Math.ceil((row.amount * row.warnPercent) / 100);
|
||||
if (row.notifyEnabled && observedAmount >= softThreshold) {
|
||||
await createIncidentIfNeeded(row, "soft", observedAmount);
|
||||
}
|
||||
if (row.hardStopEnabled && observedAmount >= row.amount) {
|
||||
await resolveOpenSoftIncidents(row.id);
|
||||
await createIncidentIfNeeded(row, "hard", observedAmount);
|
||||
await pauseAndCancelScopeForBudget(row);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
await resumeScopeFromBudget(row);
|
||||
await resolveOpenIncidentsForPolicy(row.id, actorUserId ? "approved" : null, actorUserId);
|
||||
}
|
||||
|
||||
await logActivity(db, {
|
||||
companyId,
|
||||
actorType: "user",
|
||||
actorId: actorUserId ?? "board",
|
||||
action: "budget.policy_upserted",
|
||||
entityType: "budget_policy",
|
||||
entityId: row.id,
|
||||
details: {
|
||||
scopeType: row.scopeType,
|
||||
scopeId: row.scopeId,
|
||||
amount: row.amount,
|
||||
windowKind: row.windowKind,
|
||||
},
|
||||
});
|
||||
|
||||
return buildPolicySummary(row);
|
||||
},
|
||||
|
||||
overview: async (companyId: string): Promise<BudgetOverview> => {
|
||||
const rows = await listPolicyRows(companyId);
|
||||
const policies = await Promise.all(rows.map((row) => buildPolicySummary(row)));
|
||||
const activeIncidentRows = await db
|
||||
.select()
|
||||
.from(budgetIncidents)
|
||||
.where(and(eq(budgetIncidents.companyId, companyId), eq(budgetIncidents.status, "open")))
|
||||
.orderBy(desc(budgetIncidents.createdAt));
|
||||
const activeIncidents = await hydrateIncidentRows(activeIncidentRows);
|
||||
return {
|
||||
companyId,
|
||||
policies,
|
||||
activeIncidents,
|
||||
pausedAgentCount: policies.filter((policy) => policy.scopeType === "agent" && policy.paused).length,
|
||||
pausedProjectCount: policies.filter((policy) => policy.scopeType === "project" && policy.paused).length,
|
||||
pendingApprovalCount: activeIncidents.filter((incident) => incident.approvalStatus === "pending").length,
|
||||
};
|
||||
},
|
||||
|
||||
evaluateCostEvent: async (event: typeof costEvents.$inferSelect) => {
|
||||
const candidatePolicies = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetPolicies.companyId, event.companyId),
|
||||
eq(budgetPolicies.isActive, true),
|
||||
inArray(budgetPolicies.scopeType, ["company", "agent", "project"]),
|
||||
),
|
||||
);
|
||||
|
||||
const relevantPolicies = candidatePolicies.filter((policy) => {
|
||||
if (policy.scopeType === "company") return policy.scopeId === event.companyId;
|
||||
if (policy.scopeType === "agent") return policy.scopeId === event.agentId;
|
||||
if (policy.scopeType === "project") return Boolean(event.projectId) && policy.scopeId === event.projectId;
|
||||
return false;
|
||||
});
|
||||
|
||||
for (const policy of relevantPolicies) {
|
||||
if (policy.metric !== "billed_cents" || policy.amount <= 0) continue;
|
||||
const observedAmount = await computeObservedAmount(db, policy);
|
||||
const softThreshold = Math.ceil((policy.amount * policy.warnPercent) / 100);
|
||||
|
||||
if (policy.notifyEnabled && observedAmount >= softThreshold) {
|
||||
const softIncident = await createIncidentIfNeeded(policy, "soft", observedAmount);
|
||||
if (softIncident) {
|
||||
await logActivity(db, {
|
||||
companyId: policy.companyId,
|
||||
actorType: "system",
|
||||
actorId: "budget_service",
|
||||
action: "budget.soft_threshold_crossed",
|
||||
entityType: "budget_incident",
|
||||
entityId: softIncident.id,
|
||||
details: {
|
||||
scopeType: policy.scopeType,
|
||||
scopeId: policy.scopeId,
|
||||
amountObserved: observedAmount,
|
||||
amountLimit: policy.amount,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (policy.hardStopEnabled && observedAmount >= policy.amount) {
|
||||
await resolveOpenSoftIncidents(policy.id);
|
||||
const hardIncident = await createIncidentIfNeeded(policy, "hard", observedAmount);
|
||||
await pauseAndCancelScopeForBudget(policy);
|
||||
if (hardIncident) {
|
||||
await logActivity(db, {
|
||||
companyId: policy.companyId,
|
||||
actorType: "system",
|
||||
actorId: "budget_service",
|
||||
action: "budget.hard_threshold_crossed",
|
||||
entityType: "budget_incident",
|
||||
entityId: hardIncident.id,
|
||||
details: {
|
||||
scopeType: policy.scopeType,
|
||||
scopeId: policy.scopeId,
|
||||
amountObserved: observedAmount,
|
||||
amountLimit: policy.amount,
|
||||
approvalId: hardIncident.approvalId ?? null,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
getInvocationBlock: async (
|
||||
companyId: string,
|
||||
agentId: string,
|
||||
context?: { issueId?: string | null; projectId?: string | null },
|
||||
) => {
|
||||
const agent = await db
|
||||
.select({
|
||||
status: agents.status,
|
||||
pauseReason: agents.pauseReason,
|
||||
companyId: agents.companyId,
|
||||
name: agents.name,
|
||||
})
|
||||
.from(agents)
|
||||
.where(eq(agents.id, agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!agent || agent.companyId !== companyId) throw notFound("Agent not found");
|
||||
|
||||
const company = await db
|
||||
.select({
|
||||
status: companies.status,
|
||||
pauseReason: companies.pauseReason,
|
||||
name: companies.name,
|
||||
})
|
||||
.from(companies)
|
||||
.where(eq(companies.id, companyId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!company) throw notFound("Company not found");
|
||||
if (company.status === "paused") {
|
||||
return {
|
||||
scopeType: "company" as const,
|
||||
scopeId: companyId,
|
||||
scopeName: company.name,
|
||||
reason:
|
||||
company.pauseReason === "budget"
|
||||
? "Company is paused because its budget hard-stop was reached."
|
||||
: "Company is paused and cannot start new work.",
|
||||
};
|
||||
}
|
||||
|
||||
const companyPolicy = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetPolicies.companyId, companyId),
|
||||
eq(budgetPolicies.scopeType, "company"),
|
||||
eq(budgetPolicies.scopeId, companyId),
|
||||
eq(budgetPolicies.isActive, true),
|
||||
eq(budgetPolicies.metric, "billed_cents"),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (companyPolicy && companyPolicy.hardStopEnabled && companyPolicy.amount > 0) {
|
||||
const observed = await computeObservedAmount(db, companyPolicy);
|
||||
if (observed >= companyPolicy.amount) {
|
||||
return {
|
||||
scopeType: "company" as const,
|
||||
scopeId: companyId,
|
||||
scopeName: company.name,
|
||||
reason: "Company cannot start new work because its budget hard-stop is exceeded.",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (agent.status === "paused" && agent.pauseReason === "budget") {
|
||||
return {
|
||||
scopeType: "agent" as const,
|
||||
scopeId: agentId,
|
||||
scopeName: agent.name,
|
||||
reason: "Agent is paused because its budget hard-stop was reached.",
|
||||
};
|
||||
}
|
||||
|
||||
const agentPolicy = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetPolicies.companyId, companyId),
|
||||
eq(budgetPolicies.scopeType, "agent"),
|
||||
eq(budgetPolicies.scopeId, agentId),
|
||||
eq(budgetPolicies.isActive, true),
|
||||
eq(budgetPolicies.metric, "billed_cents"),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (agentPolicy && agentPolicy.hardStopEnabled && agentPolicy.amount > 0) {
|
||||
const observed = await computeObservedAmount(db, agentPolicy);
|
||||
if (observed >= agentPolicy.amount) {
|
||||
return {
|
||||
scopeType: "agent" as const,
|
||||
scopeId: agentId,
|
||||
scopeName: agent.name,
|
||||
reason: "Agent cannot start because its budget hard-stop is still exceeded.",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const candidateProjectId = context?.projectId ?? null;
|
||||
if (!candidateProjectId) return null;
|
||||
|
||||
const project = await db
|
||||
.select({
|
||||
id: projects.id,
|
||||
name: projects.name,
|
||||
companyId: projects.companyId,
|
||||
pauseReason: projects.pauseReason,
|
||||
pausedAt: projects.pausedAt,
|
||||
})
|
||||
.from(projects)
|
||||
.where(eq(projects.id, candidateProjectId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!project || project.companyId !== companyId) return null;
|
||||
const projectPolicy = await db
|
||||
.select()
|
||||
.from(budgetPolicies)
|
||||
.where(
|
||||
and(
|
||||
eq(budgetPolicies.companyId, companyId),
|
||||
eq(budgetPolicies.scopeType, "project"),
|
||||
eq(budgetPolicies.scopeId, project.id),
|
||||
eq(budgetPolicies.isActive, true),
|
||||
eq(budgetPolicies.metric, "billed_cents"),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (projectPolicy && projectPolicy.hardStopEnabled && projectPolicy.amount > 0) {
|
||||
const observed = await computeObservedAmount(db, projectPolicy);
|
||||
if (observed >= projectPolicy.amount) {
|
||||
return {
|
||||
scopeType: "project" as const,
|
||||
scopeId: project.id,
|
||||
scopeName: project.name,
|
||||
reason: "Project cannot start work because its budget hard-stop is still exceeded.",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (!project.pausedAt || project.pauseReason !== "budget") return null;
|
||||
return {
|
||||
scopeType: "project" as const,
|
||||
scopeId: project.id,
|
||||
scopeName: project.name,
|
||||
reason: "Project is paused because its budget hard-stop was reached.",
|
||||
};
|
||||
},
|
||||
|
||||
resolveIncident: async (
|
||||
companyId: string,
|
||||
incidentId: string,
|
||||
input: BudgetIncidentResolutionInput,
|
||||
actorUserId: string,
|
||||
): Promise<BudgetIncident> => {
|
||||
const incident = await db
|
||||
.select()
|
||||
.from(budgetIncidents)
|
||||
.where(eq(budgetIncidents.id, incidentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!incident) throw notFound("Budget incident not found");
|
||||
if (incident.companyId !== companyId) throw notFound("Budget incident not found");
|
||||
|
||||
const policy = await getPolicyRow(incident.policyId);
|
||||
if (input.action === "raise_budget_and_resume") {
|
||||
const nextAmount = Math.max(0, Math.floor(input.amount ?? 0));
|
||||
const currentObserved = await computeObservedAmount(db, policy);
|
||||
if (nextAmount <= currentObserved) {
|
||||
throw unprocessable("New budget must exceed current observed spend");
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
await db
|
||||
.update(budgetPolicies)
|
||||
.set({
|
||||
amount: nextAmount,
|
||||
isActive: true,
|
||||
updatedByUserId: actorUserId,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(budgetPolicies.id, policy.id));
|
||||
|
||||
if (policy.scopeType === "company" && policy.windowKind === "calendar_month_utc") {
|
||||
await db
|
||||
.update(companies)
|
||||
.set({ budgetMonthlyCents: nextAmount, updatedAt: now })
|
||||
.where(eq(companies.id, policy.scopeId));
|
||||
}
|
||||
|
||||
if (policy.scopeType === "agent" && policy.windowKind === "calendar_month_utc") {
|
||||
await db
|
||||
.update(agents)
|
||||
.set({ budgetMonthlyCents: nextAmount, updatedAt: now })
|
||||
.where(eq(agents.id, policy.scopeId));
|
||||
}
|
||||
|
||||
await resumeScopeFromBudget(policy);
|
||||
await db
|
||||
.update(budgetIncidents)
|
||||
.set({
|
||||
status: "resolved",
|
||||
resolvedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(budgetIncidents.policyId, policy.id), eq(budgetIncidents.status, "open")));
|
||||
|
||||
await markApprovalStatus(db, incident.approvalId ?? null, "approved", input.decisionNote, actorUserId);
|
||||
} else {
|
||||
await db
|
||||
.update(budgetIncidents)
|
||||
.set({
|
||||
status: "dismissed",
|
||||
resolvedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(budgetIncidents.id, incident.id));
|
||||
await markApprovalStatus(db, incident.approvalId ?? null, "rejected", input.decisionNote, actorUserId);
|
||||
}
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: incident.companyId,
|
||||
actorType: "user",
|
||||
actorId: actorUserId,
|
||||
action: "budget.incident_resolved",
|
||||
entityType: "budget_incident",
|
||||
entityId: incident.id,
|
||||
details: {
|
||||
action: input.action,
|
||||
amount: input.amount ?? null,
|
||||
scopeType: incident.scopeType,
|
||||
scopeId: incident.scopeId,
|
||||
},
|
||||
});
|
||||
|
||||
const [updated] = await hydrateIncidentRows([{
|
||||
...incident,
|
||||
status: input.action === "raise_budget_and_resume" ? "resolved" : "dismissed",
|
||||
resolvedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
}]);
|
||||
return updated!;
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { eq, count } from "drizzle-orm";
|
||||
import { and, count, eq, gte, inArray, lt, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
companies,
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
heartbeatRuns,
|
||||
heartbeatRunEvents,
|
||||
costEvents,
|
||||
financeEvents,
|
||||
approvalComments,
|
||||
approvals,
|
||||
activityLog,
|
||||
@@ -53,6 +54,49 @@ export function companyService(db: Db) {
|
||||
};
|
||||
}
|
||||
|
||||
function currentUtcMonthWindow(now = new Date()) {
|
||||
const year = now.getUTCFullYear();
|
||||
const month = now.getUTCMonth();
|
||||
return {
|
||||
start: new Date(Date.UTC(year, month, 1, 0, 0, 0, 0)),
|
||||
end: new Date(Date.UTC(year, month + 1, 1, 0, 0, 0, 0)),
|
||||
};
|
||||
}
|
||||
|
||||
async function getMonthlySpendByCompanyIds(
|
||||
companyIds: string[],
|
||||
database: Pick<Db, "select"> = db,
|
||||
) {
|
||||
if (companyIds.length === 0) return new Map<string, number>();
|
||||
const { start, end } = currentUtcMonthWindow();
|
||||
const rows = await database
|
||||
.select({
|
||||
companyId: costEvents.companyId,
|
||||
spentMonthlyCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(
|
||||
and(
|
||||
inArray(costEvents.companyId, companyIds),
|
||||
gte(costEvents.occurredAt, start),
|
||||
lt(costEvents.occurredAt, end),
|
||||
),
|
||||
)
|
||||
.groupBy(costEvents.companyId);
|
||||
return new Map(rows.map((row) => [row.companyId, Number(row.spentMonthlyCents ?? 0)]));
|
||||
}
|
||||
|
||||
async function hydrateCompanySpend<T extends { id: string; spentMonthlyCents: number }>(
|
||||
rows: T[],
|
||||
database: Pick<Db, "select"> = db,
|
||||
) {
|
||||
const spendByCompanyId = await getMonthlySpendByCompanyIds(rows.map((row) => row.id), database);
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
spentMonthlyCents: spendByCompanyId.get(row.id) ?? 0,
|
||||
}));
|
||||
}
|
||||
|
||||
function getCompanyQuery(database: Pick<Db, "select">) {
|
||||
return database
|
||||
.select(companySelection)
|
||||
@@ -103,13 +147,20 @@ export function companyService(db: Db) {
|
||||
}
|
||||
|
||||
return {
|
||||
list: () =>
|
||||
getCompanyQuery(db).then((rows) => rows.map((row) => enrichCompany(row))),
|
||||
list: async () => {
|
||||
const rows = await getCompanyQuery(db);
|
||||
const hydrated = await hydrateCompanySpend(rows);
|
||||
return hydrated.map((row) => enrichCompany(row));
|
||||
},
|
||||
|
||||
getById: (id: string) =>
|
||||
getCompanyQuery(db)
|
||||
getById: async (id: string) => {
|
||||
const row = await getCompanyQuery(db)
|
||||
.where(eq(companies.id, id))
|
||||
.then((rows) => (rows[0] ? enrichCompany(rows[0]) : null)),
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) return null;
|
||||
const [hydrated] = await hydrateCompanySpend([row], db);
|
||||
return enrichCompany(hydrated);
|
||||
},
|
||||
|
||||
create: async (data: typeof companies.$inferInsert) => {
|
||||
const created = await createCompanyWithUniquePrefix(data);
|
||||
@@ -117,7 +168,8 @@ export function companyService(db: Db) {
|
||||
.where(eq(companies.id, created.id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) throw notFound("Company not found after creation");
|
||||
return enrichCompany(row);
|
||||
const [hydrated] = await hydrateCompanySpend([row], db);
|
||||
return enrichCompany(hydrated);
|
||||
},
|
||||
|
||||
update: (
|
||||
@@ -174,10 +226,12 @@ export function companyService(db: Db) {
|
||||
await tx.delete(assets).where(eq(assets.id, existing.logoAssetId));
|
||||
}
|
||||
|
||||
return enrichCompany({
|
||||
const [hydrated] = await hydrateCompanySpend([{
|
||||
...updated,
|
||||
logoAssetId: logoAssetId === undefined ? existing.logoAssetId : logoAssetId,
|
||||
});
|
||||
}], tx);
|
||||
|
||||
return enrichCompany(hydrated);
|
||||
}),
|
||||
|
||||
archive: (id: string) =>
|
||||
@@ -192,7 +246,9 @@ export function companyService(db: Db) {
|
||||
const row = await getCompanyQuery(tx)
|
||||
.where(eq(companies.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? enrichCompany(row) : null;
|
||||
if (!row) return null;
|
||||
const [hydrated] = await hydrateCompanySpend([row], tx);
|
||||
return enrichCompany(hydrated);
|
||||
}),
|
||||
|
||||
remove: (id: string) =>
|
||||
@@ -206,6 +262,7 @@ export function companyService(db: Db) {
|
||||
await tx.delete(agentRuntimeState).where(eq(agentRuntimeState.companyId, id));
|
||||
await tx.delete(issueComments).where(eq(issueComments.companyId, id));
|
||||
await tx.delete(costEvents).where(eq(costEvents.companyId, id));
|
||||
await tx.delete(financeEvents).where(eq(financeEvents.companyId, id));
|
||||
await tx.delete(approvalComments).where(eq(approvalComments.companyId, id));
|
||||
await tx.delete(approvals).where(eq(approvals.companyId, id));
|
||||
await tx.delete(companySecrets).where(eq(companySecrets.companyId, id));
|
||||
|
||||
@@ -1,14 +1,50 @@
|
||||
import { and, desc, eq, gte, isNotNull, lte, sql } from "drizzle-orm";
|
||||
import { and, desc, eq, gte, isNotNull, lt, lte, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { activityLog, agents, companies, costEvents, heartbeatRuns, issues, projects } from "@paperclipai/db";
|
||||
import { activityLog, agents, companies, costEvents, issues, projects } from "@paperclipai/db";
|
||||
import { notFound, unprocessable } from "../errors.js";
|
||||
import { budgetService, type BudgetServiceHooks } from "./budgets.js";
|
||||
|
||||
export interface CostDateRange {
|
||||
from?: Date;
|
||||
to?: Date;
|
||||
}
|
||||
|
||||
export function costService(db: Db) {
|
||||
const METERED_BILLING_TYPE = "metered_api";
|
||||
const SUBSCRIPTION_BILLING_TYPES = ["subscription_included", "subscription_overage"] as const;
|
||||
|
||||
function currentUtcMonthWindow(now = new Date()) {
|
||||
const year = now.getUTCFullYear();
|
||||
const month = now.getUTCMonth();
|
||||
return {
|
||||
start: new Date(Date.UTC(year, month, 1, 0, 0, 0, 0)),
|
||||
end: new Date(Date.UTC(year, month + 1, 1, 0, 0, 0, 0)),
|
||||
};
|
||||
}
|
||||
|
||||
async function getMonthlySpendTotal(
|
||||
db: Db,
|
||||
scope: { companyId: string; agentId?: string | null },
|
||||
) {
|
||||
const { start, end } = currentUtcMonthWindow();
|
||||
const conditions = [
|
||||
eq(costEvents.companyId, scope.companyId),
|
||||
gte(costEvents.occurredAt, start),
|
||||
lt(costEvents.occurredAt, end),
|
||||
];
|
||||
if (scope.agentId) {
|
||||
conditions.push(eq(costEvents.agentId, scope.agentId));
|
||||
}
|
||||
const [row] = await db
|
||||
.select({
|
||||
total: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(and(...conditions));
|
||||
return Number(row?.total ?? 0);
|
||||
}
|
||||
|
||||
export function costService(db: Db, budgetHooks: BudgetServiceHooks = {}) {
|
||||
const budgets = budgetService(db, budgetHooks);
|
||||
return {
|
||||
createEvent: async (companyId: string, data: Omit<typeof costEvents.$inferInsert, "companyId">) => {
|
||||
const agent = await db
|
||||
@@ -24,14 +60,25 @@ export function costService(db: Db) {
|
||||
|
||||
const event = await db
|
||||
.insert(costEvents)
|
||||
.values({ ...data, companyId })
|
||||
.values({
|
||||
...data,
|
||||
companyId,
|
||||
biller: data.biller ?? data.provider,
|
||||
billingType: data.billingType ?? "unknown",
|
||||
cachedInputTokens: data.cachedInputTokens ?? 0,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
const [agentMonthSpend, companyMonthSpend] = await Promise.all([
|
||||
getMonthlySpendTotal(db, { companyId, agentId: event.agentId }),
|
||||
getMonthlySpendTotal(db, { companyId }),
|
||||
]);
|
||||
|
||||
await db
|
||||
.update(agents)
|
||||
.set({
|
||||
spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${event.costCents}`,
|
||||
spentMonthlyCents: agentMonthSpend,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agents.id, event.agentId));
|
||||
@@ -39,29 +86,12 @@ export function costService(db: Db) {
|
||||
await db
|
||||
.update(companies)
|
||||
.set({
|
||||
spentMonthlyCents: sql`${companies.spentMonthlyCents} + ${event.costCents}`,
|
||||
spentMonthlyCents: companyMonthSpend,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(companies.id, companyId));
|
||||
|
||||
const updatedAgent = await db
|
||||
.select()
|
||||
.from(agents)
|
||||
.where(eq(agents.id, event.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (
|
||||
updatedAgent &&
|
||||
updatedAgent.budgetMonthlyCents > 0 &&
|
||||
updatedAgent.spentMonthlyCents >= updatedAgent.budgetMonthlyCents &&
|
||||
updatedAgent.status !== "paused" &&
|
||||
updatedAgent.status !== "terminated"
|
||||
) {
|
||||
await db
|
||||
.update(agents)
|
||||
.set({ status: "paused", updatedAt: new Date() })
|
||||
.where(eq(agents.id, updatedAgent.id));
|
||||
}
|
||||
await budgets.evaluateCostEvent(event);
|
||||
|
||||
return event;
|
||||
},
|
||||
@@ -105,52 +135,180 @@ export function costService(db: Db) {
|
||||
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
|
||||
|
||||
const costRows = await db
|
||||
return db
|
||||
.select({
|
||||
agentId: costEvents.agentId,
|
||||
agentName: agents.name,
|
||||
agentStatus: agents.status,
|
||||
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
apiRunCount:
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionRunCount:
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionCachedInputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
|
||||
subscriptionInputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
|
||||
subscriptionOutputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.leftJoin(agents, eq(costEvents.agentId, agents.id))
|
||||
.where(and(...conditions))
|
||||
.groupBy(costEvents.agentId, agents.name, agents.status)
|
||||
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
|
||||
},
|
||||
|
||||
const runConditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
|
||||
if (range?.from) runConditions.push(gte(heartbeatRuns.finishedAt, range.from));
|
||||
if (range?.to) runConditions.push(lte(heartbeatRuns.finishedAt, range.to));
|
||||
byProvider: async (companyId: string, range?: CostDateRange) => {
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
|
||||
|
||||
const runRows = await db
|
||||
return db
|
||||
.select({
|
||||
agentId: heartbeatRuns.agentId,
|
||||
provider: costEvents.provider,
|
||||
biller: costEvents.biller,
|
||||
billingType: costEvents.billingType,
|
||||
model: costEvents.model,
|
||||
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
apiRunCount:
|
||||
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'api' then 1 else 0 end), 0)::int`,
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionRunCount:
|
||||
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then 1 else 0 end), 0)::int`,
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionCachedInputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
|
||||
subscriptionInputTokens:
|
||||
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0) else 0 end), 0)::int`,
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
|
||||
subscriptionOutputTokens:
|
||||
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0) else 0 end), 0)::int`,
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(and(...runConditions))
|
||||
.groupBy(heartbeatRuns.agentId);
|
||||
.from(costEvents)
|
||||
.where(and(...conditions))
|
||||
.groupBy(costEvents.provider, costEvents.biller, costEvents.billingType, costEvents.model)
|
||||
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
|
||||
},
|
||||
|
||||
const runRowsByAgent = new Map(runRows.map((row) => [row.agentId, row]));
|
||||
return costRows.map((row) => {
|
||||
const runRow = runRowsByAgent.get(row.agentId);
|
||||
return {
|
||||
...row,
|
||||
apiRunCount: runRow?.apiRunCount ?? 0,
|
||||
subscriptionRunCount: runRow?.subscriptionRunCount ?? 0,
|
||||
subscriptionInputTokens: runRow?.subscriptionInputTokens ?? 0,
|
||||
subscriptionOutputTokens: runRow?.subscriptionOutputTokens ?? 0,
|
||||
};
|
||||
});
|
||||
byBiller: async (companyId: string, range?: CostDateRange) => {
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
|
||||
|
||||
return db
|
||||
.select({
|
||||
biller: costEvents.biller,
|
||||
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
apiRunCount:
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionRunCount:
|
||||
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
|
||||
subscriptionCachedInputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
|
||||
subscriptionInputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
|
||||
subscriptionOutputTokens:
|
||||
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
|
||||
providerCount: sql<number>`count(distinct ${costEvents.provider})::int`,
|
||||
modelCount: sql<number>`count(distinct ${costEvents.model})::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(and(...conditions))
|
||||
.groupBy(costEvents.biller)
|
||||
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
|
||||
},
|
||||
|
||||
/**
|
||||
* aggregates cost_events by provider for each of three rolling windows:
|
||||
* last 5 hours, last 24 hours, last 7 days.
|
||||
* purely internal consumption data, no external rate-limit sources.
|
||||
*/
|
||||
windowSpend: async (companyId: string) => {
|
||||
const windows = [
|
||||
{ label: "5h", hours: 5 },
|
||||
{ label: "24h", hours: 24 },
|
||||
{ label: "7d", hours: 168 },
|
||||
] as const;
|
||||
|
||||
const results = await Promise.all(
|
||||
windows.map(async ({ label, hours }) => {
|
||||
const since = new Date(Date.now() - hours * 60 * 60 * 1000);
|
||||
const rows = await db
|
||||
.select({
|
||||
provider: costEvents.provider,
|
||||
biller: sql<string>`case when count(distinct ${costEvents.biller}) = 1 then min(${costEvents.biller}) else 'mixed' end`,
|
||||
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.where(
|
||||
and(
|
||||
eq(costEvents.companyId, companyId),
|
||||
gte(costEvents.occurredAt, since),
|
||||
),
|
||||
)
|
||||
.groupBy(costEvents.provider)
|
||||
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
|
||||
|
||||
return rows.map((row) => ({
|
||||
provider: row.provider,
|
||||
biller: row.biller,
|
||||
window: label as string,
|
||||
windowHours: hours,
|
||||
costCents: row.costCents,
|
||||
inputTokens: row.inputTokens,
|
||||
cachedInputTokens: row.cachedInputTokens,
|
||||
outputTokens: row.outputTokens,
|
||||
}));
|
||||
}),
|
||||
);
|
||||
|
||||
return results.flat();
|
||||
},
|
||||
|
||||
byAgentModel: async (companyId: string, range?: CostDateRange) => {
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
|
||||
|
||||
// single query: group by agent + provider + model.
|
||||
// the (companyId, agentId, occurredAt) composite index covers this well.
|
||||
// order by provider + model for stable db-level ordering; cost-desc sort
|
||||
// within each agent's sub-rows is done client-side in the ui memo.
|
||||
return db
|
||||
.select({
|
||||
agentId: costEvents.agentId,
|
||||
agentName: agents.name,
|
||||
provider: costEvents.provider,
|
||||
biller: costEvents.biller,
|
||||
billingType: costEvents.billingType,
|
||||
model: costEvents.model,
|
||||
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
})
|
||||
.from(costEvents)
|
||||
.leftJoin(agents, eq(costEvents.agentId, agents.id))
|
||||
.where(and(...conditions))
|
||||
.groupBy(
|
||||
costEvents.agentId,
|
||||
agents.name,
|
||||
costEvents.provider,
|
||||
costEvents.biller,
|
||||
costEvents.billingType,
|
||||
costEvents.model,
|
||||
)
|
||||
.orderBy(costEvents.provider, costEvents.biller, costEvents.billingType, costEvents.model);
|
||||
},
|
||||
|
||||
byProject: async (companyId: string, range?: CostDateRange) => {
|
||||
@@ -179,25 +337,27 @@ export function costService(db: Db) {
|
||||
.orderBy(activityLog.runId, issues.projectId, desc(activityLog.createdAt))
|
||||
.as("run_project_links");
|
||||
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(heartbeatRuns.finishedAt, range.from));
|
||||
if (range?.to) conditions.push(lte(heartbeatRuns.finishedAt, range.to));
|
||||
const effectiveProjectId = sql<string | null>`coalesce(${costEvents.projectId}, ${runProjectLinks.projectId})`;
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
|
||||
|
||||
const costCentsExpr = sql<number>`coalesce(sum(round(coalesce((${heartbeatRuns.usageJson} ->> 'costUsd')::numeric, 0) * 100)), 0)::int`;
|
||||
const costCentsExpr = sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`;
|
||||
|
||||
return db
|
||||
.select({
|
||||
projectId: runProjectLinks.projectId,
|
||||
projectId: effectiveProjectId,
|
||||
projectName: projects.name,
|
||||
costCents: costCentsExpr,
|
||||
inputTokens: sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0)), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0)), 0)::int`,
|
||||
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
|
||||
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
|
||||
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
|
||||
})
|
||||
.from(runProjectLinks)
|
||||
.innerJoin(heartbeatRuns, eq(runProjectLinks.runId, heartbeatRuns.id))
|
||||
.innerJoin(projects, eq(runProjectLinks.projectId, projects.id))
|
||||
.where(and(...conditions))
|
||||
.groupBy(runProjectLinks.projectId, projects.name)
|
||||
.from(costEvents)
|
||||
.leftJoin(runProjectLinks, eq(costEvents.heartbeatRunId, runProjectLinks.runId))
|
||||
.innerJoin(projects, sql`${projects.id} = ${effectiveProjectId}`)
|
||||
.where(and(...conditions, sql`${effectiveProjectId} is not null`))
|
||||
.groupBy(effectiveProjectId, projects.name)
|
||||
.orderBy(desc(costCentsExpr));
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2,8 +2,10 @@ import { and, eq, gte, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { agents, approvals, companies, costEvents, issues } from "@paperclipai/db";
|
||||
import { notFound } from "../errors.js";
|
||||
import { budgetService } from "./budgets.js";
|
||||
|
||||
export function dashboardService(db: Db) {
|
||||
const budgets = budgetService(db);
|
||||
return {
|
||||
summary: async (companyId: string) => {
|
||||
const company = await db
|
||||
@@ -78,6 +80,7 @@ export function dashboardService(db: Db) {
|
||||
company.budgetMonthlyCents > 0
|
||||
? (monthSpendCents / company.budgetMonthlyCents) * 100
|
||||
: 0;
|
||||
const budgetOverview = await budgets.overview(companyId);
|
||||
|
||||
return {
|
||||
companyId,
|
||||
@@ -94,6 +97,12 @@ export function dashboardService(db: Db) {
|
||||
monthUtilizationPercent: Number(utilization.toFixed(2)),
|
||||
},
|
||||
pendingApprovals,
|
||||
budgets: {
|
||||
activeIncidents: budgetOverview.activeIncidents.length,
|
||||
pendingApprovals: budgetOverview.pendingApprovalCount,
|
||||
pausedAgents: budgetOverview.pausedAgentCount,
|
||||
pausedProjects: budgetOverview.pausedProjectCount,
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
134
server/src/services/finance.ts
Normal file
134
server/src/services/finance.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import { and, desc, eq, gte, lte, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { agents, costEvents, financeEvents, goals, heartbeatRuns, issues, projects } from "@paperclipai/db";
|
||||
import { notFound, unprocessable } from "../errors.js";
|
||||
|
||||
export interface FinanceDateRange {
|
||||
from?: Date;
|
||||
to?: Date;
|
||||
}
|
||||
|
||||
async function assertBelongsToCompany(
|
||||
db: Db,
|
||||
table: any,
|
||||
id: string,
|
||||
companyId: string,
|
||||
label: string,
|
||||
) {
|
||||
const row = await db
|
||||
.select()
|
||||
.from(table)
|
||||
.where(eq(table.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!row) throw notFound(`${label} not found`);
|
||||
if ((row as unknown as { companyId: string }).companyId !== companyId) {
|
||||
throw unprocessable(`${label} does not belong to company`);
|
||||
}
|
||||
}
|
||||
|
||||
function rangeConditions(companyId: string, range?: FinanceDateRange) {
|
||||
const conditions: ReturnType<typeof eq>[] = [eq(financeEvents.companyId, companyId)];
|
||||
if (range?.from) conditions.push(gte(financeEvents.occurredAt, range.from));
|
||||
if (range?.to) conditions.push(lte(financeEvents.occurredAt, range.to));
|
||||
return conditions;
|
||||
}
|
||||
|
||||
export function financeService(db: Db) {
|
||||
const debitExpr = sql<number>`coalesce(sum(case when ${financeEvents.direction} = 'debit' then ${financeEvents.amountCents} else 0 end), 0)::int`;
|
||||
const creditExpr = sql<number>`coalesce(sum(case when ${financeEvents.direction} = 'credit' then ${financeEvents.amountCents} else 0 end), 0)::int`;
|
||||
const estimatedDebitExpr = sql<number>`coalesce(sum(case when ${financeEvents.direction} = 'debit' and ${financeEvents.estimated} = true then ${financeEvents.amountCents} else 0 end), 0)::int`;
|
||||
|
||||
return {
|
||||
createEvent: async (companyId: string, data: Omit<typeof financeEvents.$inferInsert, "companyId">) => {
|
||||
if (data.agentId) await assertBelongsToCompany(db, agents, data.agentId, companyId, "Agent");
|
||||
if (data.issueId) await assertBelongsToCompany(db, issues, data.issueId, companyId, "Issue");
|
||||
if (data.projectId) await assertBelongsToCompany(db, projects, data.projectId, companyId, "Project");
|
||||
if (data.goalId) await assertBelongsToCompany(db, goals, data.goalId, companyId, "Goal");
|
||||
if (data.heartbeatRunId) await assertBelongsToCompany(db, heartbeatRuns, data.heartbeatRunId, companyId, "Heartbeat run");
|
||||
if (data.costEventId) await assertBelongsToCompany(db, costEvents, data.costEventId, companyId, "Cost event");
|
||||
|
||||
const event = await db
|
||||
.insert(financeEvents)
|
||||
.values({
|
||||
...data,
|
||||
companyId,
|
||||
currency: data.currency ?? "USD",
|
||||
direction: data.direction ?? "debit",
|
||||
estimated: data.estimated ?? false,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
return event;
|
||||
},
|
||||
|
||||
summary: async (companyId: string, range?: FinanceDateRange) => {
|
||||
const conditions = rangeConditions(companyId, range);
|
||||
const [row] = await db
|
||||
.select({
|
||||
debitCents: debitExpr,
|
||||
creditCents: creditExpr,
|
||||
estimatedDebitCents: estimatedDebitExpr,
|
||||
eventCount: sql<number>`count(*)::int`,
|
||||
})
|
||||
.from(financeEvents)
|
||||
.where(and(...conditions));
|
||||
|
||||
return {
|
||||
companyId,
|
||||
debitCents: Number(row?.debitCents ?? 0),
|
||||
creditCents: Number(row?.creditCents ?? 0),
|
||||
netCents: Number(row?.debitCents ?? 0) - Number(row?.creditCents ?? 0),
|
||||
estimatedDebitCents: Number(row?.estimatedDebitCents ?? 0),
|
||||
eventCount: Number(row?.eventCount ?? 0),
|
||||
};
|
||||
},
|
||||
|
||||
byBiller: async (companyId: string, range?: FinanceDateRange) => {
|
||||
const conditions = rangeConditions(companyId, range);
|
||||
return db
|
||||
.select({
|
||||
biller: financeEvents.biller,
|
||||
debitCents: debitExpr,
|
||||
creditCents: creditExpr,
|
||||
estimatedDebitCents: estimatedDebitExpr,
|
||||
eventCount: sql<number>`count(*)::int`,
|
||||
kindCount: sql<number>`count(distinct ${financeEvents.eventKind})::int`,
|
||||
netCents: sql<number>`(${debitExpr} - ${creditExpr})::int`,
|
||||
})
|
||||
.from(financeEvents)
|
||||
.where(and(...conditions))
|
||||
.groupBy(financeEvents.biller)
|
||||
.orderBy(desc(sql`(${debitExpr} - ${creditExpr})::int`), financeEvents.biller);
|
||||
},
|
||||
|
||||
byKind: async (companyId: string, range?: FinanceDateRange) => {
|
||||
const conditions = rangeConditions(companyId, range);
|
||||
return db
|
||||
.select({
|
||||
eventKind: financeEvents.eventKind,
|
||||
debitCents: debitExpr,
|
||||
creditCents: creditExpr,
|
||||
estimatedDebitCents: estimatedDebitExpr,
|
||||
eventCount: sql<number>`count(*)::int`,
|
||||
billerCount: sql<number>`count(distinct ${financeEvents.biller})::int`,
|
||||
netCents: sql<number>`(${debitExpr} - ${creditExpr})::int`,
|
||||
})
|
||||
.from(financeEvents)
|
||||
.where(and(...conditions))
|
||||
.groupBy(financeEvents.eventKind)
|
||||
.orderBy(desc(sql`(${debitExpr} - ${creditExpr})::int`), financeEvents.eventKind);
|
||||
},
|
||||
|
||||
list: async (companyId: string, range?: FinanceDateRange, limit: number = 100) => {
|
||||
const conditions = rangeConditions(companyId, range);
|
||||
return db
|
||||
.select()
|
||||
.from(financeEvents)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(financeEvents.occurredAt), desc(financeEvents.createdAt))
|
||||
.limit(limit);
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import { execFile as execFileCallback } from "node:child_process";
|
||||
import { promisify } from "node:util";
|
||||
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import type { BillingType } from "@paperclipai/shared";
|
||||
import {
|
||||
agents,
|
||||
agentRuntimeState,
|
||||
@@ -24,6 +25,7 @@ import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec
|
||||
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
|
||||
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
||||
import { costService } from "./costs.js";
|
||||
import { budgetService, type BudgetEnforcementScope } from "./budgets.js";
|
||||
import { secretService } from "./secrets.js";
|
||||
import { resolveDefaultAgentWorkspaceDir, resolveManagedProjectWorkspaceDir } from "../home-paths.js";
|
||||
import { summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js";
|
||||
@@ -251,6 +253,67 @@ function readNonEmptyString(value: unknown): string | null {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value : null;
|
||||
}
|
||||
|
||||
function normalizeLedgerBillingType(value: unknown): BillingType {
|
||||
const raw = readNonEmptyString(value);
|
||||
switch (raw) {
|
||||
case "api":
|
||||
case "metered_api":
|
||||
return "metered_api";
|
||||
case "subscription":
|
||||
case "subscription_included":
|
||||
return "subscription_included";
|
||||
case "subscription_overage":
|
||||
return "subscription_overage";
|
||||
case "credits":
|
||||
return "credits";
|
||||
case "fixed":
|
||||
return "fixed";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
function resolveLedgerBiller(result: AdapterExecutionResult): string {
|
||||
return readNonEmptyString(result.biller) ?? readNonEmptyString(result.provider) ?? "unknown";
|
||||
}
|
||||
|
||||
function normalizeBilledCostCents(costUsd: number | null | undefined, billingType: BillingType): number {
|
||||
if (billingType === "subscription_included") return 0;
|
||||
if (typeof costUsd !== "number" || !Number.isFinite(costUsd)) return 0;
|
||||
return Math.max(0, Math.round(costUsd * 100));
|
||||
}
|
||||
|
||||
async function resolveLedgerScopeForRun(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
run: typeof heartbeatRuns.$inferSelect,
|
||||
) {
|
||||
const context = parseObject(run.contextSnapshot);
|
||||
const contextIssueId = readNonEmptyString(context.issueId);
|
||||
const contextProjectId = readNonEmptyString(context.projectId);
|
||||
|
||||
if (!contextIssueId) {
|
||||
return {
|
||||
issueId: null,
|
||||
projectId: contextProjectId,
|
||||
};
|
||||
}
|
||||
|
||||
const issue = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
projectId: issues.projectId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, contextIssueId), eq(issues.companyId, companyId)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
return {
|
||||
issueId: issue?.id ?? null,
|
||||
projectId: issue?.projectId ?? contextProjectId,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null {
|
||||
if (!usage) return null;
|
||||
return {
|
||||
@@ -639,6 +702,10 @@ export function heartbeatService(db: Db) {
|
||||
const issuesSvc = issueService(db);
|
||||
const executionWorkspacesSvc = executionWorkspaceService(db);
|
||||
const activeRunExecutions = new Set<string>();
|
||||
const budgetHooks = {
|
||||
cancelWorkForScope: cancelBudgetScopeWork,
|
||||
};
|
||||
const budgets = budgetService(db, budgetHooks);
|
||||
|
||||
async function getAgent(agentId: string) {
|
||||
return db
|
||||
@@ -1281,6 +1348,26 @@ export function heartbeatService(db: Db) {
|
||||
|
||||
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
|
||||
if (run.status !== "queued") return run;
|
||||
const agent = await getAgent(run.agentId);
|
||||
if (!agent) {
|
||||
await cancelRunInternal(run.id, "Cancelled because the agent no longer exists");
|
||||
return null;
|
||||
}
|
||||
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
|
||||
await cancelRunInternal(run.id, "Cancelled because the agent is not invokable");
|
||||
return null;
|
||||
}
|
||||
|
||||
const context = parseObject(run.contextSnapshot);
|
||||
const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, {
|
||||
issueId: readNonEmptyString(context.issueId),
|
||||
projectId: readNonEmptyString(context.projectId),
|
||||
});
|
||||
if (budgetBlock) {
|
||||
await cancelRunInternal(run.id, budgetBlock.reason);
|
||||
return null;
|
||||
}
|
||||
|
||||
const claimedAt = new Date();
|
||||
const claimed = await db
|
||||
.update(heartbeatRuns)
|
||||
@@ -1436,8 +1523,12 @@ export function heartbeatService(db: Db) {
|
||||
const inputTokens = usage?.inputTokens ?? 0;
|
||||
const outputTokens = usage?.outputTokens ?? 0;
|
||||
const cachedInputTokens = usage?.cachedInputTokens ?? 0;
|
||||
const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));
|
||||
const billingType = normalizeLedgerBillingType(result.billingType);
|
||||
const additionalCostCents = normalizeBilledCostCents(result.costUsd, billingType);
|
||||
const hasTokenUsage = inputTokens > 0 || outputTokens > 0 || cachedInputTokens > 0;
|
||||
const provider = result.provider ?? "unknown";
|
||||
const biller = resolveLedgerBiller(result);
|
||||
const ledgerScope = await resolveLedgerScopeForRun(db, agent.companyId, run);
|
||||
|
||||
await db
|
||||
.update(agentRuntimeState)
|
||||
@@ -1456,12 +1547,18 @@ export function heartbeatService(db: Db) {
|
||||
.where(eq(agentRuntimeState.agentId, agent.id));
|
||||
|
||||
if (additionalCostCents > 0 || hasTokenUsage) {
|
||||
const costs = costService(db);
|
||||
const costs = costService(db, budgetHooks);
|
||||
await costs.createEvent(agent.companyId, {
|
||||
heartbeatRunId: run.id,
|
||||
agentId: agent.id,
|
||||
provider: result.provider ?? "unknown",
|
||||
issueId: ledgerScope.issueId,
|
||||
projectId: ledgerScope.projectId,
|
||||
provider,
|
||||
biller,
|
||||
billingType,
|
||||
model: result.model ?? "unknown",
|
||||
inputTokens,
|
||||
cachedInputTokens,
|
||||
outputTokens,
|
||||
costCents: additionalCostCents,
|
||||
occurredAt: new Date(),
|
||||
@@ -1473,6 +1570,9 @@ export function heartbeatService(db: Db) {
|
||||
return withAgentStartLock(agentId, async () => {
|
||||
const agent = await getAgent(agentId);
|
||||
if (!agent) return [];
|
||||
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
|
||||
return [];
|
||||
}
|
||||
const policy = parseHeartbeatPolicy(agent);
|
||||
const runningCount = await countRunningRunsForAgent(agentId);
|
||||
const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount);
|
||||
@@ -2086,8 +2186,11 @@ export function heartbeatService(db: Db) {
|
||||
freshSession: runtimeForAdapter.sessionId == null && runtimeForAdapter.sessionDisplayId == null,
|
||||
sessionRotated: sessionCompaction.rotate,
|
||||
sessionRotationReason: sessionCompaction.reason,
|
||||
provider: readNonEmptyString(adapterResult.provider) ?? "unknown",
|
||||
biller: resolveLedgerBiller(adapterResult),
|
||||
model: readNonEmptyString(adapterResult.model) ?? "unknown",
|
||||
...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
|
||||
...(adapterResult.billingType ? { billingType: adapterResult.billingType } : {}),
|
||||
billingType: normalizeLedgerBillingType(adapterResult.billingType),
|
||||
} as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
@@ -2437,6 +2540,43 @@ export function heartbeatService(db: Db) {
|
||||
const agent = await getAgent(agentId);
|
||||
if (!agent) throw notFound("Agent not found");
|
||||
|
||||
const writeSkippedRequest = async (skipReason: string) => {
|
||||
await db.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: skipReason,
|
||||
payload,
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
};
|
||||
|
||||
let projectId = readNonEmptyString(enrichedContextSnapshot.projectId);
|
||||
if (!projectId && issueId) {
|
||||
projectId = await db
|
||||
.select({ projectId: issues.projectId })
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId)))
|
||||
.then((rows) => rows[0]?.projectId ?? null);
|
||||
}
|
||||
|
||||
const budgetBlock = await budgets.getInvocationBlock(agent.companyId, agentId, {
|
||||
issueId,
|
||||
projectId,
|
||||
});
|
||||
if (budgetBlock) {
|
||||
await writeSkippedRequest("budget.blocked");
|
||||
throw conflict(budgetBlock.reason, {
|
||||
scopeType: budgetBlock.scopeType,
|
||||
scopeId: budgetBlock.scopeId,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
agent.status === "paused" ||
|
||||
agent.status === "terminated" ||
|
||||
@@ -2446,21 +2586,6 @@ export function heartbeatService(db: Db) {
|
||||
}
|
||||
|
||||
const policy = parseHeartbeatPolicy(agent);
|
||||
const writeSkippedRequest = async (reason: string) => {
|
||||
await db.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason,
|
||||
payload,
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
};
|
||||
|
||||
if (source === "timer" && !policy.enabled) {
|
||||
await writeSkippedRequest("heartbeat.disabled");
|
||||
@@ -2870,6 +2995,205 @@ export function heartbeatService(db: Db) {
|
||||
return newRun;
|
||||
}
|
||||
|
||||
async function listProjectScopedRunIds(companyId: string, projectId: string) {
|
||||
const runIssueId = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`;
|
||||
const effectiveProjectId = sql<string | null>`coalesce(${heartbeatRuns.contextSnapshot} ->> 'projectId', ${issues.projectId}::text)`;
|
||||
|
||||
const rows = await db
|
||||
.selectDistinctOn([heartbeatRuns.id], { id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.leftJoin(
|
||||
issues,
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
sql`${issues.id}::text = ${runIssueId}`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
sql`${effectiveProjectId} = ${projectId}`,
|
||||
),
|
||||
);
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
async function listProjectScopedWakeupIds(companyId: string, projectId: string) {
|
||||
const wakeIssueId = sql<string | null>`${agentWakeupRequests.payload} ->> 'issueId'`;
|
||||
const effectiveProjectId = sql<string | null>`coalesce(${agentWakeupRequests.payload} ->> 'projectId', ${issues.projectId}::text)`;
|
||||
|
||||
const rows = await db
|
||||
.selectDistinctOn([agentWakeupRequests.id], { id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.leftJoin(
|
||||
issues,
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
sql`${issues.id}::text = ${wakeIssueId}`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, companyId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
sql`${effectiveProjectId} = ${projectId}`,
|
||||
),
|
||||
);
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
async function cancelPendingWakeupsForBudgetScope(scope: BudgetEnforcementScope) {
|
||||
const now = new Date();
|
||||
let wakeupIds: string[] = [];
|
||||
|
||||
if (scope.scopeType === "company") {
|
||||
wakeupIds = await db
|
||||
.select({ id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, scope.companyId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id));
|
||||
} else if (scope.scopeType === "agent") {
|
||||
wakeupIds = await db
|
||||
.select({ id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, scope.companyId),
|
||||
eq(agentWakeupRequests.agentId, scope.scopeId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id));
|
||||
} else {
|
||||
wakeupIds = await listProjectScopedWakeupIds(scope.companyId, scope.scopeId);
|
||||
}
|
||||
|
||||
if (wakeupIds.length === 0) return 0;
|
||||
|
||||
await db
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: "Cancelled due to budget pause",
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(inArray(agentWakeupRequests.id, wakeupIds));
|
||||
|
||||
return wakeupIds.length;
|
||||
}
|
||||
|
||||
async function cancelRunInternal(runId: string, reason = "Cancelled by control plane") {
|
||||
const run = await getRun(runId);
|
||||
if (!run) throw notFound("Heartbeat run not found");
|
||||
if (run.status !== "running" && run.status !== "queued") return run;
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
const graceMs = Math.max(1, running.graceSec) * 1000;
|
||||
setTimeout(() => {
|
||||
if (!running.child.killed) {
|
||||
running.child.kill("SIGKILL");
|
||||
}
|
||||
}, graceMs);
|
||||
}
|
||||
|
||||
const cancelled = await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
});
|
||||
|
||||
if (cancelled) {
|
||||
await appendRunEvent(cancelled, 1, {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: "run cancelled",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(cancelled);
|
||||
}
|
||||
|
||||
runningProcesses.delete(run.id);
|
||||
await finalizeAgentStatus(run.agentId, "cancelled");
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
async function cancelActiveForAgentInternal(agentId: string, reason = "Cancelled due to agent pause") {
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
|
||||
for (const run of runs) {
|
||||
await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
});
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
runningProcesses.delete(run.id);
|
||||
}
|
||||
await releaseIssueExecutionAndPromote(run);
|
||||
}
|
||||
|
||||
return runs.length;
|
||||
}
|
||||
|
||||
async function cancelBudgetScopeWork(scope: BudgetEnforcementScope) {
|
||||
if (scope.scopeType === "agent") {
|
||||
await cancelActiveForAgentInternal(scope.scopeId, "Cancelled due to budget pause");
|
||||
await cancelPendingWakeupsForBudgetScope(scope);
|
||||
return;
|
||||
}
|
||||
|
||||
const runIds =
|
||||
scope.scopeType === "company"
|
||||
? await db
|
||||
.select({ id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, scope.companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id))
|
||||
: await listProjectScopedRunIds(scope.companyId, scope.scopeId);
|
||||
|
||||
for (const runId of runIds) {
|
||||
await cancelRunInternal(runId, "Cancelled due to budget pause");
|
||||
}
|
||||
|
||||
await cancelPendingWakeupsForBudgetScope(scope);
|
||||
}
|
||||
|
||||
return {
|
||||
list: async (companyId: string, agentId?: string, limit?: number) => {
|
||||
const query = db
|
||||
@@ -3042,77 +3366,11 @@ export function heartbeatService(db: Db) {
|
||||
return { checked, enqueued, skipped };
|
||||
},
|
||||
|
||||
cancelRun: async (runId: string) => {
|
||||
const run = await getRun(runId);
|
||||
if (!run) throw notFound("Heartbeat run not found");
|
||||
if (run.status !== "running" && run.status !== "queued") return run;
|
||||
cancelRun: (runId: string) => cancelRunInternal(runId),
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
const graceMs = Math.max(1, running.graceSec) * 1000;
|
||||
setTimeout(() => {
|
||||
if (!running.child.killed) {
|
||||
running.child.kill("SIGKILL");
|
||||
}
|
||||
}, graceMs);
|
||||
}
|
||||
cancelActiveForAgent: (agentId: string) => cancelActiveForAgentInternal(agentId),
|
||||
|
||||
const cancelled = await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled by control plane",
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled by control plane",
|
||||
});
|
||||
|
||||
if (cancelled) {
|
||||
await appendRunEvent(cancelled, 1, {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: "run cancelled",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(cancelled);
|
||||
}
|
||||
|
||||
runningProcesses.delete(run.id);
|
||||
await finalizeAgentStatus(run.agentId, "cancelled");
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
return cancelled;
|
||||
},
|
||||
|
||||
cancelActiveForAgent: async (agentId: string) => {
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
|
||||
for (const run of runs) {
|
||||
await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled due to agent pause",
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled due to agent pause",
|
||||
});
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
runningProcesses.delete(run.id);
|
||||
}
|
||||
await releaseIssueExecutionAndPromote(run);
|
||||
}
|
||||
|
||||
return runs.length;
|
||||
},
|
||||
cancelBudgetScopeWork,
|
||||
|
||||
getActiveRunForAgent: async (agentId: string) => {
|
||||
const [run] = await db
|
||||
|
||||
@@ -8,8 +8,10 @@ export { issueApprovalService } from "./issue-approvals.js";
|
||||
export { goalService } from "./goals.js";
|
||||
export { activityService, type ActivityFilters } from "./activity.js";
|
||||
export { approvalService } from "./approvals.js";
|
||||
export { budgetService } from "./budgets.js";
|
||||
export { secretService } from "./secrets.js";
|
||||
export { costService } from "./costs.js";
|
||||
export { financeService } from "./finance.js";
|
||||
export { heartbeatService } from "./heartbeat.js";
|
||||
export { dashboardService } from "./dashboard.js";
|
||||
export { sidebarBadgeService } from "./sidebar-badges.js";
|
||||
|
||||
64
server/src/services/quota-windows.ts
Normal file
64
server/src/services/quota-windows.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import type { ProviderQuotaResult } from "@paperclipai/shared";
|
||||
import { listServerAdapters } from "../adapters/registry.js";
|
||||
|
||||
const QUOTA_PROVIDER_TIMEOUT_MS = 20_000;
|
||||
|
||||
function providerSlugForAdapterType(type: string): string {
|
||||
switch (type) {
|
||||
case "claude_local":
|
||||
return "anthropic";
|
||||
case "codex_local":
|
||||
return "openai";
|
||||
default:
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asks each registered adapter for its provider quota windows and aggregates the results.
|
||||
* Adapters that don't implement getQuotaWindows() are silently skipped.
|
||||
* Individual adapter failures are caught and returned as error results rather than
|
||||
* letting one provider's outage block the entire response.
|
||||
*/
|
||||
export async function fetchAllQuotaWindows(): Promise<ProviderQuotaResult[]> {
|
||||
const adapters = listServerAdapters().filter((a) => a.getQuotaWindows != null);
|
||||
|
||||
const settled = await Promise.allSettled(
|
||||
adapters.map((adapter) => withQuotaTimeout(adapter.type, adapter.getQuotaWindows!())),
|
||||
);
|
||||
|
||||
return settled.map((result, i) => {
|
||||
if (result.status === "fulfilled") return result.value;
|
||||
const adapterType = adapters[i]!.type;
|
||||
return {
|
||||
provider: providerSlugForAdapterType(adapterType),
|
||||
ok: false,
|
||||
error: String(result.reason),
|
||||
windows: [],
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
async function withQuotaTimeout(
|
||||
adapterType: string,
|
||||
task: Promise<ProviderQuotaResult>,
|
||||
): Promise<ProviderQuotaResult> {
|
||||
let timeoutId: NodeJS.Timeout | null = null;
|
||||
try {
|
||||
return await Promise.race([
|
||||
task,
|
||||
new Promise<ProviderQuotaResult>((resolve) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
resolve({
|
||||
provider: providerSlugForAdapterType(adapterType),
|
||||
ok: false,
|
||||
error: `quota polling timed out after ${Math.round(QUOTA_PROVIDER_TIMEOUT_MS / 1000)}s`,
|
||||
windows: [],
|
||||
});
|
||||
}, QUOTA_PROVIDER_TIMEOUT_MS);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if (timeoutId) clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user