From 61fd5486e88bdb66d899a4b57e8b60b17c7dcead Mon Sep 17 00:00:00 2001 From: HD Date: Mon, 16 Mar 2026 02:10:10 +0700 Subject: [PATCH 1/2] fix: wire plugin event subscriptions from worker to host MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin workers register event handlers via `ctx.events.on()` in the SDK, but these subscriptions were never forwarded to the host process. The host sends events via `notifyWorker("onEvent", ...)` which produces a JSON-RPC notification (no `id`), but the worker only dispatched `onEvent` as a request handler — notifications were silently dropped. Changes: - Add `events.subscribe` RPC method so workers can register subscriptions on the host-side event bus during setup - Handle `onEvent` notifications in the worker notification dispatcher (previously only `agents.sessions.event` was handled) - Add `events.subscribe` to HostServices interface, capability map, and host client handler - Add `subscribe` handler in host services that registers on the scoped plugin event bus and forwards matched events to the worker --- packages/plugins/sdk/src/host-client-factory.ts | 7 ++++++- packages/plugins/sdk/src/protocol.ts | 4 ++++ packages/plugins/sdk/src/worker-rpc-host.ts | 15 +++++++++++++++ server/src/services/plugin-host-services.ts | 11 +++++++++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/plugins/sdk/src/host-client-factory.ts b/packages/plugins/sdk/src/host-client-factory.ts index f7829ad7..e445cc0b 100644 --- a/packages/plugins/sdk/src/host-client-factory.ts +++ b/packages/plugins/sdk/src/host-client-factory.ts @@ -103,9 +103,10 @@ export interface HostServices { list(params: WorkerToHostMethods["entities.list"][0]): Promise; }; - /** Provides `events.emit`. */ + /** Provides `events.emit` and `events.subscribe`. */ events: { emit(params: WorkerToHostMethods["events.emit"][0]): Promise; + subscribe(params: WorkerToHostMethods["events.subscribe"][0]): Promise; }; /** Provides `http.fetch`. */ @@ -261,6 +262,7 @@ const METHOD_CAPABILITY_MAP: Record { return services.events.emit(params); }), + "events.subscribe": gated("events.subscribe", async (params) => { + return services.events.subscribe(params); + }), // HTTP "http.fetch": gated("http.fetch", async (params) => { diff --git a/packages/plugins/sdk/src/protocol.ts b/packages/plugins/sdk/src/protocol.ts index 8330f680..61228b53 100644 --- a/packages/plugins/sdk/src/protocol.ts +++ b/packages/plugins/sdk/src/protocol.ts @@ -482,6 +482,10 @@ export interface WorkerToHostMethods { params: { name: string; companyId: string; payload: unknown }, result: void, ]; + "events.subscribe": [ + params: { eventPattern: string; filter?: Record | null }, + result: void, + ]; // HTTP "http.fetch": [ diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 8242c261..2dbb8196 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -387,6 +387,13 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost registration = { name, filter: filterOrFn, fn: maybeFn }; } eventHandlers.push(registration); + // Register subscription on the host so events are forwarded to this worker + void callHost("events.subscribe", { eventPattern: name, filter: registration.filter ?? null }).catch((err) => { + notifyHost("log", { + level: "warn", + message: `Failed to subscribe to event "${name}" on host: ${err instanceof Error ? err.message : String(err)}`, + }); + }); return () => { const idx = eventHandlers.indexOf(registration); if (idx !== -1) eventHandlers.splice(idx, 1); @@ -1107,6 +1114,14 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost const event = notif.params as AgentSessionEvent; const cb = sessionEventCallbacks.get(event.sessionId); if (cb) cb(event); + } else if (notif.method === "onEvent" && notif.params) { + // Plugin event bus notifications — dispatch to registered event handlers + handleOnEvent(notif.params as OnEventParams).catch((err) => { + notifyHost("log", { + level: "error", + message: `Failed to handle event notification: ${err instanceof Error ? err.message : String(err)}`, + }); + }); } } } diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index 6be022e2..fd945413 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,6 +556,17 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, + async subscribe(params: { eventPattern: string; filter?: Record }) { + scopedBus.subscribe( + params.eventPattern as any, + params.filter as any ?? {}, + async (event) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }, + ); + }, }, http: { From 8468d347be46cb2a9e033055e039564467dfa780 Mon Sep 17 00:00:00 2001 From: HD Date: Mon, 16 Mar 2026 02:25:03 +0700 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20subscription=20cleanup,=20filter=20nullability,=20s?= =?UTF-8?q?tale=20diagram?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add scopedBus.clear() in dispose() to prevent subscription accumulation on worker crash/restart cycles - Use two-arg subscribe() overload when filter is null instead of passing empty object; fix filter type to include null - Update ASCII flow diagram: onEvent is a notification, not request/response --- packages/plugins/sdk/src/worker-rpc-host.ts | 3 +-- server/src/services/plugin-host-services.ts | 25 ++++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 2dbb8196..1e8d5591 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -19,8 +19,7 @@ * |--- request(initialize) -------------> | → calls plugin.setup(ctx) * |<-- response(ok:true) ---------------- | * | | - * |--- request(onEvent) ----------------> | → dispatches to registered handler - * |<-- response(void) ------------------ | + * |--- notification(onEvent) -----------> | → dispatches to registered handler * | | * |<-- request(state.get) --------------- | ← SDK client call from plugin code * |--- response(result) ----------------> | diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index fd945413..2a3f5ecc 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,16 +556,17 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, - async subscribe(params: { eventPattern: string; filter?: Record }) { - scopedBus.subscribe( - params.eventPattern as any, - params.filter as any ?? {}, - async (event) => { - if (notifyWorker) { - notifyWorker("onEvent", { event }); - } - }, - ); + async subscribe(params: { eventPattern: string; filter?: Record | null }) { + const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }; + if (params.filter) { + scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler); + } else { + scopedBus.subscribe(params.eventPattern as any, handler); + } }, }, @@ -1071,6 +1072,10 @@ export function buildHostServices( dispose() { disposed = true; + // Clear event bus subscriptions to prevent accumulation on worker restart. + // Without this, each crash/restart cycle adds duplicate subscriptions. + scopedBus.clear(); + // Snapshot to avoid iterator invalidation from concurrent sendMessage() calls const snapshot = Array.from(activeSubscriptions); activeSubscriptions.clear();