From 61fd5486e88bdb66d899a4b57e8b60b17c7dcead Mon Sep 17 00:00:00 2001 From: HD Date: Mon, 16 Mar 2026 02:10:10 +0700 Subject: [PATCH] fix: wire plugin event subscriptions from worker to host MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin workers register event handlers via `ctx.events.on()` in the SDK, but these subscriptions were never forwarded to the host process. The host sends events via `notifyWorker("onEvent", ...)` which produces a JSON-RPC notification (no `id`), but the worker only dispatched `onEvent` as a request handler — notifications were silently dropped. Changes: - Add `events.subscribe` RPC method so workers can register subscriptions on the host-side event bus during setup - Handle `onEvent` notifications in the worker notification dispatcher (previously only `agents.sessions.event` was handled) - Add `events.subscribe` to HostServices interface, capability map, and host client handler - Add `subscribe` handler in host services that registers on the scoped plugin event bus and forwards matched events to the worker --- packages/plugins/sdk/src/host-client-factory.ts | 7 ++++++- packages/plugins/sdk/src/protocol.ts | 4 ++++ packages/plugins/sdk/src/worker-rpc-host.ts | 15 +++++++++++++++ server/src/services/plugin-host-services.ts | 11 +++++++++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/plugins/sdk/src/host-client-factory.ts b/packages/plugins/sdk/src/host-client-factory.ts index f7829ad7..e445cc0b 100644 --- a/packages/plugins/sdk/src/host-client-factory.ts +++ b/packages/plugins/sdk/src/host-client-factory.ts @@ -103,9 +103,10 @@ export interface HostServices { list(params: WorkerToHostMethods["entities.list"][0]): Promise; }; - /** Provides `events.emit`. */ + /** Provides `events.emit` and `events.subscribe`. */ events: { emit(params: WorkerToHostMethods["events.emit"][0]): Promise; + subscribe(params: WorkerToHostMethods["events.subscribe"][0]): Promise; }; /** Provides `http.fetch`. */ @@ -261,6 +262,7 @@ const METHOD_CAPABILITY_MAP: Record { return services.events.emit(params); }), + "events.subscribe": gated("events.subscribe", async (params) => { + return services.events.subscribe(params); + }), // HTTP "http.fetch": gated("http.fetch", async (params) => { diff --git a/packages/plugins/sdk/src/protocol.ts b/packages/plugins/sdk/src/protocol.ts index 8330f680..61228b53 100644 --- a/packages/plugins/sdk/src/protocol.ts +++ b/packages/plugins/sdk/src/protocol.ts @@ -482,6 +482,10 @@ export interface WorkerToHostMethods { params: { name: string; companyId: string; payload: unknown }, result: void, ]; + "events.subscribe": [ + params: { eventPattern: string; filter?: Record | null }, + result: void, + ]; // HTTP "http.fetch": [ diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 8242c261..2dbb8196 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -387,6 +387,13 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost registration = { name, filter: filterOrFn, fn: maybeFn }; } eventHandlers.push(registration); + // Register subscription on the host so events are forwarded to this worker + void callHost("events.subscribe", { eventPattern: name, filter: registration.filter ?? null }).catch((err) => { + notifyHost("log", { + level: "warn", + message: `Failed to subscribe to event "${name}" on host: ${err instanceof Error ? err.message : String(err)}`, + }); + }); return () => { const idx = eventHandlers.indexOf(registration); if (idx !== -1) eventHandlers.splice(idx, 1); @@ -1107,6 +1114,14 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost const event = notif.params as AgentSessionEvent; const cb = sessionEventCallbacks.get(event.sessionId); if (cb) cb(event); + } else if (notif.method === "onEvent" && notif.params) { + // Plugin event bus notifications — dispatch to registered event handlers + handleOnEvent(notif.params as OnEventParams).catch((err) => { + notifyHost("log", { + level: "error", + message: `Failed to handle event notification: ${err instanceof Error ? err.message : String(err)}`, + }); + }); } } } diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index 6be022e2..fd945413 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,6 +556,17 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, + async subscribe(params: { eventPattern: string; filter?: Record }) { + scopedBus.subscribe( + params.eventPattern as any, + params.filter as any ?? {}, + async (event) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }, + ); + }, }, http: {