diff --git a/packages/plugins/sdk/src/worker-rpc-host.ts b/packages/plugins/sdk/src/worker-rpc-host.ts index 2dbb8196..1e8d5591 100644 --- a/packages/plugins/sdk/src/worker-rpc-host.ts +++ b/packages/plugins/sdk/src/worker-rpc-host.ts @@ -19,8 +19,7 @@ * |--- request(initialize) -------------> | → calls plugin.setup(ctx) * |<-- response(ok:true) ---------------- | * | | - * |--- request(onEvent) ----------------> | → dispatches to registered handler - * |<-- response(void) ------------------ | + * |--- notification(onEvent) -----------> | → dispatches to registered handler * | | * |<-- request(state.get) --------------- | ← SDK client call from plugin code * |--- response(result) ----------------> | diff --git a/server/src/services/plugin-host-services.ts b/server/src/services/plugin-host-services.ts index fd945413..2a3f5ecc 100644 --- a/server/src/services/plugin-host-services.ts +++ b/server/src/services/plugin-host-services.ts @@ -556,16 +556,17 @@ export function buildHostServices( } await scopedBus.emit(params.name, params.companyId, params.payload); }, - async subscribe(params: { eventPattern: string; filter?: Record }) { - scopedBus.subscribe( - params.eventPattern as any, - params.filter as any ?? {}, - async (event) => { - if (notifyWorker) { - notifyWorker("onEvent", { event }); - } - }, - ); + async subscribe(params: { eventPattern: string; filter?: Record | null }) { + const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => { + if (notifyWorker) { + notifyWorker("onEvent", { event }); + } + }; + if (params.filter) { + scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler); + } else { + scopedBus.subscribe(params.eventPattern as any, handler); + } }, }, @@ -1071,6 +1072,10 @@ export function buildHostServices( dispose() { disposed = true; + // Clear event bus subscriptions to prevent accumulation on worker restart. + // Without this, each crash/restart cycle adds duplicate subscriptions. + scopedBus.clear(); + // Snapshot to avoid iterator invalidation from concurrent sendMessage() calls const snapshot = Array.from(activeSubscriptions); activeSubscriptions.clear();