Files
paperclip/server/src/services/costs.ts

320 lines
13 KiB
TypeScript

import { and, desc, eq, gte, isNotNull, lte, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import { activityLog, agents, companies, costEvents, heartbeatRuns, issues, projects } from "@paperclipai/db";
import { notFound, unprocessable } from "../errors.js";
/** Optional time window used to filter the cost queries; either bound may be omitted. */
export interface CostDateRange {
/** Lower bound; the queries in this file compare it with `gte`, so it is inclusive. */
from?: Date;
/** Upper bound; the queries in this file compare it with `lte`, so it is inclusive. */
to?: Date;
}
export function costService(db: Db) {
return {
/**
 * Records a cost event for an agent, adds its cost to the agent's and the
 * company's monthly spend counters, and pauses the agent once its spend
 * reaches a positive monthly budget.
 *
 * @param companyId - Company the event is recorded under; must own `data.agentId`.
 * @param data - Insert payload for `costEvents`, without `companyId`.
 * @returns The inserted cost event row.
 * @throws The `notFound` error when no agent matches `data.agentId`.
 * @throws The `unprocessable` error when the agent belongs to another company.
 */
createEvent: async (companyId: string, data: Omit<typeof costEvents.$inferInsert, "companyId">) => {
  const agent = await db
    .select()
    .from(agents)
    .where(eq(agents.id, data.agentId))
    .then((rows) => rows[0] ?? null);
  if (!agent) throw notFound("Agent not found");
  if (agent.companyId !== companyId) {
    throw unprocessable("Agent does not belong to company");
  }

  // All writes share one transaction so a failure part-way through cannot
  // leave the event recorded without the matching counter updates.
  return db.transaction(async (tx) => {
    const event = await tx
      .insert(costEvents)
      .values({ ...data, companyId })
      .returning()
      .then((rows) => rows[0]);

    // RETURNING hands back the post-increment row, so the budget check below
    // sees exactly the value this statement wrote (no separate re-read).
    const updatedAgent = await tx
      .update(agents)
      .set({
        spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${event.costCents}`,
        updatedAt: new Date(),
      })
      .where(eq(agents.id, event.agentId))
      .returning()
      .then((rows) => rows[0] ?? null);

    await tx
      .update(companies)
      .set({
        spentMonthlyCents: sql`${companies.spentMonthlyCents} + ${event.costCents}`,
        updatedAt: new Date(),
      })
      .where(eq(companies.id, companyId));

    // A budget of 0 (or less) is treated as "no budget": never auto-pause.
    // Agents that are already paused or terminated are left untouched.
    if (
      updatedAgent &&
      updatedAgent.budgetMonthlyCents > 0 &&
      updatedAgent.spentMonthlyCents >= updatedAgent.budgetMonthlyCents &&
      updatedAgent.status !== "paused" &&
      updatedAgent.status !== "terminated"
    ) {
      await tx
        .update(agents)
        .set({ status: "paused", updatedAt: new Date() })
        .where(eq(agents.id, updatedAgent.id));
    }

    return event;
  });
},
/**
 * Reports a company's recorded spend against its monthly budget.
 *
 * @param companyId - Company whose cost events are summed.
 * @param range - Optional inclusive bounds applied to `costEvents.occurredAt`.
 * @returns Spend and budget in cents, plus utilization as a percentage rounded
 *   to two decimals (0 when the company has no positive budget).
 * @throws The `notFound` error when the company does not exist.
 */
summary: async (companyId: string, range?: CostDateRange) => {
  const [company] = await db.select().from(companies).where(eq(companies.id, companyId));
  if (!company) throw notFound("Company not found");

  const filters: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
  if (range?.from) filters.push(gte(costEvents.occurredAt, range.from));
  if (range?.to) filters.push(lte(costEvents.occurredAt, range.to));

  const totalExpr = sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`;
  const [{ total }] = await db
    .select({ total: totalExpr })
    .from(costEvents)
    .where(and(...filters));

  const spendCents = Number(total);
  const budgetCents = company.budgetMonthlyCents;

  // Guard against division by zero: no positive budget means 0% utilization.
  let utilization = 0;
  if (budgetCents > 0) {
    utilization = (spendCents / budgetCents) * 100;
  }

  return {
    companyId,
    spendCents,
    budgetCents,
    utilizationPercent: Number(utilization.toFixed(2)),
  };
},
/**
 * Breaks a company's cost events down per agent and attaches each agent's
 * heartbeat-run billing split (API vs. subscription).
 *
 * Only agents that have at least one cost event in the range appear in the
 * result; run statistics default to 0 for agents without matching runs.
 *
 * @param companyId - Company to report on.
 * @param range - Optional inclusive bounds, applied to `costEvents.occurredAt`
 *   for costs and to `heartbeatRuns.finishedAt` for runs.
 * @returns One row per agent, ordered by summed cost (highest first).
 */
byAgent: async (companyId: string, range?: CostDateRange) => {
  const eventFilters: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
  if (range?.from) eventFilters.push(gte(costEvents.occurredAt, range.from));
  if (range?.to) eventFilters.push(lte(costEvents.occurredAt, range.to));

  const costCentsSum = sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`;
  const costRows = await db
    .select({
      agentId: costEvents.agentId,
      agentName: agents.name,
      agentStatus: agents.status,
      costCents: costCentsSum,
      inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
      outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
    })
    .from(costEvents)
    .leftJoin(agents, eq(costEvents.agentId, agents.id))
    .where(and(...eventFilters))
    .groupBy(costEvents.agentId, agents.name, agents.status)
    .orderBy(desc(costCentsSum));

  const runFilters: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
  if (range?.from) runFilters.push(gte(heartbeatRuns.finishedAt, range.from));
  if (range?.to) runFilters.push(lte(heartbeatRuns.finishedAt, range.to));

  // Shared SQL fragments: a run without a billingType counts as 'unknown'.
  const billingType = sql`coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown')`;
  const isSubscription = sql`${billingType} = 'subscription'`;

  const runRows = await db
    .select({
      agentId: heartbeatRuns.agentId,
      apiRunCount: sql<number>`coalesce(sum(case when ${billingType} = 'api' then 1 else 0 end), 0)::int`,
      subscriptionRunCount: sql<number>`coalesce(sum(case when ${isSubscription} then 1 else 0 end), 0)::int`,
      subscriptionInputTokens: sql<number>`coalesce(sum(case when ${isSubscription} then coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0) else 0 end), 0)::int`,
      subscriptionOutputTokens: sql<number>`coalesce(sum(case when ${isSubscription} then coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0) else 0 end), 0)::int`,
    })
    .from(heartbeatRuns)
    .where(and(...runFilters))
    .groupBy(heartbeatRuns.agentId);

  // Index run statistics by agent so each cost row can be enriched in O(1).
  const runStatsByAgent = new Map<(typeof runRows)[number]["agentId"], (typeof runRows)[number]>();
  for (const stats of runRows) {
    runStatsByAgent.set(stats.agentId, stats);
  }

  return costRows.map((costRow) => {
    const stats = runStatsByAgent.get(costRow.agentId);
    return {
      ...costRow,
      apiRunCount: stats?.apiRunCount ?? 0,
      subscriptionRunCount: stats?.subscriptionRunCount ?? 0,
      subscriptionInputTokens: stats?.subscriptionInputTokens ?? 0,
      subscriptionOutputTokens: stats?.subscriptionOutputTokens ?? 0,
    };
  });
},
/**
 * Breaks a company's cost events down per provider/model and attaches an
 * estimated share of the heartbeat-run billing split to each row.
 *
 * Runs carry no model information, so the company-wide run totals are
 * pro-rated across models by each model's share of total tokens. Every value
 * is rounded per row, so the per-model figures are estimates and may not sum
 * exactly to the totals.
 *
 * @param companyId - Company to report on.
 * @param range - Optional inclusive bounds, applied to `costEvents.occurredAt`
 *   for costs and to `heartbeatRuns.finishedAt` for runs.
 * @returns One row per (provider, model), ordered by summed cost (highest first).
 */
byProvider: async (companyId: string, range?: CostDateRange) => {
  const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
  if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
  if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
  const costRows = await db
    .select({
      provider: costEvents.provider,
      model: costEvents.model,
      costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
      inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
      outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
    })
    .from(costEvents)
    .where(and(...conditions))
    .groupBy(costEvents.provider, costEvents.model)
    .orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));

  // Window runs by finishedAt, matching byAgent and byProject, so the same
  // date range selects the same set of runs in every breakdown.
  const runConditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
  if (range?.from) runConditions.push(gte(heartbeatRuns.finishedAt, range.from));
  if (range?.to) runConditions.push(lte(heartbeatRuns.finishedAt, range.to));
  const runRows = await db
    .select({
      agentId: heartbeatRuns.agentId,
      apiRunCount:
        sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'api' then 1 else 0 end), 0)::int`,
      subscriptionRunCount:
        sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then 1 else 0 end), 0)::int`,
      subscriptionInputTokens:
        sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0) else 0 end), 0)::int`,
      subscriptionOutputTokens:
        sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0) else 0 end), 0)::int`,
    })
    .from(heartbeatRuns)
    .where(and(...runConditions))
    .groupBy(heartbeatRuns.agentId);

  // Aggregate run billing splits across all agents (runs don't carry model
  // info, so a per-model breakdown cannot be read directly from the runs).
  const totals = runRows.reduce(
    (acc, r) => ({
      apiRunCount: acc.apiRunCount + r.apiRunCount,
      subscriptionRunCount: acc.subscriptionRunCount + r.subscriptionRunCount,
      subscriptionInputTokens: acc.subscriptionInputTokens + r.subscriptionInputTokens,
      subscriptionOutputTokens: acc.subscriptionOutputTokens + r.subscriptionOutputTokens,
    }),
    { apiRunCount: 0, subscriptionRunCount: 0, subscriptionInputTokens: 0, subscriptionOutputTokens: 0 },
  );

  // Pro-rate the billing split across models by token share; with no tokens
  // at all every share is 0 (avoids dividing by zero).
  const totalTokens = costRows.reduce((s, r) => s + r.inputTokens + r.outputTokens, 0);
  return costRows.map((row) => {
    const rowTokens = row.inputTokens + row.outputTokens;
    const share = totalTokens > 0 ? rowTokens / totalTokens : 0;
    return {
      provider: row.provider,
      model: row.model,
      costCents: row.costCents,
      inputTokens: row.inputTokens,
      outputTokens: row.outputTokens,
      apiRunCount: Math.round(totals.apiRunCount * share),
      subscriptionRunCount: Math.round(totals.subscriptionRunCount * share),
      subscriptionInputTokens: Math.round(totals.subscriptionInputTokens * share),
      subscriptionOutputTokens: Math.round(totals.subscriptionOutputTokens * share),
    };
  });
},
/**
 * Sums a company's cost events per provider over three rolling windows ending
 * now: the last 5 hours, the last 24 hours and the last 7 days.
 *
 * Uses only internally recorded cost events; no external rate-limit data.
 *
 * @param companyId - Company to report on.
 * @returns A flat list with one row per (window, provider), windows in the
 *   order 5h, 24h, 7d and providers within a window ordered by cost (highest first).
 */
windowSpend: async (companyId: string) => {
  const MS_PER_HOUR = 60 * 60 * 1000;
  const windows = [
    { label: "5h", hours: 5 },
    { label: "24h", hours: 24 },
    { label: "7d", hours: 168 },
  ] as const;

  // The three windows are independent, so they are queried concurrently.
  const perWindow = await Promise.all(
    windows.map(async (win) => {
      const since = new Date(Date.now() - win.hours * MS_PER_HOUR);
      const inWindow = and(
        eq(costEvents.companyId, companyId),
        gte(costEvents.occurredAt, since),
      );
      const providerRows = await db
        .select({
          provider: costEvents.provider,
          costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
          inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
          outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
        })
        .from(costEvents)
        .where(inWindow)
        .groupBy(costEvents.provider)
        .orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));

      return providerRows.map(({ provider, costCents, inputTokens, outputTokens }) => ({
        provider,
        window: win.label as string,
        windowHours: win.hours,
        costCents,
        inputTokens,
        outputTokens,
      }));
    }),
  );

  return perWindow.flat();
},
/**
 * Attributes heartbeat-run usage to projects by following activity-log
 * entries from runs to issues and from issues to their project.
 *
 * Each distinct (run, project) pair is counted once, so a run that touched
 * issues in several projects contributes its usage to each of them. Cost is
 * derived from `usageJson.costUsd`, converted to cents and rounded per run.
 *
 * @param companyId - Company to report on.
 * @param range - Optional inclusive bounds applied to `heartbeatRuns.finishedAt`.
 * @returns A query yielding one row per project, ordered by cost (highest first).
 */
byProject: async (companyId: string, range?: CostDateRange) => {
  // activityLog.entityId is compared as text, so the issue id is cast to match.
  const issueIdText = sql<string>`${issues.id}::text`;
  const logRefersToIssue = and(
    eq(activityLog.entityType, "issue"),
    eq(activityLog.entityId, issueIdText),
  );
  const linkScope = and(
    eq(activityLog.companyId, companyId),
    eq(issues.companyId, companyId),
    isNotNull(activityLog.runId),
    isNotNull(issues.projectId),
  );

  // Subquery: the distinct (run, project) pairs for this company.
  const links = db
    .selectDistinctOn([activityLog.runId, issues.projectId], {
      runId: activityLog.runId,
      projectId: issues.projectId,
    })
    .from(activityLog)
    .innerJoin(issues, logRefersToIssue)
    .where(linkScope)
    .orderBy(activityLog.runId, issues.projectId, desc(activityLog.createdAt))
    .as("run_project_links");

  const runFilters: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
  if (range?.from) runFilters.push(gte(heartbeatRuns.finishedAt, range.from));
  if (range?.to) runFilters.push(lte(heartbeatRuns.finishedAt, range.to));

  const costCentsSum = sql<number>`coalesce(sum(round(coalesce((${heartbeatRuns.usageJson} ->> 'costUsd')::numeric, 0) * 100)), 0)::int`;
  const inputTokensSum = sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0)), 0)::int`;
  const outputTokensSum = sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0)), 0)::int`;

  return db
    .select({
      projectId: links.projectId,
      projectName: projects.name,
      costCents: costCentsSum,
      inputTokens: inputTokensSum,
      outputTokens: outputTokensSum,
    })
    .from(links)
    .innerJoin(heartbeatRuns, eq(links.runId, heartbeatRuns.id))
    .innerJoin(projects, eq(links.projectId, projects.id))
    .where(and(...runFilters))
    .groupBy(links.projectId, projects.name)
    .orderBy(desc(costCentsSum));
},
};
}