diff --git a/packages/plugins/sdk/src/host-client-factory.ts b/packages/plugins/sdk/src/host-client-factory.ts index f7829ad7..e445cc0b 100644 --- a/packages/plugins/sdk/src/host-client-factory.ts +++ b/packages/plugins/sdk/src/host-client-factory.ts @@ -103,9 +103,10 @@ export interface HostServices { list(params: WorkerToHostMethods["entities.list"][0]): Promise; }; - /** Provides `events.emit`. */ + /** Provides `events.emit` and `events.subscribe`. */ events: { emit(params: WorkerToHostMethods["events.emit"][0]): Promise; + subscribe(params: WorkerToHostMethods["events.subscribe"][0]): Promise; }; /** Provides `http.fetch`. */ @@ -261,6 +262,7 @@ const METHOD_CAPABILITY_MAP: Record { return services.events.emit(params); }), + "events.subscribe": gated("events.subscribe", async (params) => { + return services.events.subscribe(params); + }), // HTTP "http.fetch": gated("http.fetch", async (params) => { diff --git a/packages/plugins/sdk/src/protocol.ts b/packages/plugins/sdk/src/protocol.ts index 8330f680..61228b53 100644 --- a/packages/plugins/sdk/src/protocol.ts +++ b/packages/plugins/sdk/src/protocol.ts @@ -482,6 +482,10 @@ export interface WorkerToHostMethods { params: { name: string; companyId: string; payload: unknown }, result: void, ]; + "events.subscribe": [ + params: { eventPattern: string; filter?: Record | null }, + result: void, + ]; // HTTP "http.fetch": [ diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 8242c261..1e8d5591 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -19,8 +19,7 @@ * |--- request(initialize) -------------> | → calls plugin.setup(ctx) * |<-- response(ok:true) ---------------- | * | | - * |--- request(onEvent) ----------------> | → dispatches to registered handler - * |<-- response(void) ------------------ | + * |--- notification(onEvent) -----------> | → dispatches to registered handler * | | * |<-- request(state.get) --------------- | ← SDK client call from plugin code * |--- response(result) ----------------> | @@ -387,6 +386,13 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost registration = { name, filter: filterOrFn, fn: maybeFn }; } eventHandlers.push(registration); + // Register subscription on the host so events are forwarded to this worker + void callHost("events.subscribe", { eventPattern: name, filter: registration.filter ?? null }).catch((err) => { + notifyHost("log", { + level: "warn", + message: `Failed to subscribe to event "${name}" on host: ${err instanceof Error ? err.message : String(err)}`, + }); + }); return () => { const idx = eventHandlers.indexOf(registration); if (idx !== -1) eventHandlers.splice(idx, 1); @@ -1107,6 +1113,14 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost const event = notif.params as AgentSessionEvent; const cb = sessionEventCallbacks.get(event.sessionId); if (cb) cb(event); + } else if (notif.method === "onEvent" && notif.params) { + // Plugin event bus notifications — dispatch to registered event handlers + handleOnEvent(notif.params as OnEventParams).catch((err) => { + notifyHost("log", { + level: "error", + message: `Failed to handle event notification: ${err instanceof Error ? err.message : String(err)}`, + }); + }); } } } diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index 6be022e2..2a3f5ecc 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,6 +556,18 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, + async subscribe(params: { eventPattern: string; filter?: Record | null }) { + const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }; + if (params.filter) { + scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler); + } else { + scopedBus.subscribe(params.eventPattern as any, handler); + } + }, }, http: { @@ -1060,6 +1072,10 @@ export function buildHostServices( dispose() { disposed = true; + // Clear event bus subscriptions to prevent accumulation on worker restart. + // Without this, each crash/restart cycle adds duplicate subscriptions. + scopedBus.clear(); + // Snapshot to avoid iterator invalidation from concurrent sendMessage() calls const snapshot = Array.from(activeSubscriptions); activeSubscriptions.clear();