// Source file: paperclip/server/src/services/plugin-host-services.ts
import type { Db } from "@paperclipai/db";
import { pluginLogs, agentTaskSessions as agentTaskSessionsTable } from "@paperclipai/db";
import { eq, and, like, desc } from "drizzle-orm";
import type {
HostServices,
Company,
Agent,
Project,
Issue,
Goal,
PluginWorkspace,
IssueComment,
} from "@paperclipai/plugin-sdk";
import { companyService } from "./companies.js";
import { agentService } from "./agents.js";
import { projectService } from "./projects.js";
import { issueService } from "./issues.js";
import { goalService } from "./goals.js";
import { heartbeatService } from "./heartbeat.js";
import { subscribeCompanyLiveEvents } from "./live-events.js";
import { randomUUID } from "node:crypto";
import { activityService } from "./activity.js";
import { costService } from "./costs.js";
import { assetService } from "./assets.js";
import { pluginRegistryService } from "./plugin-registry.js";
import { pluginStateStore } from "./plugin-state-store.js";
import { createPluginSecretsHandler } from "./plugin-secrets-handler.js";
import { logActivity } from "./activity-log.js";
import type { PluginEventBus } from "./plugin-event-bus.js";
import { lookup as dnsLookup } from "node:dns/promises";
import type { IncomingMessage, RequestOptions as HttpRequestOptions } from "node:http";
import { request as httpRequest } from "node:http";
import { request as httpsRequest } from "node:https";
import { isIP } from "node:net";
import { logger } from "../middleware/logger.js";
// ---------------------------------------------------------------------------
// SSRF protection for plugin HTTP fetch
// ---------------------------------------------------------------------------
/** Maximum time (ms) a plugin fetch request may take before being aborted. */
const PLUGIN_FETCH_TIMEOUT_MS = 30_000;
/** Maximum time (ms) to wait for a DNS lookup before aborting. */
const DNS_LOOKUP_TIMEOUT_MS = 5_000;
/** Only these protocols are allowed for plugin HTTP requests (no file:, ftp:, etc.). */
const ALLOWED_PROTOCOLS = new Set(["http:", "https:"]);
/**
 * Check if an IP address is in a private/reserved range (RFC 1918, loopback,
 * link-local, CGNAT, multicast, etc.) that plugins should never be able to reach.
 *
 * Handles IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1) which Node's
 * dns.lookup may return depending on OS configuration.
 *
 * @param ip - A canonical IP address string (as returned by dns.lookup / net.isIP input).
 * @returns true when the address must be blocked for plugin fetches.
 */
function isPrivateIP(ip: string): boolean {
  const lower = ip.toLowerCase();
  // Unwrap IPv4-mapped IPv6 addresses (::ffff:x.x.x.x) and re-check as IPv4
  const v4MappedMatch = lower.match(/^::ffff:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$/);
  if (v4MappedMatch && v4MappedMatch[1]) return isPrivateIP(v4MappedMatch[1]);
  // IPv4 patterns — compare numeric octets rather than string prefixes
  const octets = ip.split(".");
  if (octets.length === 4) {
    const first = parseInt(octets[0]!, 10);
    const second = parseInt(octets[1]!, 10);
    if (first === 0) return true; // 0.0.0.0/8 "this network"
    if (first === 10) return true; // RFC 1918
    if (first === 127) return true; // loopback 127.0.0.0/8
    if (first === 172 && second >= 16 && second <= 31) return true; // RFC 1918
    if (first === 192 && second === 168) return true; // RFC 1918
    if (first === 169 && second === 254) return true; // link-local
    if (first === 100 && second >= 64 && second <= 127) return true; // CGNAT 100.64.0.0/10
    if (first === 198 && (second === 18 || second === 19)) return true; // benchmarking 198.18.0.0/15
    if (first >= 224) return true; // multicast 224/4, reserved 240/4, broadcast
  }
  // IPv6 patterns
  if (lower === "::1") return true; // loopback
  if (lower.startsWith("fc") || lower.startsWith("fd")) return true; // ULA fc00::/7
  if (/^fe[89ab]/.test(lower)) return true; // link-local fe80::/10
  if (lower === "::") return true; // unspecified
  return false;
}
/**
 * Validate a URL for plugin fetch: protocol whitelist + private IP blocking.
 *
 * SSRF Prevention Strategy:
 * 1. Parse and validate the URL syntax
 * 2. Enforce protocol whitelist (http/https only)
 * 3. Resolve the hostname to IP(s) via DNS
 * 4. Validate that ALL resolved IPs are non-private
 * 5. Pin the first safe IP into the URL so fetch() does not re-resolve DNS
 *
 * This prevents DNS rebinding attacks where an attacker controls DNS to
 * resolve to a safe IP during validation, then to a private IP when fetch() runs.
 *
 * @returns Request-routing metadata used to connect directly to the resolved IP
 * while preserving the original hostname for HTTP Host and TLS SNI.
 */
interface ValidatedFetchTarget {
  /** The parsed original URL (supplies path, query, credentials, and port). */
  parsedUrl: URL;
  /** The vetted IP address the socket must connect to directly. */
  resolvedAddress: string;
  /** Original `host[:port]` value, sent as the HTTP Host header. */
  hostHeader: string;
  /** SNI hostname for TLS; only set for https with a non-IP hostname. */
  tlsServername?: string;
  /** true → use node:https, false → node:http. */
  useTls: boolean;
}
/**
 * Validate a plugin fetch URL and resolve its hostname to a safe IP.
 *
 * Enforces the http/https whitelist, rejects private/reserved addresses
 * (including literal-IP URLs, which skip DNS entirely), and returns routing
 * metadata so the caller connects directly to the resolved address — closing
 * the DNS-rebinding TOCTOU window between validation and the real request.
 *
 * @param urlString - The plugin-supplied URL.
 * @returns Routing metadata (resolved IP, Host header, TLS/SNI settings).
 * @throws Error on invalid URL, disallowed protocol, DNS failure/timeout, or
 *   when every resolved address is private/reserved.
 */
async function validateAndResolveFetchUrl(urlString: string): Promise<ValidatedFetchTarget> {
  let parsed: URL;
  try {
    parsed = new URL(urlString);
  } catch {
    throw new Error(`Invalid URL: ${urlString}`);
  }
  if (!ALLOWED_PROTOCOLS.has(parsed.protocol)) {
    throw new Error(
      `Disallowed protocol "${parsed.protocol}" — only http: and https: are permitted`,
    );
  }
  const originalHostname = parsed.hostname.replace(/^\[|\]$/g, ""); // strip IPv6 brackets
  const hostHeader = parsed.host; // includes port if non-default
  const useTls = parsed.protocol === "https:";
  // Fast path: the hostname is already a literal IP, so DNS is not involved
  // and the address can be validated directly (no rebinding risk, no lookup).
  if (isIP(originalHostname) !== 0) {
    if (isPrivateIP(originalHostname)) {
      throw new Error(
        `All resolved IPs for ${originalHostname} are in private/reserved ranges`,
      );
    }
    return {
      parsedUrl: parsed,
      resolvedAddress: originalHostname,
      hostHeader,
      tlsServername: undefined, // SNI must be a hostname, never an IP
      useTls,
    };
  }
  // Race the DNS lookup against a timeout to prevent indefinite hangs when
  // DNS is misconfigured or unresponsive. The timer is cleared in `finally`
  // so it does not linger on the event loop after a fast lookup.
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutHandle = setTimeout(
      () => reject(new Error(`DNS lookup timed out after ${DNS_LOOKUP_TIMEOUT_MS}ms for ${originalHostname}`)),
      DNS_LOOKUP_TIMEOUT_MS,
    );
  });
  try {
    const results = await Promise.race([dnsLookup(originalHostname, { all: true }), timeoutPromise]);
    if (results.length === 0) {
      throw new Error(`DNS resolution returned no results for ${originalHostname}`);
    }
    // Filter to only non-private IPs instead of rejecting the entire request
    // when some IPs are private. This handles multi-homed hosts that resolve
    // to both private and public addresses.
    const safeResults = results.filter((entry) => !isPrivateIP(entry.address));
    if (safeResults.length === 0) {
      throw new Error(
        `All resolved IPs for ${originalHostname} are in private/reserved ranges`,
      );
    }
    const resolved = safeResults[0]!;
    return {
      parsedUrl: parsed,
      resolvedAddress: resolved.address,
      hostHeader,
      // Hostname is guaranteed non-IP here (literal IPs took the fast path),
      // so it is always a valid SNI value for TLS.
      tlsServername: useTls ? originalHostname : undefined,
      useTls,
    };
  } catch (err) {
    // Re-throw our own validation errors unchanged; wrap raw DNS failures.
    if (err instanceof Error && (
      err.message.startsWith("All resolved IPs") ||
      err.message.startsWith("DNS resolution returned") ||
      err.message.startsWith("DNS lookup timed out")
    )) throw err;
    throw new Error(`DNS resolution failed for ${originalHostname}: ${(err as Error).message}`);
  } finally {
    if (timeoutHandle !== undefined) clearTimeout(timeoutHandle);
  }
}
/**
 * Build node:http/https request options that connect directly to the
 * pre-resolved IP while preserving the original hostname in the Host header
 * (and in SNI via `servername`), so the validated DNS answer stays "pinned".
 *
 * @param target - Validated routing metadata from validateAndResolveFetchUrl.
 * @param init - Optional fetch-style request init (method/headers/body).
 * @returns The low-level request options plus the serialised body (if any).
 */
function buildPinnedRequestOptions(
  target: ValidatedFetchTarget,
  init?: RequestInit,
): { options: HttpRequestOptions & { servername?: string }; body: string | undefined } {
  const headers = new Headers(init?.headers);
  const method = init?.method ?? "GET";
  const body = normalizeRequestBody(init?.body);
  // Preserve the logical hostname even though the socket targets the raw IP.
  headers.set("Host", target.hostHeader);
  if (body !== undefined && !headers.has("content-length") && !headers.has("transfer-encoding")) {
    headers.set("content-length", String(Buffer.byteLength(body)));
  }
  const pathname = `${target.parsedUrl.pathname}${target.parsedUrl.search}`;
  // Forward URL-embedded credentials (user:pass@host) as Basic auth.
  const auth = target.parsedUrl.username || target.parsedUrl.password
    ? `${decodeURIComponent(target.parsedUrl.username)}:${decodeURIComponent(target.parsedUrl.password)}`
    : undefined;
  return {
    options: {
      protocol: target.parsedUrl.protocol,
      host: target.resolvedAddress,
      port: target.parsedUrl.port
        ? Number(target.parsedUrl.port)
        : target.useTls
          ? 443
          : 80,
      path: pathname,
      method,
      headers: Object.fromEntries(headers.entries()),
      auth,
      servername: target.tlsServername,
    },
    body,
  };
}
/**
 * Serialise a fetch-style body into a string. Binary payloads
 * (Buffer / Uint8Array / ArrayBuffer) are decoded as UTF-8 rather than
 * stringified via String(), which would yield "104,105"-style garbage.
 */
function normalizeRequestBody(body: RequestInit["body"] | undefined): string | undefined {
  if (body === undefined || body === null) return undefined;
  if (typeof body === "string") return body;
  if (body instanceof Uint8Array) return Buffer.from(body).toString("utf8"); // covers Buffer too
  if (body instanceof ArrayBuffer) return Buffer.from(body).toString("utf8");
  return String(body); // e.g. URLSearchParams serialises to its form encoding
}
/**
 * Execute an HTTP(S) request against the pinned IP from a validated target,
 * buffering the response body (capped) and flattening headers.
 *
 * @param target - Validated routing metadata (pinned IP + Host/SNI info).
 * @param init - Optional fetch-style request init forwarded to option building.
 * @param signal - Abort signal used to cancel the underlying socket.
 * @returns Status, status text, flattened headers, and the UTF-8 body.
 */
async function executePinnedHttpRequest(
  target: ValidatedFetchTarget,
  init: RequestInit | undefined,
  signal: AbortSignal,
): Promise<{ status: number; statusText: string; headers: Record<string, string>; body: string }> {
  const { options, body } = buildPinnedRequestOptions(target, init);
  const doRequest = target.useTls ? httpsRequest : httpRequest;
  // Issue the request and wait for the response headers to arrive.
  const res = await new Promise<IncomingMessage>((resolve, reject) => {
    const req = doRequest({ ...options, signal }, resolve);
    req.on("error", reject);
    if (body !== undefined) req.write(body);
    req.end();
  });
  // Accumulate the body in memory, aborting if it grows beyond the cap.
  const MAX_RESPONSE_BODY_BYTES = 200 * 1024 * 1024; // 200 MB
  const collected: Buffer[] = [];
  let received = 0;
  await new Promise<void>((resolve, reject) => {
    res.on("data", (piece: Buffer | string) => {
      const data = Buffer.isBuffer(piece) ? piece : Buffer.from(piece);
      received += data.length;
      if (received > MAX_RESPONSE_BODY_BYTES) {
        collected.length = 0; // release what we buffered before bailing
        res.destroy(new Error(`Response body exceeded ${MAX_RESPONSE_BODY_BYTES} bytes`));
        return;
      }
      collected.push(data);
    });
    res.on("end", resolve);
    res.on("error", reject);
  });
  // Flatten multi-value headers into comma-joined strings.
  const flatHeaders: Record<string, string> = {};
  for (const [name, raw] of Object.entries(res.headers)) {
    if (raw === undefined) continue;
    flatHeaders[name] = Array.isArray(raw) ? raw.join(", ") : raw;
  }
  return {
    status: res.statusCode ?? 500,
    statusText: res.statusMessage ?? "",
    headers: flatHeaders,
    body: Buffer.concat(collected).toString("utf8"),
  };
}
/** Matches RFC 4122 UUIDs (versions 1-5) so IDs are never mistaken for names/paths. */
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
/** Any forward or back slash — the broad signal that a string is a filesystem path. */
const PATH_LIKE_PATTERN = /[\\/]/;
/** Windows drive-letter prefix, e.g. "C:\" or "C:/". */
const WINDOWS_DRIVE_PATH_PATTERN = /^[A-Za-z]:[\\/]/;
/** True when the trimmed value looks like a filesystem path and is not a UUID. */
function looksLikePath(value: string): boolean {
  const candidate = value.trim();
  if (UUID_PATTERN.test(candidate)) return false;
  return PATH_LIKE_PATTERN.test(candidate) || WINDOWS_DRIVE_PATH_PATTERN.test(candidate);
}
/** Trim free-form workspace text; raw UUIDs are treated as "no value". */
function sanitizeWorkspaceText(value: string): string {
  const trimmed = value.trim();
  if (trimmed.length === 0 || UUID_PATTERN.test(trimmed)) return "";
  return trimmed;
}
/** Return the trimmed cwd only when it actually looks like a path. */
function sanitizeWorkspacePath(cwd: string | null): string {
  if (!cwd) return "";
  if (!looksLikePath(cwd)) return "";
  return cwd.trim();
}
/**
 * Pick a display name for a workspace: prefer the stored name when it is
 * neither a UUID nor a path; otherwise fall back to the last segment of the
 * fallback path, or "Workspace" when no segment exists.
 */
function sanitizeWorkspaceName(name: string, fallbackPath: string): string {
  const candidate = sanitizeWorkspaceText(name);
  if (candidate && !looksLikePath(candidate)) return candidate;
  const trimmedPath = fallbackPath.trim().replace(/[\\/]+$/, "");
  const parts = trimmedPath.split(/[\\/]/).filter(Boolean);
  return parts.length > 0 ? parts[parts.length - 1]! : "Workspace";
}
// ---------------------------------------------------------------------------
// Buffered plugin log writes
// ---------------------------------------------------------------------------
/** How many buffered log entries trigger an immediate flush. */
const LOG_BUFFER_FLUSH_SIZE = 100;
/** How often (ms) the buffer is flushed regardless of size. */
const LOG_BUFFER_FLUSH_INTERVAL_MS = 5_000;
/** Max length for a single plugin log message (bytes/chars). */
const MAX_LOG_MESSAGE_LENGTH = 10_000;
/** Max serialised JSON size for plugin log meta objects. */
const MAX_LOG_META_JSON_LENGTH = 50_000;
/** Max length for a metric name. */
const MAX_METRIC_NAME_LENGTH = 500;
/** Pino reserved field names that plugins must not overwrite. */
const PINO_RESERVED_KEYS = new Set(["level", "time", "pid", "hostname", "msg", "v"]);
/** Truncate a string to `max` characters, appending a marker if truncated. */
function truncStr(s: string, max: number): string {
  return s.length > max ? `${s.slice(0, max)}...[truncated]` : s;
}
/**
 * Sanitise a plugin-supplied meta object before logging/persisting:
 * strip pino-reserved keys and enforce a serialised-size cap. Returns a
 * marker object when the meta cannot be serialised or is too large.
 */
function sanitiseMeta(meta: Record<string, unknown> | null | undefined): Record<string, unknown> | null {
  if (meta == null) return null;
  // Drop keys that would collide with pino's own reserved fields.
  const cleaned = Object.fromEntries(
    Object.entries(meta).filter(([key]) => !PINO_RESERVED_KEYS.has(key)),
  );
  let serialised: string;
  try {
    serialised = JSON.stringify(cleaned);
  } catch {
    // Circular references / BigInt etc. — replace with a marker object.
    return { _sanitised: true, _error: "meta was not JSON-serialisable" };
  }
  if (serialised.length > MAX_LOG_META_JSON_LENGTH) {
    return { _sanitised: true, _error: `meta exceeded ${MAX_LOG_META_JSON_LENGTH} chars` };
  }
  return cleaned;
}
/** One pending plugin_logs row, queued until the next buffer flush. */
interface BufferedLogEntry {
  // Database handle the entry must be written through (supports multi-db setups).
  db: Db;
  // UUID of the plugin installation that produced the entry.
  pluginId: string;
  // Log level string; metrics reuse this path with level "metric".
  level: string;
  // Already-truncated message text.
  message: string;
  // Already-sanitised structured metadata, or null.
  meta: Record<string, unknown> | null;
}
// Module-level FIFO buffer shared by all host-service instances in this process.
const _logBuffer: BufferedLogEntry[] = [];
/**
 * Flush all buffered log entries to the database in a single batch insert per
 * unique db instance. Errors are swallowed (logged, with a console.error
 * fallback) so flushing never crashes the process.
 */
export async function flushPluginLogBuffer(): Promise<void> {
  if (_logBuffer.length === 0) return;
  // Drain atomically so a concurrent flush cannot double-insert the same rows.
  const pending = _logBuffer.splice(0, _logBuffer.length);
  // Bucket the drained entries by db identity (multi-db scenarios).
  const buckets = new Map<Db, BufferedLogEntry[]>();
  for (const item of pending) {
    const bucket = buckets.get(item.db);
    if (bucket) {
      bucket.push(item);
    } else {
      buckets.set(item.db, [item]);
    }
  }
  for (const [database, bucket] of buckets) {
    const values = bucket.map((e) => ({
      pluginId: e.pluginId,
      level: e.level,
      message: e.message,
      meta: e.meta,
    }));
    try {
      await database.insert(pluginLogs).values(values);
    } catch (err) {
      // Even the fallback logger may throw (e.g. during shutdown) — last
      // resort is plain console output so the error is never lost silently.
      try {
        logger.warn({ err, count: values.length }, "Failed to batch-persist plugin logs to DB");
      } catch {
        console.error("[plugin-host-services] Batch log flush failed:", err);
      }
    }
  }
}
/** Interval handle for the periodic log flush. */
const _logFlushInterval = setInterval(() => {
  // Errors must never escape a timer callback — log and move on.
  flushPluginLogBuffer().catch((err) => {
    console.error("[plugin-host-services] Periodic log flush error:", err);
  });
}, LOG_BUFFER_FLUSH_INTERVAL_MS);
// Allow the interval to be unref'd so it doesn't keep the process alive in tests.
if (_logFlushInterval.unref) _logFlushInterval.unref();
/** Maximum time (ms) to keep a session event subscription alive before forcing cleanup. */
const SESSION_EVENT_SUBSCRIPTION_TIMEOUT_MS = 30 * 60 * 1_000; // 30 minutes
/**
 * buildHostServices — creates a concrete implementation of the `HostServices`
 * interface for a specific plugin.
 *
 * This implementation delegates to the core Paperclip domain services,
 * providing the bridge between the plugin worker's SDK and the host platform.
 *
 * @param db - Database connection instance.
 * @param pluginId - The UUID of the plugin installation record.
 * @param pluginKey - The unique identifier from the plugin manifest (e.g., "acme.linear").
 * @param eventBus - The system-wide event bus for publishing plugin events.
 * @param notifyWorker - Optional callback for pushing session-event notifications to the worker.
 * @returns An object implementing the HostServices interface for the plugin SDK,
 *   plus a dispose() method that must be called when the worker stops.
 */
export function buildHostServices(
  db: Db,
  pluginId: string,
  pluginKey: string,
  eventBus: PluginEventBus,
  notifyWorker?: (method: string, params: unknown) => void,
): HostServices & { dispose(): void } {
  const registry = pluginRegistryService(db);
  const stateStore = pluginStateStore(db);
  const secretsHandler = createPluginSecretsHandler({ db, pluginId });
  const companies = companyService(db);
  const agents = agentService(db);
  const heartbeat = heartbeatService(db);
  const projects = projectService(db);
  const issues = issueService(db);
  const goals = goalService(db);
  const activity = activityService(db);
  const costs = costService(db);
  const assets = assetService(db);
  // Event emission is namespaced by plugin key so events cannot impersonate other plugins.
  const scopedBus = eventBus.forPlugin(pluginKey);
  // Track active session event subscriptions for cleanup
  const activeSubscriptions = new Set<{ unsubscribe: () => void; timer: ReturnType<typeof setTimeout> }>();
  let disposed = false;
  // Guard: most operations are company-scoped and must name a company.
  const ensureCompanyId = (companyId?: string) => {
    if (!companyId) throw new Error("companyId is required for this operation");
    return companyId;
  };
  /**
   * Plugins are instance-wide in the current runtime. Company IDs are still
   * required for company-scoped data access, but there is no per-company
   * availability gate to enforce here.
   */
  const ensurePluginAvailableForCompany = async (_companyId: string) => {};
  // Type-guard: true when the record exists and belongs to the given company.
  const inCompany = <T extends { companyId: string | null | undefined }>(
    record: T | null | undefined,
    companyId: string,
  ): record is T => Boolean(record && record.companyId === companyId);
  // Like inCompany, but throws a generic "not found" — deliberately the same
  // message for missing and cross-company records, to avoid leaking existence.
  const requireInCompany = <T extends { companyId: string | null | undefined }>(
    entityName: string,
    record: T | null | undefined,
    companyId: string,
  ): T => {
    if (!inCompany(record, companyId)) {
      throw new Error(`${entityName} not found`);
    }
    return record;
  };
  return {
    config: {
      // Plugin configuration as stored in the registry; empty object when unset.
      async get() {
        const configRow = await registry.getConfig(pluginId);
        return configRow?.configJson ?? {};
      },
    },
    state: {
      // Scoped key/value state persisted per plugin installation.
      async get(params) {
        return stateStore.get(pluginId, params.scopeKind as any, params.stateKey, {
          scopeId: params.scopeId,
          namespace: params.namespace,
        });
      },
      async set(params) {
        await stateStore.set(pluginId, {
          scopeKind: params.scopeKind as any,
          scopeId: params.scopeId,
          namespace: params.namespace,
          stateKey: params.stateKey,
          value: params.value,
        });
      },
      async delete(params) {
        await stateStore.delete(pluginId, params.scopeKind as any, params.stateKey, {
          scopeId: params.scopeId,
          namespace: params.namespace,
        });
      },
    },
    entities: {
      // Plugin-owned entity records managed by the registry.
      async upsert(params) {
        return registry.upsertEntity(pluginId, params as any) as any;
      },
      async list(params) {
        return registry.listEntities(pluginId, params as any) as any;
      },
    },
    events: {
      // Emit a plugin event on the scoped (per-plugin-key) bus.
      async emit(params) {
        if (params.companyId) {
          await ensurePluginAvailableForCompany(params.companyId);
        }
        await scopedBus.emit(params.name, params.companyId, params.payload);
      },
    },
    http: {
      async fetch(params) {
        // SSRF protection: validate protocol whitelist + block private IPs.
        // Resolve once, then connect directly to that IP to prevent DNS rebinding.
        const target = await validateAndResolveFetchUrl(params.url);
        const controller = new AbortController();
        const timeout = setTimeout(() => controller.abort(), PLUGIN_FETCH_TIMEOUT_MS);
        try {
          const init = params.init as RequestInit | undefined;
          return await executePinnedHttpRequest(target, init, controller.signal);
        } finally {
          clearTimeout(timeout);
        }
      },
    },
    secrets: {
      // Delegated to the dedicated per-plugin secrets handler.
      async resolve(params) {
        return secretsHandler.resolve(params);
      },
    },
    assets: {
      // Asset operations are intentionally disabled in this build.
      async upload(params) {
        void params;
        throw new Error("Plugin asset uploads are not supported in this build.");
      },
      async getUrl(params) {
        void params;
        throw new Error("Plugin asset URLs are not supported in this build.");
      },
    },
    activity: {
      // Write a company activity-log entry attributed to this plugin.
      async log(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        await logActivity(db, {
          companyId,
          actorType: "system",
          actorId: pluginId,
          action: params.message,
          entityType: params.entityType ?? "plugin",
          entityId: params.entityId ?? pluginId,
          details: params.metadata,
        });
      },
    },
    metrics: {
      async write(params) {
        const safeName = truncStr(String(params.name ?? ""), MAX_METRIC_NAME_LENGTH);
        logger.debug({ pluginId, name: safeName, value: params.value, tags: params.tags }, "Plugin metric write");
        // Persist metrics to plugin_logs via the batch buffer (same path as
        // logger.log) so they benefit from batched writes and are flushed
        // reliably on shutdown. Using level "metric" makes them queryable
        // alongside regular logs via the same API (§26).
        _logBuffer.push({
          db,
          pluginId,
          level: "metric",
          message: safeName,
          meta: sanitiseMeta({ value: params.value, tags: params.tags ?? null }),
        });
        if (_logBuffer.length >= LOG_BUFFER_FLUSH_SIZE) {
          flushPluginLogBuffer().catch((err) => {
            console.error("[plugin-host-services] Triggered metric flush failed:", err);
          });
        }
      },
    },
    logger: {
      async log(params) {
        const { level, meta } = params;
        const safeMessage = truncStr(String(params.message ?? ""), MAX_LOG_MESSAGE_LENGTH);
        const safeMeta = sanitiseMeta(meta);
        // Mirror the entry into the host's structured logger at a matching level.
        const pluginLogger = logger.child({ service: "plugin-worker", pluginId });
        const logFields = {
          ...safeMeta,
          pluginLogLevel: level,
          pluginTimestamp: new Date().toISOString(),
        };
        if (level === "error") pluginLogger.error(logFields, `[plugin] ${safeMessage}`);
        else if (level === "warn") pluginLogger.warn(logFields, `[plugin] ${safeMessage}`);
        else if (level === "debug") pluginLogger.debug(logFields, `[plugin] ${safeMessage}`);
        else pluginLogger.info(logFields, `[plugin] ${safeMessage}`);
        // Persist to plugin_logs table via the module-level batch buffer (§26.1).
        // Fire-and-forget — logging should never block the worker.
        _logBuffer.push({
          db,
          pluginId,
          level: level ?? "info",
          message: safeMessage,
          meta: safeMeta,
        });
        if (_logBuffer.length >= LOG_BUFFER_FLUSH_SIZE) {
          flushPluginLogBuffer().catch((err) => {
            console.error("[plugin-host-services] Triggered log flush failed:", err);
          });
        }
      },
    },
    companies: {
      async list(_params) {
        return (await companies.list()) as Company[];
      },
      async get(params) {
        await ensurePluginAvailableForCompany(params.companyId);
        return (await companies.getById(params.companyId)) as Company;
      },
    },
    projects: {
      async list(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        return (await projects.list(companyId)) as Project[];
      },
      async get(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const project = await projects.getById(params.projectId);
        // Cross-company lookups return null rather than throwing.
        return (inCompany(project, companyId) ? project : null) as Project | null;
      },
      async listWorkspaces(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const project = await projects.getById(params.projectId);
        if (!inCompany(project, companyId)) return [];
        const rows = await projects.listWorkspaces(params.projectId);
        // Sanitise names/paths so UUIDs or paths stored in the wrong column
        // never leak through as display values.
        return rows.map((row) => {
          const path = sanitizeWorkspacePath(row.cwd);
          const name = sanitizeWorkspaceName(row.name, path);
          return {
            id: row.id,
            projectId: row.projectId,
            name,
            path,
            isPrimary: row.isPrimary,
            createdAt: row.createdAt.toISOString(),
            updatedAt: row.updatedAt.toISOString(),
          };
        });
      },
      async getPrimaryWorkspace(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const project = await projects.getById(params.projectId);
        if (!inCompany(project, companyId)) return null;
        const row = project.primaryWorkspace;
        if (!row) return null;
        const path = sanitizeWorkspacePath(row.cwd);
        const name = sanitizeWorkspaceName(row.name, path);
        return {
          id: row.id,
          projectId: row.projectId,
          name,
          path,
          isPrimary: row.isPrimary,
          createdAt: row.createdAt.toISOString(),
          updatedAt: row.updatedAt.toISOString(),
        };
      },
      async getWorkspaceForIssue(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const issue = await issues.getById(params.issueId);
        if (!inCompany(issue, companyId)) return null;
        // NOTE(review): issue's projectId is read via a loose Record cast —
        // presumably not on the issue service's declared type; confirm.
        const projectId = (issue as Record<string, unknown>).projectId as string | null;
        if (!projectId) return null;
        const project = await projects.getById(projectId);
        if (!inCompany(project, companyId)) return null;
        const row = project.primaryWorkspace;
        if (!row) return null;
        const path = sanitizeWorkspacePath(row.cwd);
        const name = sanitizeWorkspaceName(row.name, path);
        return {
          id: row.id,
          projectId: row.projectId,
          name,
          path,
          isPrimary: row.isPrimary,
          createdAt: row.createdAt.toISOString(),
          updatedAt: row.updatedAt.toISOString(),
        };
      },
    },
    issues: {
      async list(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        return (await issues.list(companyId, params as any)) as Issue[];
      },
      async get(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const issue = await issues.getById(params.issueId);
        return (inCompany(issue, companyId) ? issue : null) as Issue | null;
      },
      async create(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        return (await issues.create(companyId, params as any)) as Issue;
      },
      async update(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        // Mutations verify company ownership first and throw on mismatch.
        requireInCompany("Issue", await issues.getById(params.issueId), companyId);
        return (await issues.update(params.issueId, params.patch as any)) as Issue;
      },
      async listComments(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        if (!inCompany(await issues.getById(params.issueId), companyId)) return [];
        return (await issues.listComments(params.issueId)) as IssueComment[];
      },
      async createComment(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        requireInCompany("Issue", await issues.getById(params.issueId), companyId);
        return (await issues.addComment(
          params.issueId,
          params.body,
          {},
        )) as IssueComment;
      },
    },
    agents: {
      async list(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const rows = await agents.list(companyId);
        // Optional status filter applied in-memory after the company-scoped query.
        return rows.filter((agent) => !params.status || agent.status === params.status) as Agent[];
      },
      async get(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const agent = await agents.getById(params.agentId);
        return (inCompany(agent, companyId) ? agent : null) as Agent | null;
      },
      async pause(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const agent = await agents.getById(params.agentId);
        requireInCompany("Agent", agent, companyId);
        return (await agents.pause(params.agentId)) as Agent;
      },
      async resume(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const agent = await agents.getById(params.agentId);
        requireInCompany("Agent", agent, companyId);
        return (await agents.resume(params.agentId)) as Agent;
      },
      async invoke(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const agent = await agents.getById(params.agentId);
        requireInCompany("Agent", agent, companyId);
        // Wake the agent through the heartbeat service, attributed to this plugin.
        const run = await heartbeat.wakeup(params.agentId, {
          source: "automation",
          triggerDetail: "system",
          reason: params.reason ?? null,
          payload: { prompt: params.prompt },
          requestedByActorType: "system",
          requestedByActorId: pluginId,
        });
        // wakeup may decline (e.g. rate limits / policy); surface that as an error.
        if (!run) throw new Error("Agent wakeup was skipped by heartbeat policy");
        return { runId: run.id };
      },
    },
    goals: {
      async list(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const rows = await goals.list(companyId);
        // Optional level/status filters applied in-memory.
        return rows.filter((goal) =>
          (!params.level || goal.level === params.level) &&
          (!params.status || goal.status === params.status),
        ) as Goal[];
      },
      async get(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const goal = await goals.getById(params.goalId);
        return (inCompany(goal, companyId) ? goal : null) as Goal | null;
      },
      async create(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        return (await goals.create(companyId, {
          title: params.title,
          description: params.description,
          level: params.level as any,
          status: params.status as any,
          parentId: params.parentId,
          ownerAgentId: params.ownerAgentId,
        })) as Goal;
      },
      async update(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        requireInCompany("Goal", await goals.getById(params.goalId), companyId);
        return (await goals.update(params.goalId, params.patch as any)) as Goal;
      },
    },
    agentSessions: {
      async create(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const agent = await agents.getById(params.agentId);
        requireInCompany("Agent", agent, companyId);
        // Sessions are keyed with a plugin-scoped prefix so list/close can
        // restrict queries to this plugin's own sessions.
        const taskKey = params.taskKey ?? `plugin:${pluginKey}:session:${randomUUID()}`;
        const row = await db
          .insert(agentTaskSessionsTable)
          .values({
            companyId,
            agentId: params.agentId,
            adapterType: agent!.adapterType,
            taskKey,
            sessionParamsJson: null,
            sessionDisplayId: null,
            lastRunId: null,
            lastError: null,
          })
          .returning()
          .then((rows) => rows[0]);
        return {
          sessionId: row!.id,
          agentId: params.agentId,
          companyId,
          status: "active" as const,
          createdAt: row!.createdAt.toISOString(),
        };
      },
      async list(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        // Only sessions created by this plugin (matched via taskKey prefix).
        const rows = await db
          .select()
          .from(agentTaskSessionsTable)
          .where(
            and(
              eq(agentTaskSessionsTable.agentId, params.agentId),
              eq(agentTaskSessionsTable.companyId, companyId),
              like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
            ),
          )
          .orderBy(desc(agentTaskSessionsTable.createdAt));
        return rows.map((row) => ({
          sessionId: row.id,
          agentId: row.agentId,
          companyId: row.companyId,
          status: "active" as const,
          createdAt: row.createdAt.toISOString(),
        }));
      },
      async sendMessage(params) {
        if (disposed) {
          throw new Error("Host services have been disposed");
        }
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        // Verify session exists and belongs to this plugin
        const session = await db
          .select()
          .from(agentTaskSessionsTable)
          .where(
            and(
              eq(agentTaskSessionsTable.id, params.sessionId),
              eq(agentTaskSessionsTable.companyId, companyId),
              like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
            ),
          )
          .then((rows) => rows[0] ?? null);
        if (!session) throw new Error(`Session not found: ${params.sessionId}`);
        const run = await heartbeat.wakeup(session.agentId, {
          source: "automation",
          triggerDetail: "system",
          reason: params.reason ?? null,
          payload: { prompt: params.prompt },
          contextSnapshot: {
            taskKey: session.taskKey,
            wakeSource: "automation",
            wakeTriggerDetail: "system",
          },
          requestedByActorType: "system",
          requestedByActorId: pluginId,
        });
        if (!run) throw new Error("Agent wakeup was skipped by heartbeat policy");
        // Subscribe to live events and forward to the plugin worker as notifications.
        // Track the subscription so it can be cleaned up on dispose() if the run
        // never reaches a terminal status (hang, crash, network partition).
        if (notifyWorker) {
          const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]);
          // cleanup references `unsubscribe`/`timeoutTimer`/`entry` declared
          // below; it is only invoked from the event callback or the timer,
          // after all three are initialised, so the TDZ is never hit.
          const cleanup = () => {
            unsubscribe();
            clearTimeout(timeoutTimer);
            activeSubscriptions.delete(entry);
          };
          const unsubscribe = subscribeCompanyLiveEvents(companyId, (event) => {
            const payload = event.payload as Record<string, unknown> | undefined;
            // Ignore events belonging to other runs in the same company.
            if (!payload || payload.runId !== run.id) return;
            if (event.type === "heartbeat.run.log" || event.type === "heartbeat.run.event") {
              notifyWorker("agents.sessions.event", {
                sessionId: params.sessionId,
                runId: run.id,
                seq: (payload.seq as number) ?? 0,
                eventType: "chunk",
                stream: (payload.stream as string) ?? null,
                message: (payload.chunk as string) ?? (payload.message as string) ?? null,
                payload: payload,
              });
            } else if (event.type === "heartbeat.run.status") {
              const status = payload.status as string;
              if (TERMINAL_STATUSES.has(status)) {
                // Terminal status: notify, then tear down the subscription.
                notifyWorker("agents.sessions.event", {
                  sessionId: params.sessionId,
                  runId: run.id,
                  seq: 0,
                  eventType: status === "succeeded" ? "done" : "error",
                  stream: "system",
                  message: status === "succeeded" ? "Run completed" : `Run ${status}`,
                  payload: payload,
                });
                cleanup();
              } else {
                notifyWorker("agents.sessions.event", {
                  sessionId: params.sessionId,
                  runId: run.id,
                  seq: 0,
                  eventType: "status",
                  stream: "system",
                  message: `Run status: ${status}`,
                  payload: payload,
                });
              }
            }
          });
          // Safety-net timeout: if the run never reaches a terminal status,
          // force-cleanup the subscription to prevent unbounded leaks.
          const timeoutTimer = setTimeout(() => {
            logger.warn(
              { pluginId, pluginKey, runId: run.id },
              "session event subscription timed out — forcing cleanup",
            );
            cleanup();
          }, SESSION_EVENT_SUBSCRIPTION_TIMEOUT_MS);
          const entry = { unsubscribe, timer: timeoutTimer };
          activeSubscriptions.add(entry);
        }
        return { runId: run.id };
      },
      async close(params) {
        const companyId = ensureCompanyId(params.companyId);
        await ensurePluginAvailableForCompany(companyId);
        const deleted = await db
          .delete(agentTaskSessionsTable)
          .where(
            and(
              eq(agentTaskSessionsTable.id, params.sessionId),
              eq(agentTaskSessionsTable.companyId, companyId),
              like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
            ),
          )
          .returning()
          .then((rows) => rows.length);
        if (deleted === 0) throw new Error(`Session not found: ${params.sessionId}`);
      },
    },
    /**
     * Clean up all active session event subscriptions and flush any buffered
     * log entries. Must be called when the plugin worker is stopped, crashed,
     * or unloaded to prevent leaked listeners and lost log entries.
     */
    dispose() {
      disposed = true;
      // Snapshot to avoid iterator invalidation from concurrent sendMessage() calls
      const snapshot = Array.from(activeSubscriptions);
      activeSubscriptions.clear();
      for (const entry of snapshot) {
        clearTimeout(entry.timer);
        entry.unsubscribe();
      }
      // Flush any buffered log entries synchronously-as-possible on dispose.
      flushPluginLogBuffer().catch((err) => {
        console.error("[plugin-host-services] dispose() log flush failed:", err);
      });
    },
  };
}