Cut over OpenClaw adapter to strict SSE streaming

This commit is contained in:
Dotta
2026-03-05 15:54:55 -06:00
parent 35a7acc058
commit 0cc75c6e10
11 changed files with 617 additions and 494 deletions

View File

@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { shouldWakeAssigneeOnCheckout } from "../routes/issues-checkout-wakeup.js";
describe("shouldWakeAssigneeOnCheckout", () => {
it("keeps wakeup behavior for board actors", () => {
expect(
shouldWakeAssigneeOnCheckout({
actorType: "board",
actorAgentId: null,
checkoutAgentId: "agent-1",
checkoutRunId: null,
}),
).toBe(true);
});
it("skips wakeup for agent self-checkout in an active run", () => {
expect(
shouldWakeAssigneeOnCheckout({
actorType: "agent",
actorAgentId: "agent-1",
checkoutAgentId: "agent-1",
checkoutRunId: "run-1",
}),
).toBe(false);
});
it("still wakes when checkout run id is missing", () => {
expect(
shouldWakeAssigneeOnCheckout({
actorType: "agent",
actorAgentId: "agent-1",
checkoutAgentId: "agent-1",
checkoutRunId: null,
}),
).toBe(true);
});
it("still wakes when agent checks out on behalf of another agent id", () => {
expect(
shouldWakeAssigneeOnCheckout({
actorType: "agent",
actorAgentId: "agent-1",
checkoutAgentId: "agent-2",
checkoutRunId: "run-1",
}),
).toBe(true);
});
});

View File

@@ -58,28 +58,21 @@ afterEach(() => {
});
describe("openclaw adapter execute", () => {
it("uses SSE by default and streams into one run", async () => {
it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => {
const fetchMock = vi.fn().mockResolvedValue(
sseResponse([
'event: response.delta\n',
'data: {"type":"response.delta","delta":"hi"}\n\n',
'event: response.completed\n',
"event: response.completed\n",
'data: {"type":"response.completed","status":"completed"}\n\n',
]),
);
vi.stubGlobal("fetch", fetchMock);
const onLog = vi.fn<AdapterExecutionContext["onLog"]>().mockResolvedValue(undefined);
const result = await execute(
buildContext(
{
url: "https://agent.example/gateway",
method: "POST",
payloadTemplate: { foo: "bar" },
},
{ onLog },
),
buildContext({
url: "https://agent.example/sse",
method: "POST",
payloadTemplate: { foo: "bar", text: "OpenClaw task prompt" },
}),
);
expect(result.exitCode).toBe(0);
@@ -88,24 +81,34 @@ describe("openclaw adapter execute", () => {
expect(body.foo).toBe("bar");
expect(body.stream).toBe(true);
expect(body.sessionKey).toBe("paperclip");
expect((body.paperclip as Record<string, unknown>).streamTransport).toBe("sse");
expect((body.paperclip as Record<string, unknown>).runId).toBe("run-123");
expect((body.paperclip as Record<string, unknown>).sessionKey).toBe("paperclip");
expect((body.paperclip as Record<string, unknown>).streamTransport).toBe("sse");
expect(onLog).toHaveBeenCalled();
expect(
((body.paperclip as Record<string, unknown>).env as Record<string, unknown>).PAPERCLIP_RUN_ID,
).toBe("run-123");
const text = String(body.text ?? "");
expect(text).toContain("OpenClaw task prompt");
expect(text).toContain("PAPERCLIP_RUN_ID=run-123");
expect(text).toContain("PAPERCLIP_AGENT_ID=agent-123");
expect(text).toContain("PAPERCLIP_COMPANY_ID=company-123");
expect(text).toContain("PAPERCLIP_TASK_ID=task-123");
expect(text).toContain("PAPERCLIP_WAKE_REASON=issue_assigned");
expect(text).toContain("PAPERCLIP_LINKED_ISSUE_IDS=issue-123");
});
it("derives issue session keys when configured", async () => {
const fetchMock = vi.fn().mockResolvedValue(
sseResponse([
'event: done\n',
'data: [DONE]\n\n',
"event: done\n",
"data: [DONE]\n\n",
]),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/gateway",
url: "https://agent.example/sse",
method: "POST",
sessionKeyStrategy: "issue",
}),
@@ -117,7 +120,43 @@ describe("openclaw adapter execute", () => {
expect((body.paperclip as Record<string, unknown>).sessionKey).toBe("paperclip:issue:issue-123");
});
it("fails when SSE endpoint does not return text/event-stream and no compatibility fallback applies", async () => {
it("maps requests to OpenResponses schema for /v1/responses endpoints", async () => {
const fetchMock = vi.fn().mockResolvedValue(
sseResponse([
"event: response.completed\n",
'data: {"type":"response.completed","status":"completed"}\n\n',
]),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/v1/responses",
method: "POST",
payloadTemplate: {
model: "openclaw",
user: "paperclip",
},
}),
);
expect(result.exitCode).toBe(0);
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(body.stream).toBe(true);
expect(body.model).toBe("openclaw");
expect(typeof body.input).toBe("string");
expect(String(body.input)).toContain("PAPERCLIP_RUN_ID=run-123");
expect(body.metadata).toBeTypeOf("object");
expect((body.metadata as Record<string, unknown>).PAPERCLIP_RUN_ID).toBe("run-123");
expect(body.text).toBeUndefined();
expect(body.paperclip).toBeUndefined();
expect(body.sessionKey).toBeUndefined();
const headers = (fetchMock.mock.calls[0]?.[1]?.headers ?? {}) as Record<string, string>;
expect(headers["x-openclaw-session-key"]).toBe("paperclip");
});
it("fails when SSE endpoint does not return text/event-stream", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: false, error: "unexpected payload" }), {
status: 200,
@@ -131,7 +170,7 @@ describe("openclaw adapter execute", () => {
const result = await execute(
buildContext({
url: "https://agent.example/gateway",
url: "https://agent.example/sse",
method: "POST",
}),
);
@@ -140,11 +179,30 @@ describe("openclaw adapter execute", () => {
expect(result.errorCode).toBe("openclaw_sse_expected_event_stream");
});
it("treats webhook-style JSON ack as compatibility success when SSE endpoint returns JSON", async () => {
it("fails when SSE stream closes without a terminal event", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true, runId: "oc-run-1" }), {
status: 200,
statusText: "OK",
sseResponse([
"event: response.delta\n",
'data: {"type":"response.delta","delta":"partial"}\n\n',
]),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/sse",
}),
);
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("openclaw_sse_stream_incomplete");
});
it("fails with explicit text-required error when endpoint rejects payload", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ error: "text required" }), {
status: 400,
statusText: "Bad Request",
headers: {
"content-type": "application/json",
},
@@ -154,102 +212,48 @@ describe("openclaw adapter execute", () => {
const result = await execute(
buildContext({
url: "https://agent.example/hooks/paperclip",
method: "POST",
url: "https://agent.example/sse",
}),
);
expect(result.exitCode).toBe(0);
expect(result.resultJson?.compatibilityMode).toBe("json_ack");
expect(result.resultJson?.transportFallback).toBe("webhook");
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("openclaw_text_required");
});
it("falls back to wake text payload when SSE is configured against /hooks/wake", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }),
it("rejects non-sse transport configuration", async () => {
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/sse",
streamTransport: "webhook",
}),
);
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("openclaw_stream_transport_unsupported");
expect(fetchMock).not.toHaveBeenCalled();
});
it("rejects /hooks/wake compatibility endpoints in strict SSE mode", async () => {
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/hooks/wake",
method: "POST",
}),
);
expect(result.exitCode).toBe(0);
expect(fetchMock).toHaveBeenCalledTimes(1);
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(body.mode).toBe("now");
expect(typeof body.text).toBe("string");
expect(body.paperclip).toBeTypeOf("object");
expect((body.paperclip as Record<string, unknown>).runId).toBe("run-123");
expect(result.resultJson?.compatibilityMode).toBe("wake_text");
expect(result.resultJson?.transportFallback).toBe("webhook");
});
it("uses wake text payload for /hooks/wake endpoints in webhook mode", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/hooks/wake",
method: "POST",
streamTransport: "webhook",
}),
);
expect(result.exitCode).toBe(0);
expect(fetchMock).toHaveBeenCalledTimes(1);
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(body.mode).toBe("now");
expect(typeof body.text).toBe("string");
expect(body.paperclip).toBeTypeOf("object");
expect((body.paperclip as Record<string, unknown>).runId).toBe("run-123");
});
it("retries with wake text payload when endpoint reports text required in webhook mode", async () => {
const fetchMock = vi
.fn()
.mockResolvedValueOnce(
new Response(JSON.stringify({ ok: false, error: "text required" }), {
status: 400,
statusText: "Bad Request",
}),
)
.mockResolvedValueOnce(
new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }),
);
vi.stubGlobal("fetch", fetchMock);
const result = await execute(
buildContext({
url: "https://agent.example/hooks/paperclip",
method: "POST",
streamTransport: "webhook",
}),
);
expect(result.exitCode).toBe(0);
expect(fetchMock).toHaveBeenCalledTimes(2);
const firstBody = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(firstBody.paperclip).toBeTypeOf("object");
const secondBody = JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body ?? "{}")) as Record<string, unknown>;
expect(secondBody.mode).toBe("now");
expect(typeof secondBody.text).toBe("string");
expect(secondBody.paperclip).toBeTypeOf("object");
expect((secondBody.paperclip as Record<string, unknown>).runId).toBe("run-123");
expect(result.resultJson?.compatibilityMode).toBe("wake_text");
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("openclaw_sse_incompatible_endpoint");
expect(fetchMock).not.toHaveBeenCalled();
});
});
describe("openclaw adapter environment checks", () => {
it("reports compatibility mode info for /hooks/wake endpoints", async () => {
it("reports /hooks/wake endpoints as incompatible for strict SSE mode", async () => {
const fetchMock = vi
.fn()
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
@@ -269,9 +273,26 @@ describe("openclaw adapter environment checks", () => {
},
});
const compatibilityCheck = result.checks.find(
(check) => check.code === "openclaw_wake_endpoint_compat_mode",
);
expect(compatibilityCheck?.level).toBe("info");
const check = result.checks.find((entry) => entry.code === "openclaw_wake_endpoint_incompatible");
expect(check?.level).toBe("error");
});
it("reports unsupported streamTransport settings", async () => {
const fetchMock = vi
.fn()
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
vi.stubGlobal("fetch", fetchMock);
const result = await testEnvironment({
companyId: "company-123",
adapterType: "openclaw",
config: {
url: "https://agent.example/sse",
streamTransport: "webhook",
},
});
const check = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported");
expect(check?.level).toBe("error");
});
});

View File

@@ -104,6 +104,11 @@ function isLoopbackHost(hostname: string): boolean {
return value === "localhost" || value === "127.0.0.1" || value === "::1";
}
function isWakePath(pathname: string): boolean {
const value = pathname.trim().toLowerCase();
return value === "/hooks/wake" || value.endsWith("/hooks/wake");
}
function normalizeHostname(value: string | null | undefined): string | null {
if (!value) return null;
const trimmed = value.trim();
@@ -217,13 +222,13 @@ function normalizeAgentDefaultsForJoin(input: {
code: "openclaw_callback_config_missing",
level: "warn",
message: "No OpenClaw callback config was provided in agentDefaultsPayload.",
hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw webhook immediately after approval.",
hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw SSE endpoint immediately after approval.",
});
return { normalized: null as Record<string, unknown> | null, diagnostics };
}
const defaults = input.defaultsPayload as Record<string, unknown>;
const normalized: Record<string, unknown> = {};
const normalized: Record<string, unknown> = { streamTransport: "sse" };
let callbackUrl: URL | null = null;
const rawUrl = typeof defaults.url === "string" ? defaults.url.trim() : "";
@@ -232,7 +237,7 @@ function normalizeAgentDefaultsForJoin(input: {
code: "openclaw_callback_url_missing",
level: "warn",
message: "OpenClaw callback URL is missing.",
hint: "Set agentDefaultsPayload.url to your OpenClaw webhook endpoint.",
hint: "Set agentDefaultsPayload.url to your OpenClaw SSE endpoint.",
});
} else {
try {
@@ -252,6 +257,14 @@ function normalizeAgentDefaultsForJoin(input: {
message: `Callback endpoint set to ${callbackUrl.toString()}`,
});
}
if (isWakePath(callbackUrl.pathname)) {
diagnostics.push({
code: "openclaw_callback_wake_path_incompatible",
level: "warn",
message: "Configured callback path targets /hooks/wake, which is not stream-capable for strict SSE mode.",
hint: "Use an endpoint that returns text/event-stream for the full run duration.",
});
}
if (isLoopbackHost(callbackUrl.hostname)) {
diagnostics.push({
code: "openclaw_callback_loopback",
@@ -273,7 +286,7 @@ function normalizeAgentDefaultsForJoin(input: {
normalized.method = rawMethod || "POST";
if (typeof defaults.timeoutSec === "number" && Number.isFinite(defaults.timeoutSec)) {
normalized.timeoutSec = Math.max(1, Math.min(120, Math.floor(defaults.timeoutSec)));
normalized.timeoutSec = Math.max(0, Math.min(7200, Math.floor(defaults.timeoutSec)));
}
const headers = normalizeHeaderMap(defaults.headers);
@@ -470,10 +483,10 @@ function buildInviteOnboardingManifest(
requiredFields: {
requestType: "agent",
agentName: "Display name for this agent",
adapterType: "Use 'openclaw' for OpenClaw webhook-based agents",
adapterType: "Use 'openclaw' for OpenClaw streaming agents",
capabilities: "Optional capability summary",
agentDefaultsPayload:
"Optional adapter config such as url/method/headers/webhookAuthHeader for OpenClaw callback endpoint",
"Optional adapter config such as url/method/headers/webhookAuthHeader for OpenClaw SSE endpoint",
},
registrationEndpoint: {
method: "POST",
@@ -498,7 +511,7 @@ function buildInviteOnboardingManifest(
path: testResolutionPath,
url: testResolutionUrl,
query: {
url: "https://your-openclaw-webhook.example/webhook",
url: "https://your-openclaw-agent.example/v1/responses",
timeoutMs: 5000,
},
},
@@ -579,10 +592,11 @@ export function buildInviteOnboardingTextDocument(
' "adapterType": "openclaw",',
' "capabilities": "Optional summary",',
' "agentDefaultsPayload": {',
' "url": "https://your-openclaw-webhook.example/webhook",',
' "url": "https://your-openclaw-agent.example/v1/responses",',
' "streamTransport": "sse",',
' "method": "POST",',
' "headers": { "x-openclaw-auth": "replace-me" },',
' "timeoutSec": 30',
' "timeoutSec": 0',
" }",
"}",
"",
@@ -622,9 +636,9 @@ export function buildInviteOnboardingTextDocument(
lines.push(
"",
"## Optional: test callback resolution from Paperclip",
`${onboarding.connectivity.testResolutionEndpoint.method ?? "GET"} ${onboarding.connectivity.testResolutionEndpoint.url}?url=https%3A%2F%2Fyour-openclaw-webhook.example%2Fwebhook`,
`${onboarding.connectivity.testResolutionEndpoint.method ?? "GET"} ${onboarding.connectivity.testResolutionEndpoint.url}?url=https%3A%2F%2Fyour-openclaw-agent.example%2Fv1%2Fresponses`,
"",
"This endpoint checks whether Paperclip can reach your webhook URL and reports reachable, timeout, or unreachable.",
"This endpoint checks whether Paperclip can reach your OpenClaw endpoint and reports reachable, timeout, or unreachable.",
);
}

View File

@@ -0,0 +1,14 @@
type CheckoutWakeInput = {
actorType: "board" | "agent" | "none";
actorAgentId: string | null;
checkoutAgentId: string;
checkoutRunId: string | null;
};
export function shouldWakeAssigneeOnCheckout(input: CheckoutWakeInput): boolean {
if (input.actorType !== "agent") return true;
if (!input.actorAgentId) return true;
if (input.actorAgentId !== input.checkoutAgentId) return true;
if (!input.checkoutRunId) return true;
return false;
}

View File

@@ -25,6 +25,7 @@ import {
import { logger } from "../middleware/logger.js";
import { forbidden, HttpError, unauthorized } from "../errors.js";
import { assertCompanyAccess, getActorInfo } from "./authz.js";
import { shouldWakeAssigneeOnCheckout } from "./issues-checkout-wakeup.js";
const MAX_ATTACHMENT_BYTES = Number(process.env.PAPERCLIP_ATTACHMENT_MAX_BYTES) || 10 * 1024 * 1024;
const ALLOWED_ATTACHMENT_CONTENT_TYPES = new Set([
@@ -634,17 +635,26 @@ export function issueRoutes(db: Db, storage: StorageService) {
details: { agentId: req.body.agentId },
});
void heartbeat
.wakeup(req.body.agentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_checked_out",
payload: { issueId: issue.id, mutation: "checkout" },
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: { issueId: issue.id, source: "issue.checkout" },
if (
shouldWakeAssigneeOnCheckout({
actorType: req.actor.type,
actorAgentId: req.actor.type === "agent" ? req.actor.agentId ?? null : null,
checkoutAgentId: req.body.agentId,
checkoutRunId,
})
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout"));
) {
void heartbeat
.wakeup(req.body.agentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_checked_out",
payload: { issueId: issue.id, mutation: "checkout" },
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: { issueId: issue.id, source: "issue.checkout" },
})
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout"));
}
res.json(updated);
});