fix: wire plugin event subscriptions from worker to host
Plugin workers register event handlers via `ctx.events.on()` in the SDK,
but these subscriptions were never forwarded to the host process. The host
sends events via `notifyWorker("onEvent", ...)` which produces a JSON-RPC
notification (no `id`), but the worker only dispatched `onEvent` as a
request handler — notifications were silently dropped.
Changes:
- Add `events.subscribe` RPC method so workers can register subscriptions
on the host-side event bus during setup
- Handle `onEvent` notifications in the worker notification dispatcher
(previously only `agents.sessions.event` was handled)
- Add `events.subscribe` to HostServices interface, capability map, and
host client handler
- Add `subscribe` handler in host services that registers on the scoped
plugin event bus and forwards matched events to the worker
This commit is contained in:
@@ -103,9 +103,10 @@ export interface HostServices {
|
|||||||
list(params: WorkerToHostMethods["entities.list"][0]): Promise<WorkerToHostMethods["entities.list"][1]>;
|
list(params: WorkerToHostMethods["entities.list"][0]): Promise<WorkerToHostMethods["entities.list"][1]>;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Provides `events.emit`. */
|
/** Provides `events.emit` and `events.subscribe`. */
|
||||||
events: {
|
events: {
|
||||||
emit(params: WorkerToHostMethods["events.emit"][0]): Promise<void>;
|
emit(params: WorkerToHostMethods["events.emit"][0]): Promise<void>;
|
||||||
|
subscribe(params: WorkerToHostMethods["events.subscribe"][0]): Promise<void>;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Provides `http.fetch`. */
|
/** Provides `http.fetch`. */
|
||||||
@@ -261,6 +262,7 @@ const METHOD_CAPABILITY_MAP: Record<WorkerToHostMethodName, PluginCapability | n
|
|||||||
|
|
||||||
// Events
|
// Events
|
||||||
"events.emit": "events.emit",
|
"events.emit": "events.emit",
|
||||||
|
"events.subscribe": "events.subscribe",
|
||||||
|
|
||||||
// HTTP
|
// HTTP
|
||||||
"http.fetch": "http.outbound",
|
"http.fetch": "http.outbound",
|
||||||
@@ -407,6 +409,9 @@ export function createHostClientHandlers(
|
|||||||
"events.emit": gated("events.emit", async (params) => {
|
"events.emit": gated("events.emit", async (params) => {
|
||||||
return services.events.emit(params);
|
return services.events.emit(params);
|
||||||
}),
|
}),
|
||||||
|
"events.subscribe": gated("events.subscribe", async (params) => {
|
||||||
|
return services.events.subscribe(params);
|
||||||
|
}),
|
||||||
|
|
||||||
// HTTP
|
// HTTP
|
||||||
"http.fetch": gated("http.fetch", async (params) => {
|
"http.fetch": gated("http.fetch", async (params) => {
|
||||||
|
|||||||
@@ -482,6 +482,10 @@ export interface WorkerToHostMethods {
|
|||||||
params: { name: string; companyId: string; payload: unknown },
|
params: { name: string; companyId: string; payload: unknown },
|
||||||
result: void,
|
result: void,
|
||||||
];
|
];
|
||||||
|
"events.subscribe": [
|
||||||
|
params: { eventPattern: string; filter?: Record<string, unknown> | null },
|
||||||
|
result: void,
|
||||||
|
];
|
||||||
|
|
||||||
// HTTP
|
// HTTP
|
||||||
"http.fetch": [
|
"http.fetch": [
|
||||||
|
|||||||
@@ -387,6 +387,13 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost
|
|||||||
registration = { name, filter: filterOrFn, fn: maybeFn };
|
registration = { name, filter: filterOrFn, fn: maybeFn };
|
||||||
}
|
}
|
||||||
eventHandlers.push(registration);
|
eventHandlers.push(registration);
|
||||||
|
// Register subscription on the host so events are forwarded to this worker
|
||||||
|
void callHost("events.subscribe", { eventPattern: name, filter: registration.filter ?? null }).catch((err) => {
|
||||||
|
notifyHost("log", {
|
||||||
|
level: "warn",
|
||||||
|
message: `Failed to subscribe to event "${name}" on host: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
});
|
||||||
|
});
|
||||||
return () => {
|
return () => {
|
||||||
const idx = eventHandlers.indexOf(registration);
|
const idx = eventHandlers.indexOf(registration);
|
||||||
if (idx !== -1) eventHandlers.splice(idx, 1);
|
if (idx !== -1) eventHandlers.splice(idx, 1);
|
||||||
@@ -1107,6 +1114,14 @@ export function startWorkerRpcHost(options: WorkerRpcHostOptions): WorkerRpcHost
|
|||||||
const event = notif.params as AgentSessionEvent;
|
const event = notif.params as AgentSessionEvent;
|
||||||
const cb = sessionEventCallbacks.get(event.sessionId);
|
const cb = sessionEventCallbacks.get(event.sessionId);
|
||||||
if (cb) cb(event);
|
if (cb) cb(event);
|
||||||
|
} else if (notif.method === "onEvent" && notif.params) {
|
||||||
|
// Plugin event bus notifications — dispatch to registered event handlers
|
||||||
|
handleOnEvent(notif.params as OnEventParams).catch((err) => {
|
||||||
|
notifyHost("log", {
|
||||||
|
level: "error",
|
||||||
|
message: `Failed to handle event notification: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -556,6 +556,17 @@ export function buildHostServices(
|
|||||||
}
|
}
|
||||||
await scopedBus.emit(params.name, params.companyId, params.payload);
|
await scopedBus.emit(params.name, params.companyId, params.payload);
|
||||||
},
|
},
|
||||||
|
async subscribe(params: { eventPattern: string; filter?: Record<string, unknown> }) {
|
||||||
|
scopedBus.subscribe(
|
||||||
|
params.eventPattern as any,
|
||||||
|
params.filter as any ?? {},
|
||||||
|
async (event) => {
|
||||||
|
if (notifyWorker) {
|
||||||
|
notifyWorker("onEvent", { event });
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
http: {
|
http: {
|
||||||
|
|||||||
Reference in New Issue
Block a user