Add AES-256-GCM local encrypted secrets provider with auto-generated master key, stub providers for AWS/GCP/Vault, and a secrets service that normalizes adapter configs (converting sensitive inline values to secret refs in strict mode) and resolves secret refs back to plain values at runtime. Extract redaction utilities from agent routes into shared module. Redact sensitive values in activity logs, config revisions, and approval payloads. Block rollback of revisions containing redacted secrets. Filter hidden issues from list queries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
507 lines
17 KiB
TypeScript
507 lines
17 KiB
TypeScript
import { createHash, randomBytes } from "node:crypto";
|
|
import { and, desc, eq, inArray } from "drizzle-orm";
|
|
import type { Db } from "@paperclip/db";
|
|
import {
|
|
agents,
|
|
agentConfigRevisions,
|
|
agentApiKeys,
|
|
agentRuntimeState,
|
|
agentTaskSessions,
|
|
agentWakeupRequests,
|
|
heartbeatRunEvents,
|
|
heartbeatRuns,
|
|
} from "@paperclip/db";
|
|
import { conflict, notFound, unprocessable } from "../errors.js";
|
|
import { normalizeAgentPermissions } from "./agent-permissions.js";
|
|
import { REDACTED_EVENT_VALUE, sanitizeRecord } from "../redaction.js";
|
|
|
|
function hashToken(token: string) {
|
|
return createHash("sha256").update(token).digest("hex");
|
|
}
|
|
|
|
function createToken() {
|
|
return `pcp_${randomBytes(24).toString("hex")}`;
|
|
}
|
|
|
|
const CONFIG_REVISION_FIELDS = [
|
|
"name",
|
|
"role",
|
|
"title",
|
|
"reportsTo",
|
|
"capabilities",
|
|
"adapterType",
|
|
"adapterConfig",
|
|
"runtimeConfig",
|
|
"budgetMonthlyCents",
|
|
"metadata",
|
|
] as const;
|
|
|
|
type ConfigRevisionField = (typeof CONFIG_REVISION_FIELDS)[number];
|
|
type AgentConfigSnapshot = Pick<typeof agents.$inferSelect, ConfigRevisionField>;
|
|
|
|
interface RevisionMetadata {
|
|
createdByAgentId?: string | null;
|
|
createdByUserId?: string | null;
|
|
source?: string;
|
|
rolledBackFromRevisionId?: string | null;
|
|
}
|
|
|
|
interface UpdateAgentOptions {
|
|
recordRevision?: RevisionMetadata;
|
|
}
|
|
|
|
function isPlainRecord(value: unknown): value is Record<string, unknown> {
|
|
return typeof value === "object" && value !== null && !Array.isArray(value);
|
|
}
|
|
|
|
function jsonEqual(left: unknown, right: unknown): boolean {
|
|
return JSON.stringify(left) === JSON.stringify(right);
|
|
}
|
|
|
|
function buildConfigSnapshot(
|
|
row: Pick<typeof agents.$inferSelect, ConfigRevisionField>,
|
|
): AgentConfigSnapshot {
|
|
const adapterConfig =
|
|
typeof row.adapterConfig === "object" && row.adapterConfig !== null && !Array.isArray(row.adapterConfig)
|
|
? sanitizeRecord(row.adapterConfig as Record<string, unknown>)
|
|
: {};
|
|
const runtimeConfig =
|
|
typeof row.runtimeConfig === "object" && row.runtimeConfig !== null && !Array.isArray(row.runtimeConfig)
|
|
? sanitizeRecord(row.runtimeConfig as Record<string, unknown>)
|
|
: {};
|
|
const metadata =
|
|
typeof row.metadata === "object" && row.metadata !== null && !Array.isArray(row.metadata)
|
|
? sanitizeRecord(row.metadata as Record<string, unknown>)
|
|
: row.metadata ?? null;
|
|
return {
|
|
name: row.name,
|
|
role: row.role,
|
|
title: row.title,
|
|
reportsTo: row.reportsTo,
|
|
capabilities: row.capabilities,
|
|
adapterType: row.adapterType,
|
|
adapterConfig,
|
|
runtimeConfig,
|
|
budgetMonthlyCents: row.budgetMonthlyCents,
|
|
metadata,
|
|
};
|
|
}
|
|
|
|
function containsRedactedMarker(value: unknown): boolean {
|
|
if (value === REDACTED_EVENT_VALUE) return true;
|
|
if (Array.isArray(value)) return value.some((item) => containsRedactedMarker(item));
|
|
if (typeof value !== "object" || value === null) return false;
|
|
return Object.values(value as Record<string, unknown>).some((entry) => containsRedactedMarker(entry));
|
|
}
|
|
|
|
function hasConfigPatchFields(data: Partial<typeof agents.$inferInsert>) {
|
|
return CONFIG_REVISION_FIELDS.some((field) => Object.prototype.hasOwnProperty.call(data, field));
|
|
}
|
|
|
|
function diffConfigSnapshot(
|
|
before: AgentConfigSnapshot,
|
|
after: AgentConfigSnapshot,
|
|
): string[] {
|
|
return CONFIG_REVISION_FIELDS.filter((field) => !jsonEqual(before[field], after[field]));
|
|
}
|
|
|
|
function configPatchFromSnapshot(snapshot: unknown): Partial<typeof agents.$inferInsert> {
|
|
if (!isPlainRecord(snapshot)) throw unprocessable("Invalid revision snapshot");
|
|
|
|
if (typeof snapshot.name !== "string" || snapshot.name.length === 0) {
|
|
throw unprocessable("Invalid revision snapshot: name");
|
|
}
|
|
if (typeof snapshot.role !== "string" || snapshot.role.length === 0) {
|
|
throw unprocessable("Invalid revision snapshot: role");
|
|
}
|
|
if (typeof snapshot.adapterType !== "string" || snapshot.adapterType.length === 0) {
|
|
throw unprocessable("Invalid revision snapshot: adapterType");
|
|
}
|
|
if (typeof snapshot.budgetMonthlyCents !== "number" || !Number.isFinite(snapshot.budgetMonthlyCents)) {
|
|
throw unprocessable("Invalid revision snapshot: budgetMonthlyCents");
|
|
}
|
|
|
|
return {
|
|
name: snapshot.name,
|
|
role: snapshot.role,
|
|
title: typeof snapshot.title === "string" || snapshot.title === null ? snapshot.title : null,
|
|
reportsTo:
|
|
typeof snapshot.reportsTo === "string" || snapshot.reportsTo === null ? snapshot.reportsTo : null,
|
|
capabilities:
|
|
typeof snapshot.capabilities === "string" || snapshot.capabilities === null
|
|
? snapshot.capabilities
|
|
: null,
|
|
adapterType: snapshot.adapterType,
|
|
adapterConfig: isPlainRecord(snapshot.adapterConfig) ? snapshot.adapterConfig : {},
|
|
runtimeConfig: isPlainRecord(snapshot.runtimeConfig) ? snapshot.runtimeConfig : {},
|
|
budgetMonthlyCents: Math.max(0, Math.floor(snapshot.budgetMonthlyCents)),
|
|
metadata: isPlainRecord(snapshot.metadata) || snapshot.metadata === null ? snapshot.metadata : null,
|
|
};
|
|
}
|
|
|
|
export function agentService(db: Db) {
|
|
function normalizeAgentRow(row: typeof agents.$inferSelect) {
|
|
return {
|
|
...row,
|
|
permissions: normalizeAgentPermissions(row.permissions, row.role),
|
|
};
|
|
}
|
|
|
|
async function getById(id: string) {
|
|
const row = await db
|
|
.select()
|
|
.from(agents)
|
|
.where(eq(agents.id, id))
|
|
.then((rows) => rows[0] ?? null);
|
|
return row ? normalizeAgentRow(row) : null;
|
|
}
|
|
|
|
async function ensureManager(companyId: string, managerId: string) {
|
|
const manager = await getById(managerId);
|
|
if (!manager) throw notFound("Manager not found");
|
|
if (manager.companyId !== companyId) {
|
|
throw unprocessable("Manager must belong to same company");
|
|
}
|
|
return manager;
|
|
}
|
|
|
|
async function assertNoCycle(agentId: string, reportsTo: string | null | undefined) {
|
|
if (!reportsTo) return;
|
|
if (reportsTo === agentId) throw unprocessable("Agent cannot report to itself");
|
|
|
|
let cursor: string | null = reportsTo;
|
|
while (cursor) {
|
|
if (cursor === agentId) throw unprocessable("Reporting relationship would create cycle");
|
|
const next = await getById(cursor);
|
|
cursor = next?.reportsTo ?? null;
|
|
}
|
|
}
|
|
|
|
async function updateAgent(
|
|
id: string,
|
|
data: Partial<typeof agents.$inferInsert>,
|
|
options?: UpdateAgentOptions,
|
|
) {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
|
|
if (existing.status === "terminated" && data.status && data.status !== "terminated") {
|
|
throw conflict("Terminated agents cannot be resumed");
|
|
}
|
|
if (
|
|
existing.status === "pending_approval" &&
|
|
data.status &&
|
|
data.status !== "pending_approval" &&
|
|
data.status !== "terminated"
|
|
) {
|
|
throw conflict("Pending approval agents cannot be activated directly");
|
|
}
|
|
|
|
if (data.reportsTo !== undefined) {
|
|
if (data.reportsTo) {
|
|
await ensureManager(existing.companyId, data.reportsTo);
|
|
}
|
|
await assertNoCycle(id, data.reportsTo);
|
|
}
|
|
|
|
const normalizedPatch = { ...data } as Partial<typeof agents.$inferInsert>;
|
|
if (data.permissions !== undefined) {
|
|
const role = (data.role ?? existing.role) as string;
|
|
normalizedPatch.permissions = normalizeAgentPermissions(data.permissions, role);
|
|
}
|
|
|
|
const shouldRecordRevision = Boolean(options?.recordRevision) && hasConfigPatchFields(normalizedPatch);
|
|
const beforeConfig = shouldRecordRevision ? buildConfigSnapshot(existing) : null;
|
|
|
|
const updated = await db
|
|
.update(agents)
|
|
.set({ ...normalizedPatch, updatedAt: new Date() })
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
const normalizedUpdated = updated ? normalizeAgentRow(updated) : null;
|
|
|
|
if (normalizedUpdated && shouldRecordRevision && beforeConfig) {
|
|
const afterConfig = buildConfigSnapshot(normalizedUpdated);
|
|
const changedKeys = diffConfigSnapshot(beforeConfig, afterConfig);
|
|
if (changedKeys.length > 0) {
|
|
await db.insert(agentConfigRevisions).values({
|
|
companyId: normalizedUpdated.companyId,
|
|
agentId: normalizedUpdated.id,
|
|
createdByAgentId: options?.recordRevision?.createdByAgentId ?? null,
|
|
createdByUserId: options?.recordRevision?.createdByUserId ?? null,
|
|
source: options?.recordRevision?.source ?? "patch",
|
|
rolledBackFromRevisionId: options?.recordRevision?.rolledBackFromRevisionId ?? null,
|
|
changedKeys,
|
|
beforeConfig: beforeConfig as unknown as Record<string, unknown>,
|
|
afterConfig: afterConfig as unknown as Record<string, unknown>,
|
|
});
|
|
}
|
|
}
|
|
|
|
return normalizedUpdated;
|
|
}
|
|
|
|
return {
|
|
list: async (companyId: string) => {
|
|
const rows = await db.select().from(agents).where(eq(agents.companyId, companyId));
|
|
return rows.map(normalizeAgentRow);
|
|
},
|
|
|
|
getById,
|
|
|
|
create: async (companyId: string, data: Omit<typeof agents.$inferInsert, "companyId">) => {
|
|
if (data.reportsTo) {
|
|
await ensureManager(companyId, data.reportsTo);
|
|
}
|
|
|
|
const role = data.role ?? "general";
|
|
const normalizedPermissions = normalizeAgentPermissions(data.permissions, role);
|
|
const created = await db
|
|
.insert(agents)
|
|
.values({ ...data, companyId, role, permissions: normalizedPermissions })
|
|
.returning()
|
|
.then((rows) => rows[0]);
|
|
|
|
return normalizeAgentRow(created);
|
|
},
|
|
|
|
update: updateAgent,
|
|
|
|
pause: async (id: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
if (existing.status === "terminated") throw conflict("Cannot pause terminated agent");
|
|
|
|
const updated = await db
|
|
.update(agents)
|
|
.set({ status: "paused", updatedAt: new Date() })
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
return updated ? normalizeAgentRow(updated) : null;
|
|
},
|
|
|
|
resume: async (id: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
if (existing.status === "terminated") throw conflict("Cannot resume terminated agent");
|
|
if (existing.status === "pending_approval") {
|
|
throw conflict("Pending approval agents cannot be resumed");
|
|
}
|
|
|
|
const updated = await db
|
|
.update(agents)
|
|
.set({ status: "idle", updatedAt: new Date() })
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
return updated ? normalizeAgentRow(updated) : null;
|
|
},
|
|
|
|
terminate: async (id: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
|
|
await db
|
|
.update(agents)
|
|
.set({ status: "terminated", updatedAt: new Date() })
|
|
.where(eq(agents.id, id));
|
|
|
|
await db
|
|
.update(agentApiKeys)
|
|
.set({ revokedAt: new Date() })
|
|
.where(eq(agentApiKeys.agentId, id));
|
|
|
|
return getById(id);
|
|
},
|
|
|
|
remove: async (id: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
|
|
return db.transaction(async (tx) => {
|
|
await tx.update(agents).set({ reportsTo: null }).where(eq(agents.reportsTo, id));
|
|
await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.agentId, id));
|
|
await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.agentId, id));
|
|
await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.agentId, id));
|
|
await tx.delete(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, id));
|
|
await tx.delete(agentApiKeys).where(eq(agentApiKeys.agentId, id));
|
|
await tx.delete(agentRuntimeState).where(eq(agentRuntimeState.agentId, id));
|
|
const deleted = await tx
|
|
.delete(agents)
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
return deleted ? normalizeAgentRow(deleted) : null;
|
|
});
|
|
},
|
|
|
|
activatePendingApproval: async (id: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
if (existing.status !== "pending_approval") return existing;
|
|
|
|
const updated = await db
|
|
.update(agents)
|
|
.set({ status: "idle", updatedAt: new Date() })
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
|
|
return updated ? normalizeAgentRow(updated) : null;
|
|
},
|
|
|
|
updatePermissions: async (id: string, permissions: { canCreateAgents: boolean }) => {
|
|
const existing = await getById(id);
|
|
if (!existing) return null;
|
|
|
|
const updated = await db
|
|
.update(agents)
|
|
.set({
|
|
permissions: normalizeAgentPermissions(permissions, existing.role),
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(eq(agents.id, id))
|
|
.returning()
|
|
.then((rows) => rows[0] ?? null);
|
|
|
|
return updated ? normalizeAgentRow(updated) : null;
|
|
},
|
|
|
|
listConfigRevisions: async (id: string) =>
|
|
db
|
|
.select()
|
|
.from(agentConfigRevisions)
|
|
.where(eq(agentConfigRevisions.agentId, id))
|
|
.orderBy(desc(agentConfigRevisions.createdAt)),
|
|
|
|
getConfigRevision: async (id: string, revisionId: string) =>
|
|
db
|
|
.select()
|
|
.from(agentConfigRevisions)
|
|
.where(and(eq(agentConfigRevisions.agentId, id), eq(agentConfigRevisions.id, revisionId)))
|
|
.then((rows) => rows[0] ?? null),
|
|
|
|
rollbackConfigRevision: async (
|
|
id: string,
|
|
revisionId: string,
|
|
actor: { agentId?: string | null; userId?: string | null },
|
|
) => {
|
|
const revision = await db
|
|
.select()
|
|
.from(agentConfigRevisions)
|
|
.where(and(eq(agentConfigRevisions.agentId, id), eq(agentConfigRevisions.id, revisionId)))
|
|
.then((rows) => rows[0] ?? null);
|
|
if (!revision) return null;
|
|
if (containsRedactedMarker(revision.afterConfig)) {
|
|
throw unprocessable("Cannot roll back a revision that contains redacted secret values");
|
|
}
|
|
|
|
const patch = configPatchFromSnapshot(revision.afterConfig);
|
|
return updateAgent(id, patch, {
|
|
recordRevision: {
|
|
createdByAgentId: actor.agentId ?? null,
|
|
createdByUserId: actor.userId ?? null,
|
|
source: "rollback",
|
|
rolledBackFromRevisionId: revision.id,
|
|
},
|
|
});
|
|
},
|
|
|
|
createApiKey: async (id: string, name: string) => {
|
|
const existing = await getById(id);
|
|
if (!existing) throw notFound("Agent not found");
|
|
if (existing.status === "pending_approval") {
|
|
throw conflict("Cannot create keys for pending approval agents");
|
|
}
|
|
if (existing.status === "terminated") {
|
|
throw conflict("Cannot create keys for terminated agents");
|
|
}
|
|
|
|
const token = createToken();
|
|
const keyHash = hashToken(token);
|
|
const created = await db
|
|
.insert(agentApiKeys)
|
|
.values({
|
|
agentId: id,
|
|
companyId: existing.companyId,
|
|
name,
|
|
keyHash,
|
|
})
|
|
.returning()
|
|
.then((rows) => rows[0]);
|
|
|
|
return {
|
|
id: created.id,
|
|
name: created.name,
|
|
token,
|
|
createdAt: created.createdAt,
|
|
};
|
|
},
|
|
|
|
listKeys: (id: string) =>
|
|
db
|
|
.select({
|
|
id: agentApiKeys.id,
|
|
name: agentApiKeys.name,
|
|
createdAt: agentApiKeys.createdAt,
|
|
revokedAt: agentApiKeys.revokedAt,
|
|
})
|
|
.from(agentApiKeys)
|
|
.where(eq(agentApiKeys.agentId, id)),
|
|
|
|
revokeKey: async (keyId: string) => {
|
|
const rows = await db
|
|
.update(agentApiKeys)
|
|
.set({ revokedAt: new Date() })
|
|
.where(eq(agentApiKeys.id, keyId))
|
|
.returning();
|
|
return rows[0] ?? null;
|
|
},
|
|
|
|
orgForCompany: async (companyId: string) => {
|
|
const rows = await db.select().from(agents).where(eq(agents.companyId, companyId));
|
|
const normalizedRows = rows.map(normalizeAgentRow);
|
|
const byManager = new Map<string | null, typeof normalizedRows>();
|
|
for (const row of normalizedRows) {
|
|
const key = row.reportsTo ?? null;
|
|
const group = byManager.get(key) ?? [];
|
|
group.push(row);
|
|
byManager.set(key, group);
|
|
}
|
|
|
|
const build = (managerId: string | null): Array<Record<string, unknown>> => {
|
|
const members = byManager.get(managerId) ?? [];
|
|
return members.map((member) => ({
|
|
...member,
|
|
reports: build(member.id),
|
|
}));
|
|
};
|
|
|
|
return build(null);
|
|
},
|
|
|
|
getChainOfCommand: async (agentId: string) => {
|
|
const chain: { id: string; name: string; role: string; title: string | null }[] = [];
|
|
const visited = new Set<string>([agentId]);
|
|
const start = await getById(agentId);
|
|
let currentId = start?.reportsTo ?? null;
|
|
while (currentId && !visited.has(currentId) && chain.length < 50) {
|
|
visited.add(currentId);
|
|
const mgr = await getById(currentId);
|
|
if (!mgr) break;
|
|
chain.push({ id: mgr.id, name: mgr.name, role: mgr.role, title: mgr.title ?? null });
|
|
currentId = mgr.reportsTo ?? null;
|
|
}
|
|
return chain;
|
|
},
|
|
|
|
runningForAgent: (agentId: string) =>
|
|
db
|
|
.select()
|
|
.from(heartbeatRuns)
|
|
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))),
|
|
};
|
|
}
|