diff --git a/package.json b/package.json index b0ac83d5..ad6d692f 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,8 @@ "check:tokens": "node scripts/check-forbidden-tokens.mjs", "docs:dev": "cd docs && npx mintlify dev", "smoke:openclaw-join": "./scripts/smoke/openclaw-join.sh", - "smoke:openclaw-docker-ui": "./scripts/smoke/openclaw-docker-ui.sh" + "smoke:openclaw-docker-ui": "./scripts/smoke/openclaw-docker-ui.sh", + "smoke:openclaw-sse-standalone": "./scripts/smoke/openclaw-sse-standalone.sh" }, "devDependencies": { "@changesets/cli": "^2.30.0", diff --git a/packages/adapters/openclaw/src/index.ts b/packages/adapters/openclaw/src/index.ts index 165435e4..88968ee1 100644 --- a/packages/adapters/openclaw/src/index.ts +++ b/packages/adapters/openclaw/src/index.ts @@ -16,8 +16,8 @@ Don't use when: - The OpenClaw endpoint is not reachable from the Paperclip server. Core fields: -- url (string, required): OpenClaw endpoint URL -- streamTransport (string, optional): \`sse\` (default) or \`webhook\` +- url (string, required): OpenClaw SSE endpoint URL +- streamTransport (string, optional): must be \`sse\` when provided - method (string, optional): HTTP method, default POST - headers (object, optional): extra HTTP headers for requests - webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth @@ -28,5 +28,5 @@ Session routing fields: - sessionKey (string, optional): fixed session key value when strategy is \`fixed\` (default \`paperclip\`) Operational fields: -- timeoutSec (number, optional): request timeout in seconds (default 30) +- timeoutSec (number, optional): SSE request timeout in seconds (default 0 = no adapter timeout) `; diff --git a/packages/adapters/openclaw/src/server/execute.ts b/packages/adapters/openclaw/src/server/execute.ts index 677cf231..95949964 100644 --- a/packages/adapters/openclaw/src/server/execute.ts +++ b/packages/adapters/openclaw/src/server/execute.ts @@ -1,19 +1,13 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; -import { asNumber, asString, parseObject } from "@paperclipai/adapter-utils/server-utils"; +import { asNumber, asString, buildPaperclipEnv, parseObject } from "@paperclipai/adapter-utils/server-utils"; import { parseOpenClawResponse } from "./parse.js"; -type OpenClawTransport = "sse" | "webhook"; type SessionKeyStrategy = "fixed" | "issue" | "run"; function nonEmpty(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; } -function normalizeTransport(value: unknown): OpenClawTransport { - const normalized = asString(value, "sse").trim().toLowerCase(); - return normalized === "webhook" ? "webhook" : "sse"; -} - function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy { const normalized = asString(value, "fixed").trim().toLowerCase(); if (normalized === "issue" || normalized === "run") return normalized; @@ -32,7 +26,7 @@ function resolveSessionKey(input: { return fallback; } -function shouldUseWakeTextPayload(url: string): boolean { +function isWakeCompatibilityEndpoint(url: string): boolean { try { const parsed = new URL(url); const path = parsed.pathname.toLowerCase(); @@ -42,7 +36,28 @@ function shouldUseWakeTextPayload(url: string): boolean { } } -function buildWakeText(payload: { +function isOpenResponsesEndpoint(url: string): boolean { + try { + const parsed = new URL(url); + const path = parsed.pathname.toLowerCase(); + return path === "/v1/responses" || path.endsWith("/v1/responses"); + } catch { + return false; + } +} + +function toStringRecord(value: unknown): Record { + const parsed = parseObject(value); + const out: Record = {}; + for (const [key, entry] of Object.entries(parsed)) { + if (typeof entry === "string") { + out[key] = entry; + } + } + return out; +} + +type WakePayload = { runId: string; agentId: string; companyId: string; @@ -53,22 +68,43 @@ function buildWakeText(payload: { approvalId: string | null; approvalStatus: string | null; issueIds: string[]; -}): string { - const lines = [ - "Paperclip wake event.", - "", - `runId: ${payload.runId}`, - `agentId: ${payload.agentId}`, - `companyId: ${payload.companyId}`, +}; + +function buildWakeText(payload: WakePayload, paperclipEnv: Record): string { + const orderedKeys = [ + "PAPERCLIP_RUN_ID", + "PAPERCLIP_AGENT_ID", + "PAPERCLIP_COMPANY_ID", + "PAPERCLIP_API_URL", + "PAPERCLIP_TASK_ID", + "PAPERCLIP_WAKE_REASON", + "PAPERCLIP_WAKE_COMMENT_ID", + "PAPERCLIP_APPROVAL_ID", + "PAPERCLIP_APPROVAL_STATUS", + "PAPERCLIP_LINKED_ISSUE_IDS", ]; - if (payload.taskId) lines.push(`taskId: ${payload.taskId}`); - if (payload.issueId) lines.push(`issueId: ${payload.issueId}`); - if (payload.wakeReason) lines.push(`wakeReason: ${payload.wakeReason}`); - if (payload.wakeCommentId) lines.push(`wakeCommentId: ${payload.wakeCommentId}`); - if (payload.approvalId) lines.push(`approvalId: ${payload.approvalId}`); - if (payload.approvalStatus) lines.push(`approvalStatus: ${payload.approvalStatus}`); - if (payload.issueIds.length > 0) lines.push(`issueIds: ${payload.issueIds.join(",")}`); + const envLines: string[] = []; + for (const key of orderedKeys) { + const value = paperclipEnv[key]; + if (!value) continue; + envLines.push(`${key}=${value}`); + } + + const lines = [ + "Paperclip wake event for a cloud adapter.", + "", + "Set these values in your run context:", + ...envLines, + "", + `task_id=${payload.taskId ?? ""}`, + `issue_id=${payload.issueId ?? ""}`, + `wake_reason=${payload.wakeReason ?? ""}`, + `wake_comment_id=${payload.wakeCommentId ?? ""}`, + `approval_id=${payload.approvalId ?? ""}`, + `approval_status=${payload.approvalStatus ?? ""}`, + `linked_issue_ids=${payload.issueIds.join(",")}`, + ]; lines.push("", "Run your Paperclip heartbeat procedure now."); return lines.join("\n"); @@ -83,13 +119,6 @@ function isTextRequiredResponse(responseText: string): boolean { return responseText.toLowerCase().includes("text required"); } -function isWebhookAcceptedResponse(parsed: Record | null): boolean { - if (!parsed) return false; - if (parsed.ok === true) return true; - const status = nonEmpty(parsed.status)?.toLowerCase(); - return status === "ok" || status === "accepted"; -} - async function sendJsonRequest(params: { url: string; method: string; @@ -121,26 +150,6 @@ async function readAndLogResponseText(params: { return responseText; } -async function sendWebhookRequest(params: { - url: string; - method: string; - headers: Record; - payload: Record; - onLog: AdapterExecutionContext["onLog"]; - signal: AbortSignal; -}): Promise<{ response: Response; responseText: string }> { - const response = await sendJsonRequest({ - url: params.url, - method: params.method, - headers: params.headers, - payload: params.payload, - signal: params.signal, - }); - - const responseText = await readAndLogResponseText({ response, onLog: params.onLog }); - return { response, responseText }; -} - type ConsumedSse = { eventCount: number; lastEventType: string | null; @@ -387,9 +396,30 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? Math.max(1, Math.floor(timeoutSecRaw)) : 0; const headersConfig = parseObject(config.headers) as Record; const payloadTemplate = parseObject(config.payloadTemplate); const webhookAuthHeader = nonEmpty(config.webhookAuthHeader); @@ -431,252 +461,137 @@ export async function execute(ctx: AdapterExecutionContext): Promise = { + ...buildPaperclipEnv(agent), + PAPERCLIP_RUN_ID: runId, }; + if (wakePayload.taskId) paperclipEnv.PAPERCLIP_TASK_ID = wakePayload.taskId; + if (wakePayload.wakeReason) paperclipEnv.PAPERCLIP_WAKE_REASON = wakePayload.wakeReason; + if (wakePayload.wakeCommentId) paperclipEnv.PAPERCLIP_WAKE_COMMENT_ID = wakePayload.wakeCommentId; + if (wakePayload.approvalId) paperclipEnv.PAPERCLIP_APPROVAL_ID = wakePayload.approvalId; + if (wakePayload.approvalStatus) paperclipEnv.PAPERCLIP_APPROVAL_STATUS = wakePayload.approvalStatus; + if (wakePayload.issueIds.length > 0) { + paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = wakePayload.issueIds.join(","); + } - const wakeTextBody = { - text: buildWakeText(wakePayload), - mode: "now", - sessionKey, - paperclip: { - ...wakePayload, + const wakeText = buildWakeText(wakePayload, paperclipEnv); + const payloadText = templateText ? `${templateText}\n\n${wakeText}` : wakeText; + const isOpenResponses = isOpenResponsesEndpoint(url); + + const paperclipBody: Record = isOpenResponses + ? { + ...payloadTemplate, + stream: true, + model: + nonEmpty(payloadTemplate.model) ?? + nonEmpty(config.model) ?? + "openclaw", + input: Object.prototype.hasOwnProperty.call(payloadTemplate, "input") + ? payloadTemplate.input + : payloadText, + metadata: { + ...toStringRecord(payloadTemplate.metadata), + ...paperclipEnv, + paperclip_session_key: sessionKey, + }, + } + : { + ...payloadTemplate, + stream: true, sessionKey, - streamTransport: transport, - context, - }, - }; + text: payloadText, + paperclip: { + ...wakePayload, + sessionKey, + streamTransport: "sse", + env: paperclipEnv, + context, + }, + }; + + if (isOpenResponses) { + delete paperclipBody.text; + delete paperclipBody.sessionKey; + delete paperclipBody.paperclip; + if (!headers["x-openclaw-session-key"] && !headers["X-OpenClaw-Session-Key"]) { + headers["x-openclaw-session-key"] = sessionKey; + } + } if (onMeta) { await onMeta({ adapterType: "openclaw", - command: transport === "sse" ? "sse" : "webhook", + command: "sse", commandArgs: [method, url], context, }); } - await onLog("stdout", `[openclaw] invoking ${method} ${url} (transport=${transport})\n`); + await onLog("stdout", `[openclaw] invoking ${method} ${url} (transport=sse)\n`); const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), timeoutSec * 1000); + const timeout = timeoutSec > 0 ? setTimeout(() => controller.abort(), timeoutSec * 1000) : null; try { - const preferWakeTextPayload = shouldUseWakeTextPayload(url); - - if (transport === "sse") { - if (preferWakeTextPayload) { - await onLog( - "stdout", - "[openclaw] /hooks/wake compatibility endpoint does not stream SSE; falling back to wake text payload\n", - ); - const retry = await sendWebhookRequest({ - url, - method, - headers, - payload: wakeTextBody, - onLog, - signal: controller.signal, - }); - - if (retry.response.ok) { - return { - exitCode: 0, - signal: null, - timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw webhook ${method} ${url} (wake compatibility fallback)`, - resultJson: { - status: retry.response.status, - statusText: retry.response.statusText, - compatibilityMode: "wake_text", - transportFallback: "webhook", - response: parseOpenClawResponse(retry.responseText) ?? retry.responseText, - }, - }; - } - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: `OpenClaw webhook failed with status ${retry.response.status}`, - errorCode: "openclaw_http_error", - resultJson: { - status: retry.response.status, - statusText: retry.response.statusText, - compatibilityMode: "wake_text", - transportFallback: "webhook", - response: parseOpenClawResponse(retry.responseText) ?? retry.responseText, - }, - }; - } - - const sseHeaders = { + const response = await sendJsonRequest({ + url, + method, + headers: { ...headers, accept: "text/event-stream", - }; - - const response = await sendJsonRequest({ - url, - method, - headers: sseHeaders, - payload: paperclipBody, - signal: controller.signal, - }); - - if (!response.ok) { - const responseText = await readAndLogResponseText({ response, onLog }); - if (isTextRequiredResponse(responseText)) { - await onLog( - "stdout", - "[openclaw] SSE endpoint reported text-required; falling back to wake compatibility payload\n", - ); - const retry = await sendWebhookRequest({ - url, - method, - headers, - payload: wakeTextBody, - onLog, - signal: controller.signal, - }); - if (retry.response.ok) { - return { - exitCode: 0, - signal: null, - timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw webhook ${method} ${url} (wake compatibility fallback)`, - resultJson: { - status: retry.response.status, - statusText: retry.response.statusText, - compatibilityMode: "wake_text", - transportFallback: "webhook", - response: parseOpenClawResponse(retry.responseText) ?? retry.responseText, - }, - }; - } - } - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: `OpenClaw SSE request failed with status ${response.status}`, - errorCode: "openclaw_http_error", - resultJson: { - status: response.status, - statusText: response.statusText, - response: parseOpenClawResponse(responseText) ?? responseText, - }, - }; - } - - const contentType = (response.headers.get("content-type") ?? "").toLowerCase(); - if (!contentType.includes("text/event-stream")) { - const responseText = await readAndLogResponseText({ response, onLog }); - const parsedResponse = parseOpenClawResponse(responseText); - if (isTextRequiredResponse(responseText)) { - await onLog( - "stdout", - "[openclaw] non-SSE response indicated text-required; falling back to wake compatibility payload\n", - ); - const retry = await sendWebhookRequest({ - url, - method, - headers, - payload: wakeTextBody, - onLog, - signal: controller.signal, - }); - if (retry.response.ok) { - return { - exitCode: 0, - signal: null, - timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw webhook ${method} ${url} (wake compatibility fallback)`, - resultJson: { - status: retry.response.status, - statusText: retry.response.statusText, - compatibilityMode: "wake_text", - transportFallback: "webhook", - response: parseOpenClawResponse(retry.responseText) ?? retry.responseText, - }, - }; - } - } - if (isWebhookAcceptedResponse(parsedResponse)) { - await onLog( - "stdout", - "[openclaw] non-SSE response acknowledged run; treating as webhook compatibility success\n", - ); - return { - exitCode: 0, - signal: null, - timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw webhook ${method} ${url} (non-stream compatibility)`, - resultJson: { - status: response.status, - statusText: response.statusText, - contentType, - compatibilityMode: "json_ack", - transportFallback: "webhook", - response: parsedResponse ?? responseText, - }, - }; - } - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: "OpenClaw SSE endpoint did not return text/event-stream", - errorCode: "openclaw_sse_expected_event_stream", - resultJson: { - status: response.status, - statusText: response.statusText, - contentType, - response: parsedResponse ?? responseText, - }, - }; - } - - const consumed = await consumeSseResponse({ response, onLog }); - if (consumed.failed) { - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed", - errorCode: "openclaw_sse_stream_failed", - resultJson: { - eventCount: consumed.eventCount, - terminal: consumed.terminal, - lastEventType: consumed.lastEventType, - lastData: consumed.lastData, - response: consumed.lastPayload ?? consumed.lastData, - }, - }; - } + }, + payload: paperclipBody, + signal: controller.signal, + }); + if (!response.ok) { + const responseText = await readAndLogResponseText({ response, onLog }); return { - exitCode: 0, + exitCode: 1, signal: null, timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw SSE ${method} ${url}`, + errorMessage: + isTextRequiredResponse(responseText) + ? "OpenClaw endpoint rejected the payload as text-required." + : `OpenClaw SSE request failed with status ${response.status}`, + errorCode: isTextRequiredResponse(responseText) + ? "openclaw_text_required" + : "openclaw_http_error", + resultJson: { + status: response.status, + statusText: response.statusText, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const contentType = (response.headers.get("content-type") ?? "").toLowerCase(); + if (!contentType.includes("text/event-stream")) { + const responseText = await readAndLogResponseText({ response, onLog }); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: "OpenClaw SSE endpoint did not return text/event-stream", + errorCode: "openclaw_sse_expected_event_stream", + resultJson: { + status: response.status, + statusText: response.statusText, + contentType, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const consumed = await consumeSseResponse({ response, onLog }); + if (consumed.failed) { + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed", + errorCode: "openclaw_sse_stream_failed", resultJson: { eventCount: consumed.eventCount, terminal: consumed.terminal, @@ -687,81 +602,19 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 + ? `[openclaw] SSE request timed out after ${timeoutSec}s\n` + : "[openclaw] SSE request aborted\n"; + await onLog("stderr", timeoutMessage); return { exitCode: null, signal: null, timedOut: true, - errorMessage: `Timed out after ${timeoutSec}s`, - errorCode: "timeout", + errorMessage: timeoutSec > 0 ? `Timed out after ${timeoutSec}s` : "Request aborted", + errorCode: "openclaw_sse_timeout", }; } @@ -801,6 +660,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise = {}; if (v.url) ac.url = v.url; ac.method = "POST"; - ac.timeoutSec = 30; + ac.timeoutSec = 0; ac.streamTransport = "sse"; ac.sessionKeyStrategy = "fixed"; ac.sessionKey = "paperclip"; diff --git a/scripts/smoke/openclaw-sse-standalone.sh b/scripts/smoke/openclaw-sse-standalone.sh new file mode 100755 index 00000000..65b42ae7 --- /dev/null +++ b/scripts/smoke/openclaw-sse-standalone.sh @@ -0,0 +1,146 @@ +#!/usr/bin/env bash +set -euo pipefail + +log() { + echo "[openclaw-sse-standalone] $*" +} + +fail() { + echo "[openclaw-sse-standalone] ERROR: $*" >&2 + exit 1 +} + +require_cmd() { + local cmd="$1" + command -v "$cmd" >/dev/null 2>&1 || fail "missing required command: $cmd" +} + +require_cmd curl +require_cmd jq +require_cmd grep + +OPENCLAW_URL="${OPENCLAW_URL:-}" +OPENCLAW_METHOD="${OPENCLAW_METHOD:-POST}" +OPENCLAW_AUTH_HEADER="${OPENCLAW_AUTH_HEADER:-}" +OPENCLAW_TIMEOUT_SEC="${OPENCLAW_TIMEOUT_SEC:-180}" +OPENCLAW_MODEL="${OPENCLAW_MODEL:-openclaw}" +OPENCLAW_USER="${OPENCLAW_USER:-paperclip-smoke}" + +PAPERCLIP_RUN_ID="${PAPERCLIP_RUN_ID:-smoke-run-$(date +%s)}" +PAPERCLIP_AGENT_ID="${PAPERCLIP_AGENT_ID:-openclaw-smoke-agent}" +PAPERCLIP_COMPANY_ID="${PAPERCLIP_COMPANY_ID:-openclaw-smoke-company}" +PAPERCLIP_API_URL="${PAPERCLIP_API_URL:-http://localhost:3100}" +PAPERCLIP_TASK_ID="${PAPERCLIP_TASK_ID:-openclaw-smoke-task}" +PAPERCLIP_WAKE_REASON="${PAPERCLIP_WAKE_REASON:-openclaw_smoke_test}" +PAPERCLIP_WAKE_COMMENT_ID="${PAPERCLIP_WAKE_COMMENT_ID:-}" +PAPERCLIP_APPROVAL_ID="${PAPERCLIP_APPROVAL_ID:-}" +PAPERCLIP_APPROVAL_STATUS="${PAPERCLIP_APPROVAL_STATUS:-}" +PAPERCLIP_LINKED_ISSUE_IDS="${PAPERCLIP_LINKED_ISSUE_IDS:-}" +OPENCLAW_TEXT_PREFIX="${OPENCLAW_TEXT_PREFIX:-Standalone OpenClaw SSE smoke test.}" + +[[ -n "$OPENCLAW_URL" ]] || fail "OPENCLAW_URL is required" + +read -r -d '' TEXT_BODY <&2 || true + fail "non-success HTTP status: ${http_code}" +fi + +if ! grep -Eqi '^content-type:.*text/event-stream' "$headers_file"; then + tail -n 40 "$body_file" >&2 || true + fail "response content-type was not text/event-stream" +fi + +if grep -Eqi 'event:\s*(error|failed|cancel)|"status":"(failed|cancelled|error)"|"type":"[^"]*(failed|cancelled|error)"' "$body_file"; then + tail -n 120 "$body_file" >&2 || true + fail "stream reported a failure event" +fi + +if ! grep -Eqi 'event:\s*(done|completed|response\.completed)|\[DONE\]|"status":"(completed|succeeded|done)"|"type":"response\.completed"' "$body_file"; then + tail -n 120 "$body_file" >&2 || true + fail "stream ended without a terminal completion marker" +fi + +event_count="$(grep -Ec '^event:' "$body_file" || true)" +log "stream completed successfully (events=${event_count})" +echo +tail -n 40 "$body_file" diff --git a/server/src/__tests__/issues-checkout-wakeup.test.ts b/server/src/__tests__/issues-checkout-wakeup.test.ts new file mode 100644 index 00000000..12b23870 --- /dev/null +++ b/server/src/__tests__/issues-checkout-wakeup.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { shouldWakeAssigneeOnCheckout } from "../routes/issues-checkout-wakeup.js"; + +describe("shouldWakeAssigneeOnCheckout", () => { + it("keeps wakeup behavior for board actors", () => { + expect( + shouldWakeAssigneeOnCheckout({ + actorType: "board", + actorAgentId: null, + checkoutAgentId: "agent-1", + checkoutRunId: null, + }), + ).toBe(true); + }); + + it("skips wakeup for agent self-checkout in an active run", () => { + expect( + shouldWakeAssigneeOnCheckout({ + actorType: "agent", + actorAgentId: "agent-1", + checkoutAgentId: "agent-1", + checkoutRunId: "run-1", + }), + ).toBe(false); + }); + + it("still wakes when checkout run id is missing", () => { + expect( + shouldWakeAssigneeOnCheckout({ + actorType: "agent", + actorAgentId: "agent-1", + checkoutAgentId: "agent-1", + checkoutRunId: null, + }), + ).toBe(true); + }); + + it("still wakes when agent checks out on behalf of another agent id", () => { + expect( + shouldWakeAssigneeOnCheckout({ + actorType: "agent", + actorAgentId: "agent-1", + checkoutAgentId: "agent-2", + checkoutRunId: "run-1", + }), + ).toBe(true); + }); +}); diff --git a/server/src/__tests__/openclaw-adapter.test.ts b/server/src/__tests__/openclaw-adapter.test.ts index 75835f3a..b11b1ffb 100644 --- a/server/src/__tests__/openclaw-adapter.test.ts +++ b/server/src/__tests__/openclaw-adapter.test.ts @@ -58,28 +58,21 @@ afterEach(() => { }); describe("openclaw adapter execute", () => { - it("uses SSE by default and streams into one run", async () => { + it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => { const fetchMock = vi.fn().mockResolvedValue( sseResponse([ - 'event: response.delta\n', - 'data: {"type":"response.delta","delta":"hi"}\n\n', - 'event: response.completed\n', + "event: response.completed\n", 'data: {"type":"response.completed","status":"completed"}\n\n', ]), ); vi.stubGlobal("fetch", fetchMock); - const onLog = vi.fn().mockResolvedValue(undefined); - const result = await execute( - buildContext( - { - url: "https://agent.example/gateway", - method: "POST", - payloadTemplate: { foo: "bar" }, - }, - { onLog }, - ), + buildContext({ + url: "https://agent.example/sse", + method: "POST", + payloadTemplate: { foo: "bar", text: "OpenClaw task prompt" }, + }), ); expect(result.exitCode).toBe(0); @@ -88,24 +81,34 @@ describe("openclaw adapter execute", () => { expect(body.foo).toBe("bar"); expect(body.stream).toBe(true); expect(body.sessionKey).toBe("paperclip"); + expect((body.paperclip as Record).streamTransport).toBe("sse"); expect((body.paperclip as Record).runId).toBe("run-123"); expect((body.paperclip as Record).sessionKey).toBe("paperclip"); - expect((body.paperclip as Record).streamTransport).toBe("sse"); - expect(onLog).toHaveBeenCalled(); + expect( + ((body.paperclip as Record).env as Record).PAPERCLIP_RUN_ID, + ).toBe("run-123"); + const text = String(body.text ?? ""); + expect(text).toContain("OpenClaw task prompt"); + expect(text).toContain("PAPERCLIP_RUN_ID=run-123"); + expect(text).toContain("PAPERCLIP_AGENT_ID=agent-123"); + expect(text).toContain("PAPERCLIP_COMPANY_ID=company-123"); + expect(text).toContain("PAPERCLIP_TASK_ID=task-123"); + expect(text).toContain("PAPERCLIP_WAKE_REASON=issue_assigned"); + expect(text).toContain("PAPERCLIP_LINKED_ISSUE_IDS=issue-123"); }); it("derives issue session keys when configured", async () => { const fetchMock = vi.fn().mockResolvedValue( sseResponse([ - 'event: done\n', - 'data: [DONE]\n\n', + "event: done\n", + "data: [DONE]\n\n", ]), ); vi.stubGlobal("fetch", fetchMock); const result = await execute( buildContext({ - url: "https://agent.example/gateway", + url: "https://agent.example/sse", method: "POST", sessionKeyStrategy: "issue", }), @@ -117,7 +120,43 @@ describe("openclaw adapter execute", () => { expect((body.paperclip as Record).sessionKey).toBe("paperclip:issue:issue-123"); }); - it("fails when SSE endpoint does not return text/event-stream and no compatibility fallback applies", async () => { + it("maps requests to OpenResponses schema for /v1/responses endpoints", async () => { + const fetchMock = vi.fn().mockResolvedValue( + sseResponse([ + "event: response.completed\n", + 'data: {"type":"response.completed","status":"completed"}\n\n', + ]), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/v1/responses", + method: "POST", + payloadTemplate: { + model: "openclaw", + user: "paperclip", + }, + }), + ); + + expect(result.exitCode).toBe(0); + const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; + expect(body.stream).toBe(true); + expect(body.model).toBe("openclaw"); + expect(typeof body.input).toBe("string"); + expect(String(body.input)).toContain("PAPERCLIP_RUN_ID=run-123"); + expect(body.metadata).toBeTypeOf("object"); + expect((body.metadata as Record).PAPERCLIP_RUN_ID).toBe("run-123"); + expect(body.text).toBeUndefined(); + expect(body.paperclip).toBeUndefined(); + expect(body.sessionKey).toBeUndefined(); + + const headers = (fetchMock.mock.calls[0]?.[1]?.headers ?? {}) as Record; + expect(headers["x-openclaw-session-key"]).toBe("paperclip"); + }); + + it("fails when SSE endpoint does not return text/event-stream", async () => { const fetchMock = vi.fn().mockResolvedValue( new Response(JSON.stringify({ ok: false, error: "unexpected payload" }), { status: 200, @@ -131,7 +170,7 @@ describe("openclaw adapter execute", () => { const result = await execute( buildContext({ - url: "https://agent.example/gateway", + url: "https://agent.example/sse", method: "POST", }), ); @@ -140,11 +179,30 @@ describe("openclaw adapter execute", () => { expect(result.errorCode).toBe("openclaw_sse_expected_event_stream"); }); - it("treats webhook-style JSON ack as compatibility success when SSE endpoint returns JSON", async () => { + it("fails when SSE stream closes without a terminal event", async () => { const fetchMock = vi.fn().mockResolvedValue( - new Response(JSON.stringify({ ok: true, runId: "oc-run-1" }), { - status: 200, - statusText: "OK", + sseResponse([ + "event: response.delta\n", + 'data: {"type":"response.delta","delta":"partial"}\n\n', + ]), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/sse", + }), + ); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("openclaw_sse_stream_incomplete"); + }); + + it("fails with explicit text-required error when endpoint rejects payload", async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ error: "text required" }), { + status: 400, + statusText: "Bad Request", headers: { "content-type": "application/json", }, @@ -154,102 +212,48 @@ describe("openclaw adapter execute", () => { const result = await execute( buildContext({ - url: "https://agent.example/hooks/paperclip", - method: "POST", + url: "https://agent.example/sse", }), ); - expect(result.exitCode).toBe(0); - expect(result.resultJson?.compatibilityMode).toBe("json_ack"); - expect(result.resultJson?.transportFallback).toBe("webhook"); + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("openclaw_text_required"); }); - it("falls back to wake text payload when SSE is configured against /hooks/wake", async () => { - const fetchMock = vi.fn().mockResolvedValue( - new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }), + it("rejects non-sse transport configuration", async () => { + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/sse", + streamTransport: "webhook", + }), ); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("openclaw_stream_transport_unsupported"); + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("rejects /hooks/wake compatibility endpoints in strict SSE mode", async () => { + const fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); const result = await execute( buildContext({ url: "https://agent.example/hooks/wake", - method: "POST", }), ); - expect(result.exitCode).toBe(0); - expect(fetchMock).toHaveBeenCalledTimes(1); - const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; - expect(body.mode).toBe("now"); - expect(typeof body.text).toBe("string"); - expect(body.paperclip).toBeTypeOf("object"); - expect((body.paperclip as Record).runId).toBe("run-123"); - expect(result.resultJson?.compatibilityMode).toBe("wake_text"); - expect(result.resultJson?.transportFallback).toBe("webhook"); - }); - - it("uses wake text payload for /hooks/wake endpoints in webhook mode", async () => { - const fetchMock = vi.fn().mockResolvedValue( - new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }), - ); - vi.stubGlobal("fetch", fetchMock); - - const result = await execute( - buildContext({ - url: "https://agent.example/hooks/wake", - method: "POST", - streamTransport: "webhook", - }), - ); - - expect(result.exitCode).toBe(0); - expect(fetchMock).toHaveBeenCalledTimes(1); - const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; - expect(body.mode).toBe("now"); - expect(typeof body.text).toBe("string"); - expect(body.paperclip).toBeTypeOf("object"); - expect((body.paperclip as Record).runId).toBe("run-123"); - }); - - it("retries with wake text payload when endpoint reports text required in webhook mode", async () => { - const fetchMock = vi - .fn() - .mockResolvedValueOnce( - new Response(JSON.stringify({ ok: false, error: "text required" }), { - status: 400, - statusText: "Bad Request", - }), - ) - .mockResolvedValueOnce( - new Response(JSON.stringify({ ok: true }), { status: 200, statusText: "OK" }), - ); - vi.stubGlobal("fetch", fetchMock); - - const result = await execute( - buildContext({ - url: "https://agent.example/hooks/paperclip", - method: "POST", - streamTransport: "webhook", - }), - ); - - expect(result.exitCode).toBe(0); - expect(fetchMock).toHaveBeenCalledTimes(2); - - const firstBody = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; - expect(firstBody.paperclip).toBeTypeOf("object"); - - const secondBody = JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body ?? "{}")) as Record; - expect(secondBody.mode).toBe("now"); - expect(typeof secondBody.text).toBe("string"); - expect(secondBody.paperclip).toBeTypeOf("object"); - expect((secondBody.paperclip as Record).runId).toBe("run-123"); - expect(result.resultJson?.compatibilityMode).toBe("wake_text"); + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("openclaw_sse_incompatible_endpoint"); + expect(fetchMock).not.toHaveBeenCalled(); }); }); describe("openclaw adapter environment checks", () => { - it("reports compatibility mode info for /hooks/wake endpoints", async () => { + it("reports /hooks/wake endpoints as incompatible for strict SSE mode", async () => { const fetchMock = vi .fn() .mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); @@ -269,9 +273,26 @@ describe("openclaw adapter environment checks", () => { }, }); - const compatibilityCheck = result.checks.find( - (check) => check.code === "openclaw_wake_endpoint_compat_mode", - ); - expect(compatibilityCheck?.level).toBe("info"); + const check = result.checks.find((entry) => entry.code === "openclaw_wake_endpoint_incompatible"); + expect(check?.level).toBe("error"); + }); + + it("reports unsupported streamTransport settings", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); + vi.stubGlobal("fetch", fetchMock); + + const result = await testEnvironment({ + companyId: "company-123", + adapterType: "openclaw", + config: { + url: "https://agent.example/sse", + streamTransport: "webhook", + }, + }); + + const check = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported"); + expect(check?.level).toBe("error"); }); }); diff --git a/server/src/routes/access.ts b/server/src/routes/access.ts index 610f952c..e310c9a9 100644 --- a/server/src/routes/access.ts +++ b/server/src/routes/access.ts @@ -104,6 +104,11 @@ function isLoopbackHost(hostname: string): boolean { return value === "localhost" || value === "127.0.0.1" || value === "::1"; } +function isWakePath(pathname: string): boolean { + const value = pathname.trim().toLowerCase(); + return value === "/hooks/wake" || value.endsWith("/hooks/wake"); +} + function normalizeHostname(value: string | null | undefined): string | null { if (!value) return null; const trimmed = value.trim(); @@ -217,13 +222,13 @@ function normalizeAgentDefaultsForJoin(input: { code: "openclaw_callback_config_missing", level: "warn", message: "No OpenClaw callback config was provided in agentDefaultsPayload.", - hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw webhook immediately after approval.", + hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw SSE endpoint immediately after approval.", }); return { normalized: null as Record | null, diagnostics }; } const defaults = input.defaultsPayload as Record; - const normalized: Record = {}; + const normalized: Record = { streamTransport: "sse" }; let callbackUrl: URL | null = null; const rawUrl = typeof defaults.url === "string" ? defaults.url.trim() : ""; @@ -232,7 +237,7 @@ function normalizeAgentDefaultsForJoin(input: { code: "openclaw_callback_url_missing", level: "warn", message: "OpenClaw callback URL is missing.", - hint: "Set agentDefaultsPayload.url to your OpenClaw webhook endpoint.", + hint: "Set agentDefaultsPayload.url to your OpenClaw SSE endpoint.", }); } else { try { @@ -252,6 +257,14 @@ function normalizeAgentDefaultsForJoin(input: { message: `Callback endpoint set to ${callbackUrl.toString()}`, }); } + if (isWakePath(callbackUrl.pathname)) { + diagnostics.push({ + code: "openclaw_callback_wake_path_incompatible", + level: "warn", + message: "Configured callback path targets /hooks/wake, which is not stream-capable for strict SSE mode.", + hint: "Use an endpoint that returns text/event-stream for the full run duration.", + }); + } if (isLoopbackHost(callbackUrl.hostname)) { diagnostics.push({ code: "openclaw_callback_loopback", @@ -273,7 +286,7 @@ function normalizeAgentDefaultsForJoin(input: { normalized.method = rawMethod || "POST"; if (typeof defaults.timeoutSec === "number" && Number.isFinite(defaults.timeoutSec)) { - normalized.timeoutSec = Math.max(1, Math.min(120, Math.floor(defaults.timeoutSec))); + normalized.timeoutSec = Math.max(0, Math.min(7200, Math.floor(defaults.timeoutSec))); } const headers = normalizeHeaderMap(defaults.headers); @@ -470,10 +483,10 @@ function buildInviteOnboardingManifest( requiredFields: { requestType: "agent", agentName: "Display name for this agent", - adapterType: "Use 'openclaw' for OpenClaw webhook-based agents", + adapterType: "Use 'openclaw' for OpenClaw streaming agents", capabilities: "Optional capability summary", agentDefaultsPayload: - "Optional adapter config such as url/method/headers/webhookAuthHeader for OpenClaw callback endpoint", + "Optional adapter config such as url/method/headers/webhookAuthHeader for OpenClaw SSE endpoint", }, registrationEndpoint: { method: "POST", @@ -498,7 +511,7 @@ function buildInviteOnboardingManifest( path: testResolutionPath, url: testResolutionUrl, query: { - url: "https://your-openclaw-webhook.example/webhook", + url: "https://your-openclaw-agent.example/v1/responses", timeoutMs: 5000, }, }, @@ -579,10 +592,11 @@ export function buildInviteOnboardingTextDocument( ' "adapterType": "openclaw",', ' "capabilities": "Optional summary",', ' "agentDefaultsPayload": {', - ' "url": "https://your-openclaw-webhook.example/webhook",', + ' "url": "https://your-openclaw-agent.example/v1/responses",', + ' "streamTransport": "sse",', ' "method": "POST",', ' "headers": { "x-openclaw-auth": "replace-me" },', - ' "timeoutSec": 30', + ' "timeoutSec": 0', " }", "}", "", @@ -622,9 +636,9 @@ export function buildInviteOnboardingTextDocument( lines.push( "", "## Optional: test callback resolution from Paperclip", - `${onboarding.connectivity.testResolutionEndpoint.method ?? "GET"} ${onboarding.connectivity.testResolutionEndpoint.url}?url=https%3A%2F%2Fyour-openclaw-webhook.example%2Fwebhook`, + `${onboarding.connectivity.testResolutionEndpoint.method ?? "GET"} ${onboarding.connectivity.testResolutionEndpoint.url}?url=https%3A%2F%2Fyour-openclaw-agent.example%2Fv1%2Fresponses`, "", - "This endpoint checks whether Paperclip can reach your webhook URL and reports reachable, timeout, or unreachable.", + "This endpoint checks whether Paperclip can reach your OpenClaw endpoint and reports reachable, timeout, or unreachable.", ); } diff --git a/server/src/routes/issues-checkout-wakeup.ts b/server/src/routes/issues-checkout-wakeup.ts new file mode 100644 index 00000000..287b7bbf --- /dev/null +++ b/server/src/routes/issues-checkout-wakeup.ts @@ -0,0 +1,14 @@ +type CheckoutWakeInput = { + actorType: "board" | "agent" | "none"; + actorAgentId: string | null; + checkoutAgentId: string; + checkoutRunId: string | null; +}; + +export function shouldWakeAssigneeOnCheckout(input: CheckoutWakeInput): boolean { + if (input.actorType !== "agent") return true; + if (!input.actorAgentId) return true; + if (input.actorAgentId !== input.checkoutAgentId) return true; + if (!input.checkoutRunId) return true; + return false; +} diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index 824493f4..779ac34c 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -25,6 +25,7 @@ import { import { logger } from "../middleware/logger.js"; import { forbidden, HttpError, unauthorized } from "../errors.js"; import { assertCompanyAccess, getActorInfo } from "./authz.js"; +import { shouldWakeAssigneeOnCheckout } from "./issues-checkout-wakeup.js"; const MAX_ATTACHMENT_BYTES = Number(process.env.PAPERCLIP_ATTACHMENT_MAX_BYTES) || 10 * 1024 * 1024; const ALLOWED_ATTACHMENT_CONTENT_TYPES = new Set([ @@ -634,17 +635,26 @@ export function issueRoutes(db: Db, storage: StorageService) { details: { agentId: req.body.agentId }, }); - void heartbeat - .wakeup(req.body.agentId, { - source: "assignment", - triggerDetail: "system", - reason: "issue_checked_out", - payload: { issueId: issue.id, mutation: "checkout" }, - requestedByActorType: actor.actorType, - requestedByActorId: actor.actorId, - contextSnapshot: { issueId: issue.id, source: "issue.checkout" }, + if ( + shouldWakeAssigneeOnCheckout({ + actorType: req.actor.type, + actorAgentId: req.actor.type === "agent" ? req.actor.agentId ?? null : null, + checkoutAgentId: req.body.agentId, + checkoutRunId, }) - .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout")); + ) { + void heartbeat + .wakeup(req.body.agentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_checked_out", + payload: { issueId: issue.id, mutation: "checkout" }, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + contextSnapshot: { issueId: issue.id, source: "issue.checkout" }, + }) + .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout")); + } res.json(updated); });