feat(openclaw): add SSE-first transport and session routing

This commit is contained in:
Dotta
2026-03-05 14:28:59 -06:00
parent e9ffde610b
commit babea25649
5 changed files with 573 additions and 43 deletions

View File

@@ -8,22 +8,25 @@ export const agentConfigurationDoc = `# openclaw agent configuration
Adapter: openclaw
Use when:
- You run an OpenClaw agent remotely and wake it via webhook.
- You want Paperclip heartbeat/task events delivered over HTTP.
- You run an OpenClaw agent remotely and wake it over HTTP.
- You want SSE-first execution so one Paperclip run captures live progress and completion.
Don't use when:
- You need local CLI execution inside Paperclip (use claude_local/codex_local/opencode_local/process).
- The OpenClaw endpoint is not reachable from the Paperclip server.
Core fields:
- url (string, required): OpenClaw webhook endpoint URL
- If the URL path is \`/hooks/wake\`, Paperclip uses OpenClaw compatibility payload (\`{ text, mode }\`).
- For full structured Paperclip context payloads, use a mapped endpoint (for example \`/hooks/paperclip\`).
- url (string, required): OpenClaw endpoint URL
- streamTransport (string, optional): \`sse\` (default) or \`webhook\`
- method (string, optional): HTTP method, default POST
- headers (object, optional): extra HTTP headers for webhook calls
- headers (object, optional): extra HTTP headers for requests
- webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth
- payloadTemplate (object, optional): additional JSON payload fields merged into each wake payload
Session routing fields:
- sessionKeyStrategy (string, optional): \`fixed\` (default), \`issue\`, or \`run\`
- sessionKey (string, optional): fixed session key value when strategy is \`fixed\` (default \`paperclip\`)
Operational fields:
- timeoutSec (number, optional): request timeout in seconds (default 30)
`;

View File

@@ -2,10 +2,36 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclip
import { asNumber, asString, parseObject } from "@paperclipai/adapter-utils/server-utils";
import { parseOpenClawResponse } from "./parse.js";
type OpenClawTransport = "sse" | "webhook";
type SessionKeyStrategy = "fixed" | "issue" | "run";
function nonEmpty(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
function normalizeTransport(value: unknown): OpenClawTransport {
const normalized = asString(value, "sse").trim().toLowerCase();
return normalized === "webhook" ? "webhook" : "sse";
}
function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy {
const normalized = asString(value, "fixed").trim().toLowerCase();
if (normalized === "issue" || normalized === "run") return normalized;
return "fixed";
}
function resolveSessionKey(input: {
strategy: SessionKeyStrategy;
configuredSessionKey: string | null;
runId: string;
issueId: string | null;
}): string {
const fallback = input.configuredSessionKey ?? "paperclip";
if (input.strategy === "run") return `paperclip:run:${input.runId}`;
if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`;
return fallback;
}
function shouldUseWakeTextPayload(url: string): boolean {
try {
const parsed = new URL(url);
@@ -57,6 +83,37 @@ function isTextRequiredResponse(responseText: string): boolean {
return responseText.toLowerCase().includes("text required");
}
async function sendJsonRequest(params: {
url: string;
method: string;
headers: Record<string, string>;
payload: Record<string, unknown>;
signal: AbortSignal;
}): Promise<Response> {
return fetch(params.url, {
method: params.method,
headers: params.headers,
body: JSON.stringify(params.payload),
signal: params.signal,
});
}
async function readAndLogResponseText(params: {
response: Response;
onLog: AdapterExecutionContext["onLog"];
}): Promise<string> {
const responseText = await params.response.text();
if (responseText.trim().length > 0) {
await params.onLog(
"stdout",
`[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`,
);
} else {
await params.onLog("stdout", `[openclaw] response (${params.response.status}) <empty>\n`);
}
return responseText;
}
async function sendWebhookRequest(params: {
url: string;
method: string;
@@ -65,21 +122,249 @@ async function sendWebhookRequest(params: {
onLog: AdapterExecutionContext["onLog"];
signal: AbortSignal;
}): Promise<{ response: Response; responseText: string }> {
const response = await fetch(params.url, {
const response = await sendJsonRequest({
url: params.url,
method: params.method,
headers: params.headers,
body: JSON.stringify(params.payload),
payload: params.payload,
signal: params.signal,
});
const responseText = await response.text();
if (responseText.trim().length > 0) {
await params.onLog("stdout", `[openclaw] response (${response.status}) ${responseText.slice(0, 2000)}\n`);
} else {
await params.onLog("stdout", `[openclaw] response (${response.status}) <empty>\n`);
const responseText = await readAndLogResponseText({ response, onLog: params.onLog });
return { response, responseText };
}
type ConsumedSse = {
eventCount: number;
lastEventType: string | null;
lastData: string | null;
lastPayload: Record<string, unknown> | null;
terminal: boolean;
failed: boolean;
errorMessage: string | null;
};
function inferSseTerminal(input: {
eventType: string;
data: string;
parsedPayload: Record<string, unknown> | null;
}): { terminal: boolean; failed: boolean; errorMessage: string | null } {
const normalizedType = input.eventType.trim().toLowerCase();
const trimmedData = input.data.trim();
const payload = input.parsedPayload;
const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null;
const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null;
if (trimmedData === "[DONE]") {
return { terminal: true, failed: false, errorMessage: null };
}
return { response, responseText };
const failType =
normalizedType.includes("error") ||
normalizedType.includes("failed") ||
normalizedType.includes("cancel");
if (failType) {
return {
terminal: true,
failed: true,
errorMessage:
nonEmpty(payload?.error) ??
nonEmpty(payload?.message) ??
(trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"),
};
}
const doneType =
normalizedType === "done" ||
normalizedType.endsWith(".completed") ||
normalizedType.endsWith(".done") ||
normalizedType === "completed";
if (doneType) {
return { terminal: true, failed: false, errorMessage: null };
}
if (payloadStatus) {
if (
payloadStatus === "completed" ||
payloadStatus === "succeeded" ||
payloadStatus === "done"
) {
return { terminal: true, failed: false, errorMessage: null };
}
if (
payloadStatus === "failed" ||
payloadStatus === "cancelled" ||
payloadStatus === "error"
) {
return {
terminal: true,
failed: true,
errorMessage:
nonEmpty(payload?.error) ??
nonEmpty(payload?.message) ??
`OpenClaw SSE status ${payloadStatus}`,
};
}
}
if (payloadType) {
if (payloadType.endsWith(".completed") || payloadType.endsWith(".done")) {
return { terminal: true, failed: false, errorMessage: null };
}
if (
payloadType.endsWith(".failed") ||
payloadType.endsWith(".cancelled") ||
payloadType.endsWith(".error")
) {
return {
terminal: true,
failed: true,
errorMessage:
nonEmpty(payload?.error) ??
nonEmpty(payload?.message) ??
`OpenClaw SSE type ${payloadType}`,
};
}
}
if (payload?.done === true) {
return { terminal: true, failed: false, errorMessage: null };
}
return { terminal: false, failed: false, errorMessage: null };
}
async function consumeSseResponse(params: {
response: Response;
onLog: AdapterExecutionContext["onLog"];
}): Promise<ConsumedSse> {
const reader = params.response.body?.getReader();
if (!reader) {
throw new Error("OpenClaw SSE response body is missing");
}
const decoder = new TextDecoder();
let buffer = "";
let eventType = "message";
let dataLines: string[] = [];
let eventCount = 0;
let lastEventType: string | null = null;
let lastData: string | null = null;
let lastPayload: Record<string, unknown> | null = null;
let terminal = false;
let failed = false;
let errorMessage: string | null = null;
const dispatchEvent = async (): Promise<boolean> => {
if (dataLines.length === 0) {
eventType = "message";
return false;
}
const data = dataLines.join("\n");
const trimmedData = data.trim();
const parsedPayload = parseOpenClawResponse(trimmedData);
eventCount += 1;
lastEventType = eventType;
lastData = data;
if (parsedPayload) lastPayload = parsedPayload;
const preview =
trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData;
await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`);
const resolution = inferSseTerminal({
eventType,
data,
parsedPayload,
});
dataLines = [];
eventType = "message";
if (resolution.terminal) {
terminal = true;
failed = resolution.failed;
errorMessage = resolution.errorMessage;
return true;
}
return false;
};
let shouldStop = false;
while (!shouldStop) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
while (!shouldStop) {
const newlineIndex = buffer.indexOf("\n");
if (newlineIndex === -1) break;
let line = buffer.slice(0, newlineIndex);
buffer = buffer.slice(newlineIndex + 1);
if (line.endsWith("\r")) line = line.slice(0, -1);
if (line.length === 0) {
shouldStop = await dispatchEvent();
continue;
}
if (line.startsWith(":")) continue;
const colonIndex = line.indexOf(":");
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
const rawValue =
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
if (field === "event") {
eventType = rawValue || "message";
} else if (field === "data") {
dataLines.push(rawValue);
}
}
}
buffer += decoder.decode();
if (!shouldStop && buffer.trim().length > 0) {
for (const rawLine of buffer.split(/\r?\n/)) {
const line = rawLine.trimEnd();
if (line.length === 0) {
shouldStop = await dispatchEvent();
if (shouldStop) break;
continue;
}
if (line.startsWith(":")) continue;
const colonIndex = line.indexOf(":");
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
const rawValue =
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
if (field === "event") {
eventType = rawValue || "message";
} else if (field === "data") {
dataLines.push(rawValue);
}
}
}
if (!shouldStop && dataLines.length > 0) {
await dispatchEvent();
}
return {
eventCount,
lastEventType,
lastData,
lastPayload,
terminal,
failed,
errorMessage,
};
}
export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult> {
@@ -95,11 +380,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
const transport = normalizeTransport(config.streamTransport);
const method = asString(config.method, "POST").trim().toUpperCase() || "POST";
const timeoutSec = Math.max(1, asNumber(config.timeoutSec, 30));
const headersConfig = parseObject(config.headers) as Record<string, unknown>;
const payloadTemplate = parseObject(config.payloadTemplate);
const webhookAuthHeader = nonEmpty(config.webhookAuthHeader);
const sessionKeyStrategy = normalizeSessionKeyStrategy(config.sessionKeyStrategy);
const headers: Record<string, string> = {
"content-type": "application/json",
@@ -124,17 +411,31 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
approvalId: nonEmpty(context.approvalId),
approvalStatus: nonEmpty(context.approvalStatus),
issueIds: Array.isArray(context.issueIds)
? context.issueIds.filter((value): value is string => typeof value === "string" && value.trim().length > 0)
? context.issueIds.filter(
(value): value is string => typeof value === "string" && value.trim().length > 0,
)
: [],
};
const sessionKey = resolveSessionKey({
strategy: sessionKeyStrategy,
configuredSessionKey: nonEmpty(config.sessionKey),
runId,
issueId: wakePayload.issueId ?? wakePayload.taskId,
});
const paperclipBody = {
...payloadTemplate,
stream: transport === "sse",
sessionKey,
paperclip: {
...wakePayload,
sessionKey,
streamTransport: transport,
context,
},
};
const wakeTextBody = {
text: buildWakeText(wakePayload),
mode: "now",
@@ -143,18 +444,101 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (onMeta) {
await onMeta({
adapterType: "openclaw",
command: "webhook",
command: transport === "sse" ? "sse" : "webhook",
commandArgs: [method, url],
context,
});
}
await onLog("stdout", `[openclaw] invoking ${method} ${url}\n`);
await onLog("stdout", `[openclaw] invoking ${method} ${url} (transport=${transport})\n`);
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), timeoutSec * 1000);
try {
if (transport === "sse") {
const sseHeaders = {
...headers,
accept: "text/event-stream",
};
const response = await sendJsonRequest({
url,
method,
headers: sseHeaders,
payload: paperclipBody,
signal: controller.signal,
});
if (!response.ok) {
const responseText = await readAndLogResponseText({ response, onLog });
return {
exitCode: 1,
signal: null,
timedOut: false,
errorMessage: `OpenClaw SSE request failed with status ${response.status}`,
errorCode: "openclaw_http_error",
resultJson: {
status: response.status,
statusText: response.statusText,
response: parseOpenClawResponse(responseText) ?? responseText,
},
};
}
const contentType = (response.headers.get("content-type") ?? "").toLowerCase();
if (!contentType.includes("text/event-stream")) {
const responseText = await readAndLogResponseText({ response, onLog });
return {
exitCode: 1,
signal: null,
timedOut: false,
errorMessage: "OpenClaw SSE endpoint did not return text/event-stream",
errorCode: "openclaw_sse_expected_event_stream",
resultJson: {
status: response.status,
statusText: response.statusText,
contentType,
response: parseOpenClawResponse(responseText) ?? responseText,
},
};
}
const consumed = await consumeSseResponse({ response, onLog });
if (consumed.failed) {
return {
exitCode: 1,
signal: null,
timedOut: false,
errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed",
errorCode: "openclaw_sse_stream_failed",
resultJson: {
eventCount: consumed.eventCount,
terminal: consumed.terminal,
lastEventType: consumed.lastEventType,
lastData: consumed.lastData,
response: consumed.lastPayload ?? consumed.lastData,
},
};
}
return {
exitCode: 0,
signal: null,
timedOut: false,
provider: "openclaw",
model: null,
summary: `OpenClaw SSE ${method} ${url}`,
resultJson: {
eventCount: consumed.eventCount,
terminal: consumed.terminal,
lastEventType: consumed.lastEventType,
lastData: consumed.lastData,
response: consumed.lastPayload ?? consumed.lastData,
},
};
}
const preferWakeTextPayload = shouldUseWakeTextPayload(url);
if (preferWakeTextPayload) {
await onLog("stdout", "[openclaw] using wake text payload for /hooks/wake compatibility\n");
@@ -175,7 +559,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const canRetryWithWakeText = !preferWakeTextPayload && isTextRequiredResponse(responseText);
if (canRetryWithWakeText) {
await onLog("stdout", "[openclaw] endpoint requires text payload; retrying with wake compatibility format\n");
await onLog(
"stdout",
"[openclaw] endpoint requires text payload; retrying with wake compatibility format\n",
);
const retry = await sendWebhookRequest({
url,

View File

@@ -5,5 +5,8 @@ export function buildOpenClawConfig(v: CreateConfigValues): Record<string, unkno
if (v.url) ac.url = v.url;
ac.method = "POST";
ac.timeoutSec = 30;
ac.streamTransport = "sse";
ac.sessionKeyStrategy = "fixed";
ac.sessionKey = "paperclip";
return ac;
}

View File

@@ -2,7 +2,10 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import { execute, testEnvironment } from "@paperclipai/adapter-openclaw/server";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
function buildContext(config: Record<string, unknown>): AdapterExecutionContext {
function buildContext(
config: Record<string, unknown>,
overrides?: Partial<AdapterExecutionContext>,
): AdapterExecutionContext {
return {
runId: "run-123",
agent: {
@@ -26,38 +29,118 @@ function buildContext(config: Record<string, unknown>): AdapterExecutionContext
issueIds: ["issue-123"],
},
onLog: async () => {},
...overrides,
};
}
function sseResponse(lines: string[]) {
const encoder = new TextEncoder();
const stream = new ReadableStream<Uint8Array>({
start(controller) {
for (const line of lines) {
controller.enqueue(encoder.encode(line));
}
controller.close();
},
});
return new Response(stream, {
status: 200,
statusText: "OK",
headers: {
"content-type": "text/event-stream",
},
});
}
afterEach(() => {
vi.restoreAllMocks();
vi.unstubAllGlobals();
});
describe("openclaw adapter execute", () => {
it("sends structured paperclip payload to mapped endpoints", async () => {
it("uses SSE by default and streams into one run", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }),
sseResponse([
'event: response.delta\n',
'data: {"type":"response.delta","delta":"hi"}\n\n',
'event: response.completed\n',
'data: {"type":"response.completed","status":"completed"}\n\n',
]),
);
vi.stubGlobal("fetch", fetchMock);
const onLog = vi.fn<AdapterExecutionContext["onLog"]>().mockResolvedValue(undefined);
const result = await execute(
buildContext({
url: "https://agent.example/hooks/paperclip",
method: "POST",
payloadTemplate: { foo: "bar" },
}),
buildContext(
{
url: "https://agent.example/gateway",
method: "POST",
payloadTemplate: { foo: "bar" },
},
{ onLog },
),
);
expect(result.exitCode).toBe(0);
expect(fetchMock).toHaveBeenCalledTimes(1);
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(body.foo).toBe("bar");
expect(body.paperclip).toBeTypeOf("object");
expect(body.stream).toBe(true);
expect(body.sessionKey).toBe("paperclip");
expect((body.paperclip as Record<string, unknown>).runId).toBe("run-123");
expect((body.paperclip as Record<string, unknown>).sessionKey).toBe("paperclip");
expect((body.paperclip as Record<string, unknown>).streamTransport).toBe("sse");
expect(onLog).toHaveBeenCalled();
});
it("uses wake text payload for /hooks/wake endpoints", async () => {
it("derives issue session keys when configured", async () => {
const fetchMock = vi.fn().mockResolvedValue(
sseResponse([
'event: done\n',
'data: [DONE]\n\n',
]),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/gateway",
method: "POST",
sessionKeyStrategy: "issue",
}),
);
expect(result.exitCode).toBe(0);
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(body.sessionKey).toBe("paperclip:issue:issue-123");
expect((body.paperclip as Record<string, unknown>).sessionKey).toBe("paperclip:issue:issue-123");
});
it("fails when SSE endpoint does not return text/event-stream", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true }), {
status: 200,
statusText: "OK",
headers: {
"content-type": "application/json",
},
}),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/gateway",
method: "POST",
}),
);
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("openclaw_sse_expected_event_stream");
});
it("uses wake text payload for /hooks/wake endpoints in webhook mode", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }),
);
@@ -67,6 +150,7 @@ describe("openclaw adapter execute", () => {
buildContext({
url: "https://agent.example/hooks/wake",
method: "POST",
streamTransport: "webhook",
}),
);
@@ -78,7 +162,7 @@ describe("openclaw adapter execute", () => {
expect(body.paperclip).toBeUndefined();
});
it("retries with wake text payload when endpoint reports text required", async () => {
it("retries with wake text payload when endpoint reports text required in webhook mode", async () => {
const fetchMock = vi
.fn()
.mockResolvedValueOnce(
@@ -96,6 +180,7 @@ describe("openclaw adapter execute", () => {
buildContext({
url: "https://agent.example/hooks/paperclip",
method: "POST",
streamTransport: "webhook",
}),
);
@@ -114,7 +199,9 @@ describe("openclaw adapter execute", () => {
describe("openclaw adapter environment checks", () => {
it("reports compatibility mode info for /hooks/wake endpoints", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
const fetchMock = vi
.fn()
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
vi.stubGlobal("fetch", fetchMock);
const result = await testEnvironment({
@@ -131,7 +218,9 @@ describe("openclaw adapter environment checks", () => {
},
});
const compatibilityCheck = result.checks.find((check) => check.code === "openclaw_wake_endpoint_compat_mode");
const compatibilityCheck = result.checks.find(
(check) => check.code === "openclaw_wake_endpoint_compat_mode",
);
expect(compatibilityCheck?.level).toBe("info");
});
});

View File

@@ -16,9 +16,20 @@ export function OpenClawConfigFields({
eff,
mark,
}: AdapterConfigFieldsProps) {
const transport = eff(
"adapterConfig",
"streamTransport",
String(config.streamTransport ?? "sse"),
);
const sessionStrategy = eff(
"adapterConfig",
"sessionKeyStrategy",
String(config.sessionKeyStrategy ?? "fixed"),
);
return (
<>
<Field label="Webhook URL" hint={help.webhookUrl}>
<Field label="Gateway URL" hint={help.webhookUrl}>
<DraftInput
value={
isCreate
@@ -36,17 +47,54 @@ export function OpenClawConfigFields({
/>
</Field>
{!isCreate && (
<Field label="Webhook auth header (optional)">
<DraftInput
value={
eff("adapterConfig", "webhookAuthHeader", String(config.webhookAuthHeader ?? ""))
}
onCommit={(v) => mark("adapterConfig", "webhookAuthHeader", v || undefined)}
immediate
className={inputClass}
placeholder="Bearer <token>"
/>
</Field>
<>
<Field label="Transport">
<select
value={transport}
onChange={(e) => mark("adapterConfig", "streamTransport", e.target.value)}
className={inputClass}
>
<option value="sse">SSE (recommended)</option>
<option value="webhook">Webhook</option>
</select>
</Field>
<Field label="Session strategy">
<select
value={sessionStrategy}
onChange={(e) => mark("adapterConfig", "sessionKeyStrategy", e.target.value)}
className={inputClass}
>
<option value="fixed">Fixed</option>
<option value="issue">Per issue</option>
<option value="run">Per run</option>
</select>
</Field>
{sessionStrategy === "fixed" && (
<Field label="Session key">
<DraftInput
value={eff("adapterConfig", "sessionKey", String(config.sessionKey ?? "paperclip"))}
onCommit={(v) => mark("adapterConfig", "sessionKey", v || undefined)}
immediate
className={inputClass}
placeholder="paperclip"
/>
</Field>
)}
<Field label="Webhook auth header (optional)">
<DraftInput
value={
eff("adapterConfig", "webhookAuthHeader", String(config.webhookAuthHeader ?? ""))
}
onCommit={(v) => mark("adapterConfig", "webhookAuthHeader", v || undefined)}
immediate
className={inputClass}
placeholder="Bearer <token>"
/>
</Field>
</>
)}
</>
);