From 8468d347be46cb2a9e033055e039564467dfa780 Mon Sep 17 00:00:00 2001 From: HD Date: Mon, 16 Mar 2026 02:25:03 +0700 Subject: [PATCH] =?UTF-8?q?fix:=20address=20review=20feedback=20=E2=80=94?= =?UTF-8?q?=20subscription=20cleanup,=20filter=20nullability,=20stale=20di?= =?UTF-8?q?agram?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add scopedBus.clear() in dispose() to prevent subscription accumulation on worker crash/restart cycles - Use two-arg subscribe() overload when filter is null instead of passing empty object; fix filter type to include null - Update ASCII flow diagram: onEvent is a notification, not request/response --- packages/plugins/sdk/src/worker-rpc-host.ts | 3 +-- server/src/services/plugin-host-services.ts | 25 ++++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 2dbb8196..1e8d5591 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -19,8 +19,7 @@ * |--- request(initialize) -------------> | → calls plugin.setup(ctx) * |<-- response(ok:true) ---------------- | * | | - * |--- request(onEvent) ----------------> | → dispatches to registered handler - * |<-- response(void) ------------------ | + * |--- notification(onEvent) -----------> | → dispatches to registered handler * | | * |<-- request(state.get) --------------- | ← SDK client call from plugin code * |--- response(result) ----------------> | diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index fd945413..2a3f5ecc 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,16 +556,17 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, - async subscribe(params: { eventPattern: string; filter?: Record }) { - scopedBus.subscribe( - params.eventPattern as any, - params.filter as any ?? {}, - async (event) => { - if (notifyWorker) { - notifyWorker("onEvent", { event }); - } - }, - ); + async subscribe(params: { eventPattern: string; filter?: Record | null }) { + const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }; + if (params.filter) { + scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler); + } else { + scopedBus.subscribe(params.eventPattern as any, handler); + } }, }, @@ -1071,6 +1072,10 @@ export function buildHostServices( dispose() { disposed = true; + // Clear event bus subscriptions to prevent accumulation on worker restart. + // Without this, each crash/restart cycle adds duplicate subscriptions. + scopedBus.clear(); + // Snapshot to avoid iterator invalidation from concurrent sendMessage() calls const snapshot = Array.from(activeSubscriptions); activeSubscriptions.clear();