683 lines
20 KiB
TypeScript
683 lines
20 KiB
TypeScript
import { asc, eq, ne, sql, and } from "drizzle-orm";
|
||
import type { Db } from "@paperclipai/db";
|
||
import {
|
||
plugins,
|
||
pluginConfig,
|
||
pluginEntities,
|
||
pluginJobs,
|
||
pluginJobRuns,
|
||
pluginWebhookDeliveries,
|
||
} from "@paperclipai/db";
|
||
import type {
|
||
PaperclipPluginManifestV1,
|
||
PluginStatus,
|
||
InstallPlugin,
|
||
UpdatePluginStatus,
|
||
UpsertPluginConfig,
|
||
PatchPluginConfig,
|
||
PluginEntityRecord,
|
||
PluginEntityQuery,
|
||
PluginJobRecord,
|
||
PluginJobRunRecord,
|
||
PluginWebhookDeliveryRecord,
|
||
PluginJobStatus,
|
||
PluginJobRunStatus,
|
||
PluginJobRunTrigger,
|
||
PluginWebhookDeliveryStatus,
|
||
} from "@paperclipai/shared";
|
||
import { conflict, notFound } from "../errors.js";
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Helpers
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/**
 * Detect whether a thrown value represents a Postgres unique-constraint
 * violation (SQLSTATE `23505`) on the `plugins_plugin_key_idx` unique index.
 *
 * The driver error is not always the value that gets thrown: a wrapper may
 * attach the original driver error as `cause`. The `cause` chain is therefore
 * inspected too, up to a small fixed depth so a cyclic chain cannot loop
 * forever.
 *
 * @param error - Any thrown value.
 * @returns `true` only when the error, or one of its causes, has code `23505`
 *   and names `plugins_plugin_key_idx` via `constraint` or `constraint_name`.
 */
function isPluginKeyConflict(error: unknown): boolean {
  const maxCauseDepth = 5;
  let current: unknown = error;

  for (let depth = 0; depth < maxCauseDepth; depth += 1) {
    if (typeof current !== "object" || current === null) return false;

    const err = current as {
      code?: unknown;
      constraint?: unknown;
      constraint_name?: unknown;
      cause?: unknown;
    };
    // Different drivers expose the violated constraint under different keys.
    const constraint = err.constraint ?? err.constraint_name;
    if (err.code === "23505" && constraint === "plugins_plugin_key_idx") return true;

    current = err.cause;
  }

  return false;
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Service
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/**
|
||
* PluginRegistry – CRUD operations for the `plugins` and `plugin_config`
|
||
* tables. Follows the same factory-function pattern used by the rest of
|
||
* the Paperclip service layer.
|
||
*
|
||
* This is the lowest-level persistence layer for plugins. Higher-level
|
||
* concerns such as lifecycle state-machine enforcement and capability
|
||
* gating are handled by {@link pluginLifecycleManager} and
|
||
* {@link pluginCapabilityValidator} respectively.
|
||
*
|
||
* @see PLUGIN_SPEC.md §21.3 — Required Tables
|
||
*/
|
||
export function pluginRegistryService(db: Db) {
|
||
// -----------------------------------------------------------------------
|
||
// Internal helpers
|
||
// -----------------------------------------------------------------------
|
||
|
||
/**
 * Fetch a single row from `plugins` whose `id` equals the given value.
 *
 * @param id - Value compared against `plugins.id`.
 * @returns The first matching row, or `null` when the query yields none.
 */
async function getById(id: string) {
  const [row] = await db.select().from(plugins).where(eq(plugins.id, id));
  return row ?? null;
}
|
||
|
||
/**
 * Fetch a single row from `plugins` whose `pluginKey` equals the given value.
 *
 * @param pluginKey - Value compared against `plugins.pluginKey`.
 * @returns The first matching row, or `null` when the query yields none.
 */
async function getByKey(pluginKey: string) {
  const [row] = await db.select().from(plugins).where(eq(plugins.pluginKey, pluginKey));
  return row ?? null;
}
|
||
|
||
/**
 * Compute the install order for a new plugin row: one more than the current
 * maximum `installOrder` in `plugins` (the SQL coalesces an empty table to 0,
 * so the first value handed out is 1).
 *
 * @returns The next install-order number.
 */
async function nextInstallOrder(): Promise<number> {
  const maxOrder = sql<number>`coalesce(max(${plugins.installOrder}), 0)`;
  const [aggregate] = await db.select({ maxOrder }).from(plugins);
  return (aggregate?.maxOrder ?? 0) + 1;
}
|
||
|
||
// -----------------------------------------------------------------------
|
||
// Public API
|
||
// -----------------------------------------------------------------------
|
||
|
||
return {
|
||
// ----- Read -----------------------------------------------------------
|
||
|
||
/** Select every row of `plugins`, sorted ascending by `installOrder`. */
list: () => {
  const byInstallOrder = asc(plugins.installOrder);
  return db.select().from(plugins).orderBy(byInstallOrder);
},
|
||
|
||
/**
 * Select every plugin row whose status is not `"uninstalled"` (i.e. skips
 * soft-deleted plugins), sorted ascending by `installOrder`.
 */
listInstalled: () => {
  const notUninstalled = ne(plugins.status, "uninstalled");
  return db
    .select()
    .from(plugins)
    .where(notUninstalled)
    .orderBy(asc(plugins.installOrder));
},
|
||
|
||
/**
 * Select the plugin rows whose `status` equals the given value, sorted
 * ascending by `installOrder`.
 *
 * @param status - Status value to match exactly.
 */
listByStatus: (status: PluginStatus) => {
  const hasStatus = eq(plugins.status, status);
  return db
    .select()
    .from(plugins)
    .where(hasStatus)
    .orderBy(asc(plugins.installOrder));
},
|
||
|
||
/** Get a single plugin by primary key. */
|
||
getById,
|
||
|
||
/** Get a single plugin by its unique `pluginKey`. */
|
||
getByKey,
|
||
|
||
// ----- Install / Register --------------------------------------------
|
||
|
||
/**
 * Register (install) a plugin from a manifest the caller has already
 * resolved.
 *
 * Behaviour, keyed on `manifest.id` (stored as `pluginKey`):
 * - a row exists and is not `"uninstalled"`: throws a conflict error;
 * - a row exists and is `"uninstalled"`: that same row is updated back to
 *   `"installed"` (keeping its id), with `lastError` cleared;
 * - no row exists: a new row is inserted with the next install order.
 *
 * @param input - Supplies `packageName` and optional `packagePath`.
 * @param manifest - Supplies the key, version, API version, categories and
 *   the JSON stored in `manifestJson`.
 * @returns The reactivated row (or `null` if the update returned no rows), or
 *   the newly inserted row.
 * @throws A conflict error when the plugin key is already installed, including
 *   when the insert hits the plugin-key unique index.
 */
install: async (input: InstallPlugin, manifest: PaperclipPluginManifestV1) => {
  const previous = await getByKey(manifest.id);

  if (previous) {
    if (previous.status !== "uninstalled") {
      throw conflict(`Plugin already installed: ${manifest.id}`);
    }

    // Reuse the soft-deleted row rather than inserting a new one, so the
    // plugin keeps the same primary key across uninstall/reinstall.
    const [reactivated] = await db
      .update(plugins)
      .set({
        packageName: input.packageName,
        packagePath: input.packagePath ?? null,
        version: manifest.version,
        apiVersion: manifest.apiVersion,
        categories: manifest.categories,
        manifestJson: manifest,
        status: "installed" as PluginStatus,
        lastError: null,
        updatedAt: new Date(),
      })
      .where(eq(plugins.id, previous.id))
      .returning();
    return reactivated ?? null;
  }

  const installOrder = await nextInstallOrder();

  try {
    const [created] = await db
      .insert(plugins)
      .values({
        pluginKey: manifest.id,
        packageName: input.packageName,
        version: manifest.version,
        apiVersion: manifest.apiVersion,
        categories: manifest.categories,
        manifestJson: manifest,
        status: "installed" as PluginStatus,
        installOrder,
        packagePath: input.packagePath ?? null,
      })
      .returning();
    return created;
  } catch (error) {
    // Another insert with the same key got there after the lookup above.
    if (isPluginKeyConflict(error)) {
      throw conflict(`Plugin already installed: ${manifest.id}`);
    }
    throw error;
  }
},
|
||
|
||
// ----- Update ---------------------------------------------------------
|
||
|
||
/**
 * Update selected columns of an existing plugin row (e.g. on upgrade).
 *
 * Only fields that are not `undefined` in `data` are written. Supplying
 * `manifest` writes `manifestJson`, `apiVersion` and `categories` from it;
 * `version` is only written when passed explicitly. `updatedAt` is always
 * refreshed.
 *
 * @param id - Primary key of the plugin row.
 * @param data - Optional `packageName`, `version` and `manifest`.
 * @returns The updated row, or `null` if the update returned no rows.
 * @throws A not-found error when no plugin has this id.
 */
update: async (
  id: string,
  data: {
    packageName?: string;
    version?: string;
    manifest?: PaperclipPluginManifestV1;
  },
) => {
  if (!(await getById(id))) throw notFound("Plugin not found");

  const { packageName, version, manifest } = data;
  const changes: Partial<typeof plugins.$inferInsert> & { updatedAt: Date } = {
    updatedAt: new Date(),
  };

  if (packageName !== undefined) {
    changes.packageName = packageName;
  }
  if (version !== undefined) {
    changes.version = version;
  }
  if (manifest !== undefined) {
    changes.manifestJson = manifest;
    changes.apiVersion = manifest.apiVersion;
    changes.categories = manifest.categories;
  }

  const [updated] = await db
    .update(plugins)
    .set(changes)
    .where(eq(plugins.id, id))
    .returning();
  return updated ?? null;
},
|
||
|
||
// ----- Status ---------------------------------------------------------
|
||
|
||
/**
 * Set a plugin's `status` and `lastError`, refreshing `updatedAt`.
 *
 * `lastError` is written as `null` whenever the input does not carry one, so
 * a status change without an error clears any previously stored error.
 *
 * @param id - Primary key of the plugin row.
 * @param input - New status and optional error message.
 * @returns The updated row, or `null` if the update returned no rows.
 * @throws A not-found error when no plugin has this id.
 */
updateStatus: async (id: string, input: UpdatePluginStatus) => {
  const current = await getById(id);
  if (!current) throw notFound("Plugin not found");

  const [updated] = await db
    .update(plugins)
    .set({
      status: input.status,
      lastError: input.lastError ?? null,
      updatedAt: new Date(),
    })
    .where(eq(plugins.id, id))
    .returning();
  return updated ?? null;
},
|
||
|
||
// ----- Uninstall / Remove --------------------------------------------
|
||
|
||
/**
 * Uninstall a plugin, either by deleting its row or by soft-deleting it.
 *
 * @param id - Primary key of the plugin row.
 * @param removeData - When `true` the row is deleted from `plugins`;
 *   otherwise (default) the row is kept and its status becomes
 *   `"uninstalled"`.
 * @returns The deleted or updated row, or `null` if the statement returned
 *   no rows.
 * @throws A not-found error when no plugin has this id.
 */
uninstall: async (id: string, removeData = false) => {
  const target = await getById(id);
  if (!target) throw notFound("Plugin not found");

  const byId = eq(plugins.id, id);

  // Hard delete removes the row; soft delete only flips the status.
  const affected = removeData
    ? await db.delete(plugins).where(byId).returning()
    : await db
        .update(plugins)
        .set({
          status: "uninstalled" as PluginStatus,
          updatedAt: new Date(),
        })
        .where(byId)
        .returning();

  return affected[0] ?? null;
},
|
||
|
||
// ----- Config ---------------------------------------------------------
|
||
|
||
/**
 * Fetch the `plugin_config` row for a plugin.
 *
 * @param pluginId - Value compared against `pluginConfig.pluginId`.
 * @returns A promise of the first matching row, or `null` when there is none.
 */
getConfig: (pluginId: string) => {
  const configRows = db
    .select()
    .from(pluginConfig)
    .where(eq(pluginConfig.pluginId, pluginId));
  return configRows.then(([config]) => config ?? null);
},
|
||
|
||
/**
 * Create a plugin's config row, or overwrite `configJson` on the existing one.
 *
 * Overwriting also clears the config's `lastError` and refreshes `updatedAt`.
 * The existence check and the write are separate statements.
 *
 * @param pluginId - Primary key of the owning plugin.
 * @param input - Carries the full `configJson` to store.
 * @returns The inserted or updated config row.
 * @throws A not-found error when no plugin has this id.
 */
upsertConfig: async (pluginId: string, input: UpsertPluginConfig) => {
  if (!(await getById(pluginId))) throw notFound("Plugin not found");

  const forPlugin = () => eq(pluginConfig.pluginId, pluginId);
  const [current] = await db.select().from(pluginConfig).where(forPlugin());

  if (!current) {
    const [inserted] = await db
      .insert(pluginConfig)
      .values({ pluginId, configJson: input.configJson })
      .returning();
    return inserted;
  }

  const [replaced] = await db
    .update(pluginConfig)
    .set({
      configJson: input.configJson,
      lastError: null,
      updatedAt: new Date(),
    })
    .where(forPlugin())
    .returning();
  return replaced;
},
|
||
|
||
/**
 * Shallow-merge new values into a plugin's stored `configJson`.
 *
 * Top-level keys from `input.configJson` override the stored ones; nested
 * objects are replaced, not merged. When no config row exists yet, one is
 * inserted with the supplied values as-is. Updating also clears the config's
 * `lastError` and refreshes `updatedAt`.
 *
 * @param pluginId - Primary key of the owning plugin.
 * @param input - Carries the partial `configJson` to merge in.
 * @returns The inserted or updated config row.
 * @throws A not-found error when no plugin has this id.
 */
patchConfig: async (pluginId: string, input: PatchPluginConfig) => {
  if (!(await getById(pluginId))) throw notFound("Plugin not found");

  const forPlugin = () => eq(pluginConfig.pluginId, pluginId);
  const [current] = await db.select().from(pluginConfig).where(forPlugin());

  if (!current) {
    const [inserted] = await db
      .insert(pluginConfig)
      .values({ pluginId, configJson: input.configJson })
      .returning();
    return inserted;
  }

  const merged = { ...current.configJson, ...input.configJson };
  const [patched] = await db
    .update(pluginConfig)
    .set({
      configJson: merged,
      lastError: null,
      updatedAt: new Date(),
    })
    .where(forPlugin())
    .returning();
  return patched;
},
|
||
|
||
/**
 * Store (or clear, with `null`) the `lastError` on a plugin's config row and
 * refresh its `updatedAt`.
 *
 * @param pluginId - Value compared against `pluginConfig.pluginId`.
 * @param lastError - Error text to record, or `null` to clear it.
 * @returns The updated config row.
 * @throws A not-found error when the update matches no config row.
 */
setConfigError: async (pluginId: string, lastError: string | null) => {
  const [updated] = await db
    .update(pluginConfig)
    .set({ lastError, updatedAt: new Date() })
    .where(eq(pluginConfig.pluginId, pluginId))
    .returning();

  if (!updated) throw notFound("Plugin config not found");
  return updated;
},
|
||
|
||
/**
 * Delete the config row belonging to a plugin.
 *
 * @param pluginId - Value compared against `pluginConfig.pluginId`.
 * @returns The deleted row, or `null` when nothing was deleted.
 */
deleteConfig: async (pluginId: string) => {
  const [removed] = await db
    .delete(pluginConfig)
    .where(eq(pluginConfig.pluginId, pluginId))
    .returning();
  return removed ?? null;
},
|
||
|
||
// ----- Entities -------------------------------------------------------
|
||
|
||
/**
 * List a plugin's rows in `pluginEntities`, oldest first (`createdAt`
 * ascending), with optional filters and pagination.
 *
 * @param pluginId - Value compared against `pluginEntities.pluginId`.
 * @param query - Optional `entityType` / `externalId` filters (applied only
 *   when truthy, so an empty string means "no filter") and `limit` / `offset`
 *   (defaulting to 100 and 0).
 * @returns The query for the matching rows.
 */
listEntities: (pluginId: string, query?: PluginEntityQuery) => {
  const filters = [
    eq(pluginEntities.pluginId, pluginId),
    ...(query?.entityType ? [eq(pluginEntities.entityType, query.entityType)] : []),
    ...(query?.externalId ? [eq(pluginEntities.externalId, query.externalId)] : []),
  ];
  const pageSize = query?.limit ?? 100;
  const skip = query?.offset ?? 0;

  return db
    .select()
    .from(pluginEntities)
    .where(and(...filters))
    .orderBy(asc(pluginEntities.createdAt))
    .limit(pageSize)
    .offset(skip);
},
|
||
|
||
/**
 * Find one plugin-owned entity row by plugin, entity type and external id
 * (all three must match exactly).
 *
 * @param pluginId - Value compared against `pluginEntities.pluginId`.
 * @param entityType - Value compared against `pluginEntities.entityType`.
 * @param externalId - Value compared against `pluginEntities.externalId`.
 * @returns A promise of the first matching row, or `null` when there is none.
 */
getEntityByExternalId: (
  pluginId: string,
  entityType: string,
  externalId: string,
) => {
  const matchesAll = and(
    eq(pluginEntities.pluginId, pluginId),
    eq(pluginEntities.entityType, entityType),
    eq(pluginEntities.externalId, externalId),
  );
  return db
    .select()
    .from(pluginEntities)
    .where(matchesAll)
    .then(([entity]) => entity ?? null);
},
|
||
|
||
/**
 * Create or update a plugin-owned entity row.
 *
 * An existing row is looked up by (`pluginId`, `entityType`, `externalId`).
 * When `input.externalId` is nullish the lookup compares against the empty
 * string instead. If a row is found its columns are overwritten with `input`
 * and `updatedAt` is refreshed; otherwise a new row is inserted.
 *
 * The lookup and the write are separate statements rather than a single
 * ON CONFLICT upsert.
 *
 * @param pluginId - Primary key of the owning plugin.
 * @param input - Entity columns to persist (everything except `id`,
 *   `pluginId`, `createdAt`, `updatedAt`).
 * @returns The updated or newly inserted row.
 */
upsertEntity: async (
  pluginId: string,
  input: Omit<typeof pluginEntities.$inferInsert, "id" | "pluginId" | "createdAt" | "updatedAt">,
) => {
  const [match] = await db
    .select()
    .from(pluginEntities)
    .where(
      and(
        eq(pluginEntities.pluginId, pluginId),
        eq(pluginEntities.entityType, input.entityType),
        eq(pluginEntities.externalId, input.externalId ?? ""),
      ),
    );

  if (match) {
    const [updated] = await db
      .update(pluginEntities)
      .set({ ...input, updatedAt: new Date() })
      .where(eq(pluginEntities.id, match.id))
      .returning();
    return updated;
  }

  const [created] = await db
    .insert(pluginEntities)
    .values({ ...input, pluginId } as any)
    .returning();
  return created;
},
|
||
|
||
/**
 * Delete one plugin-owned entity row by its own primary key.
 *
 * @param id - Value compared against `pluginEntities.id`.
 * @returns The deleted row, or `null` when nothing was deleted.
 */
deleteEntity: async (id: string) => {
  const [removed] = await db
    .delete(pluginEntities)
    .where(eq(pluginEntities.id, id))
    .returning();
  return removed ?? null;
},
|
||
|
||
// ----- Jobs -----------------------------------------------------------
|
||
|
||
/**
 * List the job rows registered for a plugin, sorted ascending by `jobKey`.
 *
 * @param pluginId - Value compared against `pluginJobs.pluginId`.
 * @returns The query for the matching job rows.
 */
listJobs: (pluginId: string) => {
  const ownedByPlugin = eq(pluginJobs.pluginId, pluginId);
  return db
    .select()
    .from(pluginJobs)
    .where(ownedByPlugin)
    .orderBy(asc(pluginJobs.jobKey));
},
|
||
|
||
/**
 * Find one job row by plugin and job key (both must match exactly).
 *
 * @param pluginId - Value compared against `pluginJobs.pluginId`.
 * @param jobKey - Value compared against `pluginJobs.jobKey`.
 * @returns A promise of the first matching row, or `null` when there is none.
 */
getJobByKey: (pluginId: string, jobKey: string) => {
  const matchesJob = and(eq(pluginJobs.pluginId, pluginId), eq(pluginJobs.jobKey, jobKey));
  return db
    .select()
    .from(pluginJobs)
    .where(matchesJob)
    .then(([job]) => job ?? null);
},
|
||
|
||
/**
 * Register a job for a plugin, or update the one already stored under the
 * same (`pluginId`, `jobKey`).
 *
 * On update, `schedule` is overwritten and `status` is only changed when the
 * input supplies one. On insert, `status` defaults to `"active"`.
 *
 * @param pluginId - Primary key of the owning plugin.
 * @param jobKey - Key identifying the job within the plugin.
 * @param input - The `schedule` string and an optional `status`.
 * @returns The updated or newly inserted job row.
 */
upsertJob: async (
  pluginId: string,
  jobKey: string,
  input: { schedule: string; status?: PluginJobStatus },
) => {
  const [job] = await db
    .select()
    .from(pluginJobs)
    .where(and(eq(pluginJobs.pluginId, pluginId), eq(pluginJobs.jobKey, jobKey)));

  if (!job) {
    const [created] = await db
      .insert(pluginJobs)
      .values({
        pluginId,
        jobKey,
        schedule: input.schedule,
        status: input.status ?? "active",
      })
      .returning();
    return created;
  }

  const [updated] = await db
    .update(pluginJobs)
    .set({
      schedule: input.schedule,
      status: input.status ?? job.status,
      updatedAt: new Date(),
    })
    .where(eq(pluginJobs.id, job.id))
    .returning();
  return updated;
},
|
||
|
||
/**
 * Insert a new job-run row with status `"pending"`.
 *
 * @param pluginId - Stored in the run's `pluginId` column.
 * @param jobId - Stored in the run's `jobId` column (the parent job).
 * @param trigger - What caused this run; stored as-is.
 * @returns The inserted job-run row.
 */
createJobRun: async (
  pluginId: string,
  jobId: string,
  trigger: PluginJobRunTrigger,
) => {
  const [run] = await db
    .insert(pluginJobRuns)
    .values({ pluginId, jobId, trigger, status: "pending" })
    .returning();
  return run;
},
|
||
|
||
/**
 * Apply the given fields to a job-run row. `input` is handed to the update
 * unchanged.
 *
 * @param runId - Value compared against `pluginJobRuns.id`.
 * @param input - New `status` plus any of `durationMs`, `error`, `logs`,
 *   `startedAt`, `finishedAt`.
 * @returns The updated row, or `null` when no run has this id.
 */
updateJobRun: async (
  runId: string,
  input: {
    status: PluginJobRunStatus;
    durationMs?: number;
    error?: string;
    logs?: string[];
    startedAt?: Date;
    finishedAt?: Date;
  },
) => {
  const [run] = await db
    .update(pluginJobRuns)
    .set(input)
    .where(eq(pluginJobRuns.id, runId))
    .returning();
  return run ?? null;
},
|
||
|
||
// ----- Webhooks -------------------------------------------------------
|
||
|
||
/**
 * Insert a webhook-delivery row with status `"pending"`.
 *
 * @param pluginId - Stored in the delivery's `pluginId` column.
 * @param webhookKey - Stored in the delivery's `webhookKey` column.
 * @param input - The `payload`, an optional `externalId`, and optional
 *   `headers` (an empty object is stored when omitted).
 * @returns The inserted delivery row.
 */
createWebhookDelivery: async (
  pluginId: string,
  webhookKey: string,
  input: {
    externalId?: string;
    payload: Record<string, unknown>;
    headers?: Record<string, string>;
  },
) => {
  const { externalId, payload, headers } = input;
  const [delivery] = await db
    .insert(pluginWebhookDeliveries)
    .values({
      pluginId,
      webhookKey,
      externalId,
      payload,
      headers: headers ?? {},
      status: "pending",
    })
    .returning();
  return delivery;
},
|
||
|
||
/**
 * Apply the given fields to a webhook-delivery row. `input` is handed to the
 * update unchanged.
 *
 * @param deliveryId - Value compared against `pluginWebhookDeliveries.id`.
 * @param input - New `status` plus any of `durationMs`, `error`,
 *   `startedAt`, `finishedAt`.
 * @returns The updated row, or `null` when no delivery has this id.
 */
updateWebhookDelivery: async (
  deliveryId: string,
  input: {
    status: PluginWebhookDeliveryStatus;
    durationMs?: number;
    error?: string;
    startedAt?: Date;
    finishedAt?: Date;
  },
) => {
  const [delivery] = await db
    .update(pluginWebhookDeliveries)
    .set(input)
    .where(eq(pluginWebhookDeliveries.id, deliveryId))
    .returning();
  return delivery ?? null;
},
|
||
};
|
||
}
|