From babea2564959ae94af4d7fca79fefda1bcc2d595 Mon Sep 17 00:00:00 2001 From: Dotta Date: Thu, 5 Mar 2026 14:28:59 -0600 Subject: [PATCH] feat(openclaw): add SSE-first transport and session routing --- packages/adapters/openclaw/src/index.ts | 15 +- .../adapters/openclaw/src/server/execute.ts | 411 +++++++++++++++++- .../adapters/openclaw/src/ui/build-config.ts | 3 + server/src/__tests__/openclaw-adapter.test.ts | 115 ++++- ui/src/adapters/openclaw/config-fields.tsx | 72 ++- 5 files changed, 573 insertions(+), 43 deletions(-) diff --git a/packages/adapters/openclaw/src/index.ts b/packages/adapters/openclaw/src/index.ts index 8ddc4cf4..165435e4 100644 --- a/packages/adapters/openclaw/src/index.ts +++ b/packages/adapters/openclaw/src/index.ts @@ -8,22 +8,25 @@ export const agentConfigurationDoc = `# openclaw agent configuration Adapter: openclaw Use when: -- You run an OpenClaw agent remotely and wake it via webhook. -- You want Paperclip heartbeat/task events delivered over HTTP. +- You run an OpenClaw agent remotely and wake it over HTTP. +- You want SSE-first execution so one Paperclip run captures live progress and completion. Don't use when: - You need local CLI execution inside Paperclip (use claude_local/codex_local/opencode_local/process). - The OpenClaw endpoint is not reachable from the Paperclip server. Core fields: -- url (string, required): OpenClaw webhook endpoint URL -- If the URL path is \`/hooks/wake\`, Paperclip uses OpenClaw compatibility payload (\`{ text, mode }\`). -- For full structured Paperclip context payloads, use a mapped endpoint (for example \`/hooks/paperclip\`). +- url (string, required): OpenClaw endpoint URL +- streamTransport (string, optional): \`sse\` (default) or \`webhook\` - method (string, optional): HTTP method, default POST -- headers (object, optional): extra HTTP headers for webhook calls +- headers (object, optional): extra HTTP headers for requests - webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth - payloadTemplate (object, optional): additional JSON payload fields merged into each wake payload +Session routing fields: +- sessionKeyStrategy (string, optional): \`fixed\` (default), \`issue\`, or \`run\` +- sessionKey (string, optional): fixed session key value when strategy is \`fixed\` (default \`paperclip\`) + Operational fields: - timeoutSec (number, optional): request timeout in seconds (default 30) `; diff --git a/packages/adapters/openclaw/src/server/execute.ts b/packages/adapters/openclaw/src/server/execute.ts index d3e67aa1..bf903dfb 100644 --- a/packages/adapters/openclaw/src/server/execute.ts +++ b/packages/adapters/openclaw/src/server/execute.ts @@ -2,10 +2,36 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclip import { asNumber, asString, parseObject } from "@paperclipai/adapter-utils/server-utils"; import { parseOpenClawResponse } from "./parse.js"; +type OpenClawTransport = "sse" | "webhook"; +type SessionKeyStrategy = "fixed" | "issue" | "run"; + function nonEmpty(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; } +function normalizeTransport(value: unknown): OpenClawTransport { + const normalized = asString(value, "sse").trim().toLowerCase(); + return normalized === "webhook" ? "webhook" : "sse"; +} + +function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy { + const normalized = asString(value, "fixed").trim().toLowerCase(); + if (normalized === "issue" || normalized === "run") return normalized; + return "fixed"; +} + +function resolveSessionKey(input: { + strategy: SessionKeyStrategy; + configuredSessionKey: string | null; + runId: string; + issueId: string | null; +}): string { + const fallback = input.configuredSessionKey ?? "paperclip"; + if (input.strategy === "run") return `paperclip:run:${input.runId}`; + if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`; + return fallback; +} + function shouldUseWakeTextPayload(url: string): boolean { try { const parsed = new URL(url); @@ -57,6 +83,37 @@ function isTextRequiredResponse(responseText: string): boolean { return responseText.toLowerCase().includes("text required"); } +async function sendJsonRequest(params: { + url: string; + method: string; + headers: Record; + payload: Record; + signal: AbortSignal; +}): Promise { + return fetch(params.url, { + method: params.method, + headers: params.headers, + body: JSON.stringify(params.payload), + signal: params.signal, + }); +} + +async function readAndLogResponseText(params: { + response: Response; + onLog: AdapterExecutionContext["onLog"]; +}): Promise { + const responseText = await params.response.text(); + if (responseText.trim().length > 0) { + await params.onLog( + "stdout", + `[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`, + ); + } else { + await params.onLog("stdout", `[openclaw] response (${params.response.status}) \n`); + } + return responseText; +} + async function sendWebhookRequest(params: { url: string; method: string; @@ -65,21 +122,249 @@ async function sendWebhookRequest(params: { onLog: AdapterExecutionContext["onLog"]; signal: AbortSignal; }): Promise<{ response: Response; responseText: string }> { - const response = await fetch(params.url, { + const response = await sendJsonRequest({ + url: params.url, method: params.method, headers: params.headers, - body: JSON.stringify(params.payload), + payload: params.payload, signal: params.signal, }); - const responseText = await response.text(); - if (responseText.trim().length > 0) { - await params.onLog("stdout", `[openclaw] response (${response.status}) ${responseText.slice(0, 2000)}\n`); - } else { - await params.onLog("stdout", `[openclaw] response (${response.status}) \n`); + const responseText = await readAndLogResponseText({ response, onLog: params.onLog }); + return { response, responseText }; +} + +type ConsumedSse = { + eventCount: number; + lastEventType: string | null; + lastData: string | null; + lastPayload: Record | null; + terminal: boolean; + failed: boolean; + errorMessage: string | null; +}; + +function inferSseTerminal(input: { + eventType: string; + data: string; + parsedPayload: Record | null; +}): { terminal: boolean; failed: boolean; errorMessage: string | null } { + const normalizedType = input.eventType.trim().toLowerCase(); + const trimmedData = input.data.trim(); + const payload = input.parsedPayload; + const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null; + const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null; + + if (trimmedData === "[DONE]") { + return { terminal: true, failed: false, errorMessage: null }; } - return { response, responseText }; + const failType = + normalizedType.includes("error") || + normalizedType.includes("failed") || + normalizedType.includes("cancel"); + if (failType) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + (trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"), + }; + } + + const doneType = + normalizedType === "done" || + normalizedType.endsWith(".completed") || + normalizedType.endsWith(".done") || + normalizedType === "completed"; + if (doneType) { + return { terminal: true, failed: false, errorMessage: null }; + } + + if (payloadStatus) { + if ( + payloadStatus === "completed" || + payloadStatus === "succeeded" || + payloadStatus === "done" + ) { + return { terminal: true, failed: false, errorMessage: null }; + } + if ( + payloadStatus === "failed" || + payloadStatus === "cancelled" || + payloadStatus === "error" + ) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + `OpenClaw SSE status ${payloadStatus}`, + }; + } + } + + if (payloadType) { + if (payloadType.endsWith(".completed") || payloadType.endsWith(".done")) { + return { terminal: true, failed: false, errorMessage: null }; + } + if ( + payloadType.endsWith(".failed") || + payloadType.endsWith(".cancelled") || + payloadType.endsWith(".error") + ) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + `OpenClaw SSE type ${payloadType}`, + }; + } + } + + if (payload?.done === true) { + return { terminal: true, failed: false, errorMessage: null }; + } + + return { terminal: false, failed: false, errorMessage: null }; +} + +async function consumeSseResponse(params: { + response: Response; + onLog: AdapterExecutionContext["onLog"]; +}): Promise { + const reader = params.response.body?.getReader(); + if (!reader) { + throw new Error("OpenClaw SSE response body is missing"); + } + + const decoder = new TextDecoder(); + let buffer = ""; + let eventType = "message"; + let dataLines: string[] = []; + let eventCount = 0; + let lastEventType: string | null = null; + let lastData: string | null = null; + let lastPayload: Record | null = null; + let terminal = false; + let failed = false; + let errorMessage: string | null = null; + + const dispatchEvent = async (): Promise => { + if (dataLines.length === 0) { + eventType = "message"; + return false; + } + + const data = dataLines.join("\n"); + const trimmedData = data.trim(); + const parsedPayload = parseOpenClawResponse(trimmedData); + + eventCount += 1; + lastEventType = eventType; + lastData = data; + if (parsedPayload) lastPayload = parsedPayload; + + const preview = + trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData; + await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`); + + const resolution = inferSseTerminal({ + eventType, + data, + parsedPayload, + }); + + dataLines = []; + eventType = "message"; + + if (resolution.terminal) { + terminal = true; + failed = resolution.failed; + errorMessage = resolution.errorMessage; + return true; + } + + return false; + }; + + let shouldStop = false; + while (!shouldStop) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + while (!shouldStop) { + const newlineIndex = buffer.indexOf("\n"); + if (newlineIndex === -1) break; + + let line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + if (line.endsWith("\r")) line = line.slice(0, -1); + + if (line.length === 0) { + shouldStop = await dispatchEvent(); + continue; + } + + if (line.startsWith(":")) continue; + + const colonIndex = line.indexOf(":"); + const field = colonIndex === -1 ? line : line.slice(0, colonIndex); + const rawValue = + colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); + + if (field === "event") { + eventType = rawValue || "message"; + } else if (field === "data") { + dataLines.push(rawValue); + } + } + } + + buffer += decoder.decode(); + if (!shouldStop && buffer.trim().length > 0) { + for (const rawLine of buffer.split(/\r?\n/)) { + const line = rawLine.trimEnd(); + if (line.length === 0) { + shouldStop = await dispatchEvent(); + if (shouldStop) break; + continue; + } + if (line.startsWith(":")) continue; + + const colonIndex = line.indexOf(":"); + const field = colonIndex === -1 ? line : line.slice(0, colonIndex); + const rawValue = + colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); + + if (field === "event") { + eventType = rawValue || "message"; + } else if (field === "data") { + dataLines.push(rawValue); + } + } + } + + if (!shouldStop && dataLines.length > 0) { + await dispatchEvent(); + } + + return { + eventCount, + lastEventType, + lastData, + lastPayload, + terminal, + failed, + errorMessage, + }; } export async function execute(ctx: AdapterExecutionContext): Promise { @@ -95,11 +380,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise; const payloadTemplate = parseObject(config.payloadTemplate); const webhookAuthHeader = nonEmpty(config.webhookAuthHeader); + const sessionKeyStrategy = normalizeSessionKeyStrategy(config.sessionKeyStrategy); const headers: Record = { "content-type": "application/json", @@ -124,17 +411,31 @@ export async function execute(ctx: AdapterExecutionContext): Promise typeof value === "string" && value.trim().length > 0) + ? context.issueIds.filter( + (value): value is string => typeof value === "string" && value.trim().length > 0, + ) : [], }; + const sessionKey = resolveSessionKey({ + strategy: sessionKeyStrategy, + configuredSessionKey: nonEmpty(config.sessionKey), + runId, + issueId: wakePayload.issueId ?? wakePayload.taskId, + }); + const paperclipBody = { ...payloadTemplate, + stream: transport === "sse", + sessionKey, paperclip: { ...wakePayload, + sessionKey, + streamTransport: transport, context, }, }; + const wakeTextBody = { text: buildWakeText(wakePayload), mode: "now", @@ -143,18 +444,101 @@ export async function execute(ctx: AdapterExecutionContext): Promise controller.abort(), timeoutSec * 1000); try { + if (transport === "sse") { + const sseHeaders = { + ...headers, + accept: "text/event-stream", + }; + + const response = await sendJsonRequest({ + url, + method, + headers: sseHeaders, + payload: paperclipBody, + signal: controller.signal, + }); + + if (!response.ok) { + const responseText = await readAndLogResponseText({ response, onLog }); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: `OpenClaw SSE request failed with status ${response.status}`, + errorCode: "openclaw_http_error", + resultJson: { + status: response.status, + statusText: response.statusText, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const contentType = (response.headers.get("content-type") ?? "").toLowerCase(); + if (!contentType.includes("text/event-stream")) { + const responseText = await readAndLogResponseText({ response, onLog }); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: "OpenClaw SSE endpoint did not return text/event-stream", + errorCode: "openclaw_sse_expected_event_stream", + resultJson: { + status: response.status, + statusText: response.statusText, + contentType, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const consumed = await consumeSseResponse({ response, onLog }); + if (consumed.failed) { + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed", + errorCode: "openclaw_sse_stream_failed", + resultJson: { + eventCount: consumed.eventCount, + terminal: consumed.terminal, + lastEventType: consumed.lastEventType, + lastData: consumed.lastData, + response: consumed.lastPayload ?? consumed.lastData, + }, + }; + } + + return { + exitCode: 0, + signal: null, + timedOut: false, + provider: "openclaw", + model: null, + summary: `OpenClaw SSE ${method} ${url}`, + resultJson: { + eventCount: consumed.eventCount, + terminal: consumed.terminal, + lastEventType: consumed.lastEventType, + lastData: consumed.lastData, + response: consumed.lastPayload ?? consumed.lastData, + }, + }; + } + const preferWakeTextPayload = shouldUseWakeTextPayload(url); if (preferWakeTextPayload) { await onLog("stdout", "[openclaw] using wake text payload for /hooks/wake compatibility\n"); @@ -175,7 +559,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise): AdapterExecutionContext { +function buildContext( + config: Record, + overrides?: Partial, +): AdapterExecutionContext { return { runId: "run-123", agent: { @@ -26,38 +29,118 @@ function buildContext(config: Record): AdapterExecutionContext issueIds: ["issue-123"], }, onLog: async () => {}, + ...overrides, }; } +function sseResponse(lines: string[]) { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + for (const line of lines) { + controller.enqueue(encoder.encode(line)); + } + controller.close(); + }, + }); + return new Response(stream, { + status: 200, + statusText: "OK", + headers: { + "content-type": "text/event-stream", + }, + }); +} + afterEach(() => { vi.restoreAllMocks(); vi.unstubAllGlobals(); }); describe("openclaw adapter execute", () => { - it("sends structured paperclip payload to mapped endpoints", async () => { + it("uses SSE by default and streams into one run", async () => { const fetchMock = vi.fn().mockResolvedValue( - new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }), + sseResponse([ + 'event: response.delta\n', + 'data: {"type":"response.delta","delta":"hi"}\n\n', + 'event: response.completed\n', + 'data: {"type":"response.completed","status":"completed"}\n\n', + ]), ); vi.stubGlobal("fetch", fetchMock); + const onLog = vi.fn().mockResolvedValue(undefined); + const result = await execute( - buildContext({ - url: "https://agent.example/hooks/paperclip", - method: "POST", - payloadTemplate: { foo: "bar" }, - }), + buildContext( + { + url: "https://agent.example/gateway", + method: "POST", + payloadTemplate: { foo: "bar" }, + }, + { onLog }, + ), ); expect(result.exitCode).toBe(0); expect(fetchMock).toHaveBeenCalledTimes(1); const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; expect(body.foo).toBe("bar"); - expect(body.paperclip).toBeTypeOf("object"); + expect(body.stream).toBe(true); + expect(body.sessionKey).toBe("paperclip"); expect((body.paperclip as Record).runId).toBe("run-123"); + expect((body.paperclip as Record).sessionKey).toBe("paperclip"); + expect((body.paperclip as Record).streamTransport).toBe("sse"); + expect(onLog).toHaveBeenCalled(); }); - it("uses wake text payload for /hooks/wake endpoints", async () => { + it("derives issue session keys when configured", async () => { + const fetchMock = vi.fn().mockResolvedValue( + sseResponse([ + 'event: done\n', + 'data: [DONE]\n\n', + ]), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/gateway", + method: "POST", + sessionKeyStrategy: "issue", + }), + ); + + expect(result.exitCode).toBe(0); + const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; + expect(body.sessionKey).toBe("paperclip:issue:issue-123"); + expect((body.paperclip as Record).sessionKey).toBe("paperclip:issue:issue-123"); + }); + + it("fails when SSE endpoint does not return text/event-stream", async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { + status: 200, + statusText: "OK", + headers: { + "content-type": "application/json", + }, + }), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/gateway", + method: "POST", + }), + ); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("openclaw_sse_expected_event_stream"); + }); + + it("uses wake text payload for /hooks/wake endpoints in webhook mode", async () => { const fetchMock = vi.fn().mockResolvedValue( new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }), ); @@ -67,6 +150,7 @@ describe("openclaw adapter execute", () => { buildContext({ url: "https://agent.example/hooks/wake", method: "POST", + streamTransport: "webhook", }), ); @@ -78,7 +162,7 @@ describe("openclaw adapter execute", () => { expect(body.paperclip).toBeUndefined(); }); - it("retries with wake text payload when endpoint reports text required", async () => { + it("retries with wake text payload when endpoint reports text required in webhook mode", async () => { const fetchMock = vi .fn() .mockResolvedValueOnce( @@ -96,6 +180,7 @@ describe("openclaw adapter execute", () => { buildContext({ url: "https://agent.example/hooks/paperclip", method: "POST", + streamTransport: "webhook", }), ); @@ -114,7 +199,9 @@ describe("openclaw adapter execute", () => { describe("openclaw adapter environment checks", () => { it("reports compatibility mode info for /hooks/wake endpoints", async () => { - const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); vi.stubGlobal("fetch", fetchMock); const result = await testEnvironment({ @@ -131,7 +218,9 @@ describe("openclaw adapter environment checks", () => { }, }); - const compatibilityCheck = result.checks.find((check) => check.code === "openclaw_wake_endpoint_compat_mode"); + const compatibilityCheck = result.checks.find( + (check) => check.code === "openclaw_wake_endpoint_compat_mode", + ); expect(compatibilityCheck?.level).toBe("info"); }); }); diff --git a/ui/src/adapters/openclaw/config-fields.tsx b/ui/src/adapters/openclaw/config-fields.tsx index abad6b12..b63d0661 100644 --- a/ui/src/adapters/openclaw/config-fields.tsx +++ b/ui/src/adapters/openclaw/config-fields.tsx @@ -16,9 +16,20 @@ export function OpenClawConfigFields({ eff, mark, }: AdapterConfigFieldsProps) { + const transport = eff( + "adapterConfig", + "streamTransport", + String(config.streamTransport ?? "sse"), + ); + const sessionStrategy = eff( + "adapterConfig", + "sessionKeyStrategy", + String(config.sessionKeyStrategy ?? "fixed"), + ); + return ( <> - + {!isCreate && ( - - mark("adapterConfig", "webhookAuthHeader", v || undefined)} - immediate - className={inputClass} - placeholder="Bearer " - /> - + <> + + + + + + + + + {sessionStrategy === "fixed" && ( + + mark("adapterConfig", "sessionKey", v || undefined)} + immediate + className={inputClass} + placeholder="paperclip" + /> + + )} + + + mark("adapterConfig", "webhookAuthHeader", v || undefined)} + immediate + className={inputClass} + placeholder="Bearer " + /> + + )} );