diff --git a/packages/adapters/openclaw/src/index.ts b/packages/adapters/openclaw/src/index.ts index 61da17d6..4ae9cdba 100644 --- a/packages/adapters/openclaw/src/index.ts +++ b/packages/adapters/openclaw/src/index.ts @@ -9,15 +9,17 @@ Adapter: openclaw Use when: - You run an OpenClaw agent remotely and wake it over HTTP. -- You want SSE-first execution so one Paperclip run captures live progress and completion. +- You want selectable transport: + - \`sse\` for streaming execution in one Paperclip run. + - \`webhook\` for wake-style callbacks (including /hooks/wake compatibility). Don't use when: - You need local CLI execution inside Paperclip (use claude_local/codex_local/opencode_local/process). - The OpenClaw endpoint is not reachable from the Paperclip server. Core fields: -- url (string, required): OpenClaw SSE endpoint URL -- streamTransport (string, optional): must be \`sse\` when provided +- url (string, required): OpenClaw endpoint URL +- streamTransport (string, optional): \`sse\` (default) or \`webhook\` - method (string, optional): HTTP method, default POST - headers (object, optional): extra HTTP headers for requests - webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth diff --git a/packages/adapters/openclaw/src/server/execute-common.ts b/packages/adapters/openclaw/src/server/execute-common.ts new file mode 100644 index 00000000..f56a880c --- /dev/null +++ b/packages/adapters/openclaw/src/server/execute-common.ts @@ -0,0 +1,394 @@ +import type { AdapterExecutionContext } from "@paperclipai/adapter-utils"; +import { asNumber, asString, buildPaperclipEnv, parseObject } from "@paperclipai/adapter-utils/server-utils"; +import { createHash } from "node:crypto"; +import { parseOpenClawResponse } from "./parse.js"; + +export type OpenClawTransport = "sse" | "webhook"; +export type SessionKeyStrategy = "fixed" | "issue" | "run"; + +export type WakePayload = { + runId: string; + agentId: string; + companyId: string; + taskId: string | null; + issueId: string | null; + wakeReason: string | null; + wakeCommentId: string | null; + approvalId: string | null; + approvalStatus: string | null; + issueIds: string[]; +}; + +export type OpenClawExecutionState = { + method: string; + timeoutSec: number; + headers: Record; + payloadTemplate: Record; + wakePayload: WakePayload; + sessionKey: string; + paperclipEnv: Record; + wakeText: string; +}; + +const SENSITIVE_LOG_KEY_PATTERN = + /(^|[_-])(auth|authorization|token|secret|password|api[_-]?key|private[_-]?key)([_-]|$)|^x-openclaw-auth$/i; + +export function nonEmpty(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + +export function toAuthorizationHeaderValue(rawToken: string): string { + const trimmed = rawToken.trim(); + if (!trimmed) return trimmed; + return /^bearer\s+/i.test(trimmed) ? trimmed : `Bearer ${trimmed}`; +} + +export function resolvePaperclipApiUrlOverride(value: unknown): string | null { + const raw = nonEmpty(value); + if (!raw) return null; + try { + const parsed = new URL(raw); + if (parsed.protocol !== "http:" && parsed.protocol !== "https:") return null; + return parsed.toString(); + } catch { + return null; + } +} + +export function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy { + const normalized = asString(value, "fixed").trim().toLowerCase(); + if (normalized === "issue" || normalized === "run") return normalized; + return "fixed"; +} + +export function resolveSessionKey(input: { + strategy: SessionKeyStrategy; + configuredSessionKey: string | null; + runId: string; + issueId: string | null; +}): string { + const fallback = input.configuredSessionKey ?? "paperclip"; + if (input.strategy === "run") return `paperclip:run:${input.runId}`; + if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`; + return fallback; +} + +export function isWakeCompatibilityEndpoint(url: string): boolean { + try { + const parsed = new URL(url); + const path = parsed.pathname.toLowerCase(); + return path === "/hooks/wake" || path.endsWith("/hooks/wake"); + } catch { + return false; + } +} + +export function isOpenResponsesEndpoint(url: string): boolean { + try { + const parsed = new URL(url); + const path = parsed.pathname.toLowerCase(); + return path === "/v1/responses" || path.endsWith("/v1/responses"); + } catch { + return false; + } +} + +export function toStringRecord(value: unknown): Record { + const parsed = parseObject(value); + const out: Record = {}; + for (const [key, entry] of Object.entries(parsed)) { + if (typeof entry === "string") { + out[key] = entry; + } + } + return out; +} + +function isSensitiveLogKey(key: string): boolean { + return SENSITIVE_LOG_KEY_PATTERN.test(key.trim()); +} + +function sha256Prefix(value: string): string { + return createHash("sha256").update(value).digest("hex").slice(0, 12); +} + +function redactSecretForLog(value: string): string { + return `[redacted len=${value.length} sha256=${sha256Prefix(value)}]`; +} + +function truncateForLog(value: string, maxChars = 320): string { + if (value.length <= maxChars) return value; + return `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`; +} + +export function redactForLog(value: unknown, keyPath: string[] = [], depth = 0): unknown { + const currentKey = keyPath[keyPath.length - 1] ?? ""; + if (typeof value === "string") { + if (isSensitiveLogKey(currentKey)) return redactSecretForLog(value); + return truncateForLog(value); + } + if (typeof value === "number" || typeof value === "boolean" || value == null) { + return value; + } + if (Array.isArray(value)) { + if (depth >= 6) return "[array-truncated]"; + const out = value.slice(0, 20).map((entry, index) => redactForLog(entry, [...keyPath, `${index}`], depth + 1)); + if (value.length > 20) out.push(`[+${value.length - 20} more items]`); + return out; + } + if (typeof value === "object") { + if (depth >= 6) return "[object-truncated]"; + const entries = Object.entries(value as Record); + const out: Record = {}; + for (const [key, entry] of entries.slice(0, 80)) { + out[key] = redactForLog(entry, [...keyPath, key], depth + 1); + } + if (entries.length > 80) { + out.__truncated__ = `+${entries.length - 80} keys`; + } + return out; + } + return String(value); +} + +export function stringifyForLog(value: unknown, maxChars: number): string { + const text = JSON.stringify(value); + if (text.length <= maxChars) return text; + return `${text.slice(0, maxChars)}... [truncated ${text.length - maxChars} chars]`; +} + +export function buildWakePayload(ctx: AdapterExecutionContext): WakePayload { + const { runId, agent, context } = ctx; + return { + runId, + agentId: agent.id, + companyId: agent.companyId, + taskId: nonEmpty(context.taskId) ?? nonEmpty(context.issueId), + issueId: nonEmpty(context.issueId), + wakeReason: nonEmpty(context.wakeReason), + wakeCommentId: nonEmpty(context.wakeCommentId) ?? nonEmpty(context.commentId), + approvalId: nonEmpty(context.approvalId), + approvalStatus: nonEmpty(context.approvalStatus), + issueIds: Array.isArray(context.issueIds) + ? context.issueIds.filter( + (value): value is string => typeof value === "string" && value.trim().length > 0, + ) + : [], + }; +} + +export function buildPaperclipEnvForWake(ctx: AdapterExecutionContext, wakePayload: WakePayload): Record { + const paperclipApiUrlOverride = resolvePaperclipApiUrlOverride(ctx.config.paperclipApiUrl); + const paperclipEnv: Record = { + ...buildPaperclipEnv(ctx.agent), + PAPERCLIP_RUN_ID: ctx.runId, + }; + + if (paperclipApiUrlOverride) { + paperclipEnv.PAPERCLIP_API_URL = paperclipApiUrlOverride; + } + if (wakePayload.taskId) paperclipEnv.PAPERCLIP_TASK_ID = wakePayload.taskId; + if (wakePayload.wakeReason) paperclipEnv.PAPERCLIP_WAKE_REASON = wakePayload.wakeReason; + if (wakePayload.wakeCommentId) paperclipEnv.PAPERCLIP_WAKE_COMMENT_ID = wakePayload.wakeCommentId; + if (wakePayload.approvalId) paperclipEnv.PAPERCLIP_APPROVAL_ID = wakePayload.approvalId; + if (wakePayload.approvalStatus) paperclipEnv.PAPERCLIP_APPROVAL_STATUS = wakePayload.approvalStatus; + if (wakePayload.issueIds.length > 0) { + paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = wakePayload.issueIds.join(","); + } + + return paperclipEnv; +} + +export function buildWakeText(payload: WakePayload, paperclipEnv: Record): string { + const claimedApiKeyPath = "~/.openclaw/workspace/paperclip-claimed-api-key.json"; + const orderedKeys = [ + "PAPERCLIP_RUN_ID", + "PAPERCLIP_AGENT_ID", + "PAPERCLIP_COMPANY_ID", + "PAPERCLIP_API_URL", + "PAPERCLIP_TASK_ID", + "PAPERCLIP_WAKE_REASON", + "PAPERCLIP_WAKE_COMMENT_ID", + "PAPERCLIP_APPROVAL_ID", + "PAPERCLIP_APPROVAL_STATUS", + "PAPERCLIP_LINKED_ISSUE_IDS", + ]; + + const envLines: string[] = []; + for (const key of orderedKeys) { + const value = paperclipEnv[key]; + if (!value) continue; + envLines.push(`${key}=${value}`); + } + + const lines = [ + "Paperclip wake event for a cloud adapter.", + "", + "Set these values in your run context:", + ...envLines, + `PAPERCLIP_API_KEY=`, + "", + `Load PAPERCLIP_API_KEY from ${claimedApiKeyPath} (the token you saved after claim-api-key).`, + "", + `task_id=${payload.taskId ?? ""}`, + `issue_id=${payload.issueId ?? ""}`, + `wake_reason=${payload.wakeReason ?? ""}`, + `wake_comment_id=${payload.wakeCommentId ?? ""}`, + `approval_id=${payload.approvalId ?? ""}`, + `approval_status=${payload.approvalStatus ?? ""}`, + `linked_issue_ids=${payload.issueIds.join(",")}`, + ]; + + lines.push("", "Run your Paperclip heartbeat procedure now."); + return lines.join("\n"); +} + +export function appendWakeText(baseText: string, wakeText: string): string { + const trimmedBase = baseText.trim(); + return trimmedBase.length > 0 ? `${trimmedBase}\n\n${wakeText}` : wakeText; +} + +function buildOpenResponsesWakeInputMessage(wakeText: string): Record { + return { + type: "message", + role: "user", + content: [ + { + type: "input_text", + text: wakeText, + }, + ], + }; +} + +export function appendWakeTextToOpenResponsesInput(input: unknown, wakeText: string): unknown { + if (typeof input === "string") { + return appendWakeText(input, wakeText); + } + + if (Array.isArray(input)) { + return [...input, buildOpenResponsesWakeInputMessage(wakeText)]; + } + + if (typeof input === "object" && input !== null) { + const parsed = parseObject(input); + const content = parsed.content; + if (typeof content === "string") { + return { + ...parsed, + content: appendWakeText(content, wakeText), + }; + } + if (Array.isArray(content)) { + return { + ...parsed, + content: [ + ...content, + { + type: "input_text", + text: wakeText, + }, + ], + }; + } + return [parsed, buildOpenResponsesWakeInputMessage(wakeText)]; + } + + return wakeText; +} + +export function isTextRequiredResponse(responseText: string): boolean { + const parsed = parseOpenClawResponse(responseText); + const parsedError = parsed && typeof parsed.error === "string" ? parsed.error : null; + if (parsedError && parsedError.toLowerCase().includes("text required")) { + return true; + } + return responseText.toLowerCase().includes("text required"); +} + +export async function sendJsonRequest(params: { + url: string; + method: string; + headers: Record; + payload: Record; + signal: AbortSignal; +}): Promise { + return fetch(params.url, { + method: params.method, + headers: params.headers, + body: JSON.stringify(params.payload), + signal: params.signal, + }); +} + +export async function readAndLogResponseText(params: { + response: Response; + onLog: AdapterExecutionContext["onLog"]; +}): Promise { + const responseText = await params.response.text(); + if (responseText.trim().length > 0) { + await params.onLog( + "stdout", + `[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`, + ); + } else { + await params.onLog("stdout", `[openclaw] response (${params.response.status}) \n`); + } + return responseText; +} + +export function buildExecutionState(ctx: AdapterExecutionContext): OpenClawExecutionState { + const method = asString(ctx.config.method, "POST").trim().toUpperCase() || "POST"; + const timeoutSecRaw = asNumber(ctx.config.timeoutSec, 0); + const timeoutSec = timeoutSecRaw > 0 ? Math.max(1, Math.floor(timeoutSecRaw)) : 0; + const headersConfig = parseObject(ctx.config.headers) as Record; + const payloadTemplate = parseObject(ctx.config.payloadTemplate); + const webhookAuthHeader = nonEmpty(ctx.config.webhookAuthHeader); + const sessionKeyStrategy = normalizeSessionKeyStrategy(ctx.config.sessionKeyStrategy); + + const headers: Record = { + "content-type": "application/json", + }; + for (const [key, value] of Object.entries(headersConfig)) { + if (typeof value === "string" && value.trim().length > 0) { + headers[key] = value; + } + } + + const openClawAuthHeader = nonEmpty(headers["x-openclaw-auth"] ?? headers["X-OpenClaw-Auth"]); + if (openClawAuthHeader && !headers.authorization && !headers.Authorization) { + headers.authorization = toAuthorizationHeaderValue(openClawAuthHeader); + } + if (webhookAuthHeader && !headers.authorization && !headers.Authorization) { + headers.authorization = webhookAuthHeader; + } + + const wakePayload = buildWakePayload(ctx); + const sessionKey = resolveSessionKey({ + strategy: sessionKeyStrategy, + configuredSessionKey: nonEmpty(ctx.config.sessionKey), + runId: ctx.runId, + issueId: wakePayload.issueId ?? wakePayload.taskId, + }); + + const paperclipEnv = buildPaperclipEnvForWake(ctx, wakePayload); + const wakeText = buildWakeText(wakePayload, paperclipEnv); + + return { + method, + timeoutSec, + headers, + payloadTemplate, + wakePayload, + sessionKey, + paperclipEnv, + wakeText, + }; +} + +export function buildWakeCompatibilityPayload(wakeText: string): Record { + return { + text: wakeText, + mode: "now", + }; +} diff --git a/packages/adapters/openclaw/src/server/execute-sse.ts b/packages/adapters/openclaw/src/server/execute-sse.ts new file mode 100644 index 00000000..2729f466 --- /dev/null +++ b/packages/adapters/openclaw/src/server/execute-sse.ts @@ -0,0 +1,469 @@ +import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; +import { + appendWakeTextToOpenResponsesInput, + buildExecutionState, + isOpenResponsesEndpoint, + isTextRequiredResponse, + readAndLogResponseText, + redactForLog, + sendJsonRequest, + stringifyForLog, + toStringRecord, + type OpenClawExecutionState, +} from "./execute-common.js"; +import { parseOpenClawResponse } from "./parse.js"; + +type ConsumedSse = { + eventCount: number; + lastEventType: string | null; + lastData: string | null; + lastPayload: Record | null; + terminal: boolean; + failed: boolean; + errorMessage: string | null; +}; + +function nonEmpty(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + +function inferSseTerminal(input: { + eventType: string; + data: string; + parsedPayload: Record | null; +}): { terminal: boolean; failed: boolean; errorMessage: string | null } { + const normalizedType = input.eventType.trim().toLowerCase(); + const trimmedData = input.data.trim(); + const payload = input.parsedPayload; + const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null; + const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null; + + if (trimmedData === "[DONE]") { + return { terminal: true, failed: false, errorMessage: null }; + } + + const failType = + normalizedType.includes("error") || + normalizedType.includes("failed") || + normalizedType.includes("cancel"); + if (failType) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + (trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"), + }; + } + + const doneType = + normalizedType === "done" || + normalizedType.endsWith(".completed") || + normalizedType === "completed"; + if (doneType) { + return { terminal: true, failed: false, errorMessage: null }; + } + + if (payloadStatus) { + if ( + payloadStatus === "completed" || + payloadStatus === "succeeded" || + payloadStatus === "done" + ) { + return { terminal: true, failed: false, errorMessage: null }; + } + if ( + payloadStatus === "failed" || + payloadStatus === "cancelled" || + payloadStatus === "error" + ) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + `OpenClaw SSE status ${payloadStatus}`, + }; + } + } + + if (payloadType) { + if (payloadType.endsWith(".completed")) { + return { terminal: true, failed: false, errorMessage: null }; + } + if ( + payloadType.endsWith(".failed") || + payloadType.endsWith(".cancelled") || + payloadType.endsWith(".error") + ) { + return { + terminal: true, + failed: true, + errorMessage: + nonEmpty(payload?.error) ?? + nonEmpty(payload?.message) ?? + `OpenClaw SSE type ${payloadType}`, + }; + } + } + + if (payload?.done === true) { + return { terminal: true, failed: false, errorMessage: null }; + } + + return { terminal: false, failed: false, errorMessage: null }; +} + +async function consumeSseResponse(params: { + response: Response; + onLog: AdapterExecutionContext["onLog"]; +}): Promise { + const reader = params.response.body?.getReader(); + if (!reader) { + throw new Error("OpenClaw SSE response body is missing"); + } + + const decoder = new TextDecoder(); + let buffer = ""; + let eventType = "message"; + let dataLines: string[] = []; + let eventCount = 0; + let lastEventType: string | null = null; + let lastData: string | null = null; + let lastPayload: Record | null = null; + let terminal = false; + let failed = false; + let errorMessage: string | null = null; + + const dispatchEvent = async (): Promise => { + if (dataLines.length === 0) { + eventType = "message"; + return false; + } + + const data = dataLines.join("\n"); + const trimmedData = data.trim(); + const parsedPayload = parseOpenClawResponse(trimmedData); + + eventCount += 1; + lastEventType = eventType; + lastData = data; + if (parsedPayload) lastPayload = parsedPayload; + + const preview = + trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData; + await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`); + + const resolution = inferSseTerminal({ + eventType, + data, + parsedPayload, + }); + + dataLines = []; + eventType = "message"; + + if (resolution.terminal) { + terminal = true; + failed = resolution.failed; + errorMessage = resolution.errorMessage; + return true; + } + + return false; + }; + + let shouldStop = false; + while (!shouldStop) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + while (!shouldStop) { + const newlineIndex = buffer.indexOf("\n"); + if (newlineIndex === -1) break; + + let line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + if (line.endsWith("\r")) line = line.slice(0, -1); + + if (line.length === 0) { + shouldStop = await dispatchEvent(); + continue; + } + + if (line.startsWith(":")) continue; + + const colonIndex = line.indexOf(":"); + const field = colonIndex === -1 ? line : line.slice(0, colonIndex); + const rawValue = + colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); + + if (field === "event") { + eventType = rawValue || "message"; + } else if (field === "data") { + dataLines.push(rawValue); + } + } + } + + buffer += decoder.decode(); + if (!shouldStop && buffer.trim().length > 0) { + for (const rawLine of buffer.split(/\r?\n/)) { + const line = rawLine.trimEnd(); + if (line.length === 0) { + shouldStop = await dispatchEvent(); + if (shouldStop) break; + continue; + } + if (line.startsWith(":")) continue; + + const colonIndex = line.indexOf(":"); + const field = colonIndex === -1 ? line : line.slice(0, colonIndex); + const rawValue = + colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); + + if (field === "event") { + eventType = rawValue || "message"; + } else if (field === "data") { + dataLines.push(rawValue); + } + } + } + + if (!shouldStop && dataLines.length > 0) { + await dispatchEvent(); + } + + return { + eventCount, + lastEventType, + lastData, + lastPayload, + terminal, + failed, + errorMessage, + }; +} + +function buildSseBody(input: { + url: string; + state: OpenClawExecutionState; + context: AdapterExecutionContext["context"]; + configModel: unknown; +}): { headers: Record; body: Record } { + const { url, state, context, configModel } = input; + const templateText = nonEmpty(state.payloadTemplate.text); + const payloadText = templateText ? `${templateText}\n\n${state.wakeText}` : state.wakeText; + + const isOpenResponses = isOpenResponsesEndpoint(url); + const openResponsesInput = Object.prototype.hasOwnProperty.call(state.payloadTemplate, "input") + ? appendWakeTextToOpenResponsesInput(state.payloadTemplate.input, state.wakeText) + : payloadText; + + const body: Record = isOpenResponses + ? { + ...state.payloadTemplate, + stream: true, + model: + nonEmpty(state.payloadTemplate.model) ?? + nonEmpty(configModel) ?? + "openclaw", + input: openResponsesInput, + metadata: { + ...toStringRecord(state.payloadTemplate.metadata), + ...state.paperclipEnv, + paperclip_session_key: state.sessionKey, + }, + } + : { + ...state.payloadTemplate, + stream: true, + sessionKey: state.sessionKey, + text: payloadText, + paperclip: { + ...state.wakePayload, + sessionKey: state.sessionKey, + streamTransport: "sse", + env: state.paperclipEnv, + context, + }, + }; + + const headers: Record = { + ...state.headers, + accept: "text/event-stream", + }; + + if (isOpenResponses && !headers["x-openclaw-session-key"] && !headers["X-OpenClaw-Session-Key"]) { + headers["x-openclaw-session-key"] = state.sessionKey; + } + + return { headers, body }; +} + +export async function executeSse(ctx: AdapterExecutionContext, url: string): Promise { + const { onLog, onMeta, context } = ctx; + const state = buildExecutionState(ctx); + + if (onMeta) { + await onMeta({ + adapterType: "openclaw", + command: "sse", + commandArgs: [state.method, url], + context, + }); + } + + const { headers, body } = buildSseBody({ + url, + state, + context, + configModel: ctx.config.model, + }); + + const outboundHeaderKeys = Object.keys(headers).sort(); + await onLog( + "stdout", + `[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`, + ); + await onLog( + "stdout", + `[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(body), 12_000)}\n`, + ); + await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`); + await onLog("stdout", `[openclaw] invoking ${state.method} ${url} (transport=sse)\n`); + + const controller = new AbortController(); + const timeout = state.timeoutSec > 0 ? setTimeout(() => controller.abort(), state.timeoutSec * 1000) : null; + + try { + const response = await sendJsonRequest({ + url, + method: state.method, + headers, + payload: body, + signal: controller.signal, + }); + + if (!response.ok) { + const responseText = await readAndLogResponseText({ response, onLog }); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: + isTextRequiredResponse(responseText) + ? "OpenClaw endpoint rejected the payload as text-required." + : `OpenClaw SSE request failed with status ${response.status}`, + errorCode: isTextRequiredResponse(responseText) + ? "openclaw_text_required" + : "openclaw_http_error", + resultJson: { + status: response.status, + statusText: response.statusText, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const contentType = (response.headers.get("content-type") ?? "").toLowerCase(); + if (!contentType.includes("text/event-stream")) { + const responseText = await readAndLogResponseText({ response, onLog }); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: "OpenClaw SSE endpoint did not return text/event-stream", + errorCode: "openclaw_sse_expected_event_stream", + resultJson: { + status: response.status, + statusText: response.statusText, + contentType, + response: parseOpenClawResponse(responseText) ?? responseText, + }, + }; + } + + const consumed = await consumeSseResponse({ response, onLog }); + if (consumed.failed) { + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed", + errorCode: "openclaw_sse_stream_failed", + resultJson: { + eventCount: consumed.eventCount, + terminal: consumed.terminal, + lastEventType: consumed.lastEventType, + lastData: consumed.lastData, + response: consumed.lastPayload ?? consumed.lastData, + }, + }; + } + + if (!consumed.terminal) { + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: "OpenClaw SSE stream closed without a terminal event", + errorCode: "openclaw_sse_stream_incomplete", + resultJson: { + eventCount: consumed.eventCount, + terminal: consumed.terminal, + lastEventType: consumed.lastEventType, + lastData: consumed.lastData, + response: consumed.lastPayload ?? consumed.lastData, + }, + }; + } + + return { + exitCode: 0, + signal: null, + timedOut: false, + provider: "openclaw", + model: null, + summary: `OpenClaw SSE ${state.method} ${url}`, + resultJson: { + eventCount: consumed.eventCount, + terminal: consumed.terminal, + lastEventType: consumed.lastEventType, + lastData: consumed.lastData, + response: consumed.lastPayload ?? consumed.lastData, + }, + }; + } catch (err) { + if (err instanceof Error && err.name === "AbortError") { + const timeoutMessage = + state.timeoutSec > 0 + ? `[openclaw] SSE request timed out after ${state.timeoutSec}s\n` + : "[openclaw] SSE request aborted\n"; + await onLog("stderr", timeoutMessage); + return { + exitCode: null, + signal: null, + timedOut: true, + errorMessage: state.timeoutSec > 0 ? `Timed out after ${state.timeoutSec}s` : "Request aborted", + errorCode: "openclaw_sse_timeout", + }; + } + + const message = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[openclaw] request failed: ${message}\n`); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: message, + errorCode: "openclaw_request_failed", + }; + } finally { + if (timeout) clearTimeout(timeout); + } +} diff --git a/packages/adapters/openclaw/src/server/execute-webhook.ts b/packages/adapters/openclaw/src/server/execute-webhook.ts new file mode 100644 index 00000000..483eb3c0 --- /dev/null +++ b/packages/adapters/openclaw/src/server/execute-webhook.ts @@ -0,0 +1,227 @@ +import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; +import { + appendWakeText, + buildExecutionState, + buildWakeCompatibilityPayload, + isTextRequiredResponse, + isWakeCompatibilityEndpoint, + readAndLogResponseText, + redactForLog, + sendJsonRequest, + stringifyForLog, + type OpenClawExecutionState, +} from "./execute-common.js"; +import { parseOpenClawResponse } from "./parse.js"; + +function nonEmpty(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + +function buildWebhookBody(input: { + state: OpenClawExecutionState; + context: AdapterExecutionContext["context"]; +}): Record { + const { state, context } = input; + const templateText = nonEmpty(state.payloadTemplate.text); + const payloadText = templateText ? appendWakeText(templateText, state.wakeText) : state.wakeText; + + return { + ...state.payloadTemplate, + stream: false, + sessionKey: state.sessionKey, + text: payloadText, + paperclip: { + ...state.wakePayload, + sessionKey: state.sessionKey, + streamTransport: "webhook", + env: state.paperclipEnv, + context, + }, + }; +} + +async function sendWebhookRequest(params: { + url: string; + method: string; + headers: Record; + payload: Record; + onLog: AdapterExecutionContext["onLog"]; + signal: AbortSignal; +}): Promise<{ response: Response; responseText: string }> { + const response = await sendJsonRequest({ + url: params.url, + method: params.method, + headers: params.headers, + payload: params.payload, + signal: params.signal, + }); + + const responseText = await readAndLogResponseText({ response, onLog: params.onLog }); + return { response, responseText }; +} + +export async function executeWebhook(ctx: AdapterExecutionContext, url: string): Promise { + const { onLog, onMeta, context } = ctx; + const state = buildExecutionState(ctx); + + if (onMeta) { + await onMeta({ + adapterType: "openclaw", + command: "webhook", + commandArgs: [state.method, url], + context, + }); + } + + const headers = { ...state.headers }; + const webhookBody = buildWebhookBody({ state, context }); + const wakeCompatibilityBody = buildWakeCompatibilityPayload(state.wakeText); + const preferWakeCompatibilityBody = isWakeCompatibilityEndpoint(url); + const initialBody = preferWakeCompatibilityBody ? wakeCompatibilityBody : webhookBody; + + const outboundHeaderKeys = Object.keys(headers).sort(); + await onLog( + "stdout", + `[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`, + ); + await onLog( + "stdout", + `[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(initialBody), 12_000)}\n`, + ); + await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`); + await onLog("stdout", `[openclaw] invoking ${state.method} ${url} (transport=webhook)\n`); + + if (preferWakeCompatibilityBody) { + await onLog("stdout", "[openclaw] using wake text payload for /hooks/wake compatibility\n"); + } + + const controller = new AbortController(); + const timeout = state.timeoutSec > 0 ? setTimeout(() => controller.abort(), state.timeoutSec * 1000) : null; + + try { + const initialResponse = await sendWebhookRequest({ + url, + method: state.method, + headers, + payload: initialBody, + onLog, + signal: controller.signal, + }); + + if (!initialResponse.response.ok) { + const canRetryWithWakeCompatibility = + !preferWakeCompatibilityBody && isTextRequiredResponse(initialResponse.responseText); + + if (canRetryWithWakeCompatibility) { + await onLog( + "stdout", + "[openclaw] endpoint requires text payload; retrying with wake compatibility format\n", + ); + + const retryResponse = await sendWebhookRequest({ + url, + method: state.method, + headers, + payload: wakeCompatibilityBody, + onLog, + signal: controller.signal, + }); + + if (retryResponse.response.ok) { + return { + exitCode: 0, + signal: null, + timedOut: false, + provider: "openclaw", + model: null, + summary: `OpenClaw webhook ${state.method} ${url} (wake compatibility)`, + resultJson: { + status: retryResponse.response.status, + statusText: retryResponse.response.statusText, + compatibilityMode: "wake_text", + response: parseOpenClawResponse(retryResponse.responseText) ?? retryResponse.responseText, + }, + }; + } + + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: + isTextRequiredResponse(retryResponse.responseText) + ? "OpenClaw endpoint rejected the wake compatibility payload as text-required." + : `OpenClaw webhook failed with status ${retryResponse.response.status}`, + errorCode: isTextRequiredResponse(retryResponse.responseText) + ? "openclaw_text_required" + : "openclaw_http_error", + resultJson: { + status: retryResponse.response.status, + statusText: retryResponse.response.statusText, + compatibilityMode: "wake_text", + response: parseOpenClawResponse(retryResponse.responseText) ?? retryResponse.responseText, + }, + }; + } + + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: + isTextRequiredResponse(initialResponse.responseText) + ? "OpenClaw endpoint rejected the payload as text-required." + : `OpenClaw webhook failed with status ${initialResponse.response.status}`, + errorCode: isTextRequiredResponse(initialResponse.responseText) + ? "openclaw_text_required" + : "openclaw_http_error", + resultJson: { + status: initialResponse.response.status, + statusText: initialResponse.response.statusText, + response: parseOpenClawResponse(initialResponse.responseText) ?? initialResponse.responseText, + }, + }; + } + + return { + exitCode: 0, + signal: null, + timedOut: false, + provider: "openclaw", + model: null, + summary: `OpenClaw webhook ${state.method} ${url}`, + resultJson: { + status: initialResponse.response.status, + statusText: initialResponse.response.statusText, + response: parseOpenClawResponse(initialResponse.responseText) ?? initialResponse.responseText, + }, + }; + } catch (err) { + if (err instanceof Error && err.name === "AbortError") { + const timeoutMessage = + state.timeoutSec > 0 + ? `[openclaw] webhook request timed out after ${state.timeoutSec}s\n` + : "[openclaw] webhook request aborted\n"; + await onLog("stderr", timeoutMessage); + return { + exitCode: null, + signal: null, + timedOut: true, + errorMessage: state.timeoutSec > 0 ? `Timed out after ${state.timeoutSec}s` : "Request aborted", + errorCode: "openclaw_webhook_timeout", + }; + } + + const message = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[openclaw] request failed: ${message}\n`); + return { + exitCode: 1, + signal: null, + timedOut: false, + errorMessage: message, + errorCode: "openclaw_request_failed", + }; + } finally { + if (timeout) clearTimeout(timeout); + } +} diff --git a/packages/adapters/openclaw/src/server/execute.ts b/packages/adapters/openclaw/src/server/execute.ts index 487ee2b4..68b651ea 100644 --- a/packages/adapters/openclaw/src/server/execute.ts +++ b/packages/adapters/openclaw/src/server/execute.ts @@ -1,523 +1,18 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; -import { asNumber, asString, buildPaperclipEnv, parseObject } from "@paperclipai/adapter-utils/server-utils"; -import { createHash } from "node:crypto"; -import { parseOpenClawResponse } from "./parse.js"; +import { asString } from "@paperclipai/adapter-utils/server-utils"; +import { isWakeCompatibilityEndpoint } from "./execute-common.js"; +import { executeSse } from "./execute-sse.js"; +import { executeWebhook } from "./execute-webhook.js"; -type SessionKeyStrategy = "fixed" | "issue" | "run"; - -function nonEmpty(value: unknown): string | null { - return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; -} - -function toAuthorizationHeaderValue(rawToken: string): string { - const trimmed = rawToken.trim(); - if (!trimmed) return trimmed; - return /^bearer\s+/i.test(trimmed) ? trimmed : `Bearer ${trimmed}`; -} - -function resolvePaperclipApiUrlOverride(value: unknown): string | null { - const raw = nonEmpty(value); - if (!raw) return null; - try { - const parsed = new URL(raw); - if (parsed.protocol !== "http:" && parsed.protocol !== "https:") return null; - return parsed.toString(); - } catch { - return null; - } -} - -function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy { - const normalized = asString(value, "fixed").trim().toLowerCase(); - if (normalized === "issue" || normalized === "run") return normalized; - return "fixed"; -} - -function resolveSessionKey(input: { - strategy: SessionKeyStrategy; - configuredSessionKey: string | null; - runId: string; - issueId: string | null; -}): string { - const fallback = input.configuredSessionKey ?? "paperclip"; - if (input.strategy === "run") return `paperclip:run:${input.runId}`; - if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`; - return fallback; -} - -function isWakeCompatibilityEndpoint(url: string): boolean { - try { - const parsed = new URL(url); - const path = parsed.pathname.toLowerCase(); - return path === "/hooks/wake" || path.endsWith("/hooks/wake"); - } catch { - return false; - } -} - -function isOpenResponsesEndpoint(url: string): boolean { - try { - const parsed = new URL(url); - const path = parsed.pathname.toLowerCase(); - return path === "/v1/responses" || path.endsWith("/v1/responses"); - } catch { - return false; - } -} - -function toStringRecord(value: unknown): Record { - const parsed = parseObject(value); - const out: Record = {}; - for (const [key, entry] of Object.entries(parsed)) { - if (typeof entry === "string") { - out[key] = entry; - } - } - return out; -} - -const SENSITIVE_LOG_KEY_PATTERN = - /(^|[_-])(auth|authorization|token|secret|password|api[_-]?key|private[_-]?key)([_-]|$)|^x-openclaw-auth$/i; - -function isSensitiveLogKey(key: string): boolean { - return SENSITIVE_LOG_KEY_PATTERN.test(key.trim()); -} - -function sha256Prefix(value: string): string { - return createHash("sha256").update(value).digest("hex").slice(0, 12); -} - -function redactSecretForLog(value: string): string { - return `[redacted len=${value.length} sha256=${sha256Prefix(value)}]`; -} - -function truncateForLog(value: string, maxChars = 320): string { - if (value.length <= maxChars) return value; - return `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`; -} - -function redactForLog(value: unknown, keyPath: string[] = [], depth = 0): unknown { - const currentKey = keyPath[keyPath.length - 1] ?? ""; - if (typeof value === "string") { - if (isSensitiveLogKey(currentKey)) return redactSecretForLog(value); - return truncateForLog(value); - } - if (typeof value === "number" || typeof value === "boolean" || value == null) { - return value; - } - if (Array.isArray(value)) { - if (depth >= 6) return "[array-truncated]"; - const out = value.slice(0, 20).map((entry, index) => redactForLog(entry, [...keyPath, `${index}`], depth + 1)); - if (value.length > 20) out.push(`[+${value.length - 20} more items]`); - return out; - } - if (typeof value === "object") { - if (depth >= 6) return "[object-truncated]"; - const entries = Object.entries(value as Record); - const out: Record = {}; - for (const [key, entry] of entries.slice(0, 80)) { - out[key] = redactForLog(entry, [...keyPath, key], depth + 1); - } - if (entries.length > 80) { - out.__truncated__ = `+${entries.length - 80} keys`; - } - return out; - } - return String(value); -} - -function stringifyForLog(value: unknown, maxChars: number): string { - const text = JSON.stringify(value); - if (text.length <= maxChars) return text; - return `${text.slice(0, maxChars)}... [truncated ${text.length - maxChars} chars]`; -} - -type WakePayload = { - runId: string; - agentId: string; - companyId: string; - taskId: string | null; - issueId: string | null; - wakeReason: string | null; - wakeCommentId: string | null; - approvalId: string | null; - approvalStatus: string | null; - issueIds: string[]; -}; - -function buildWakeText(payload: WakePayload, paperclipEnv: Record): string { - const claimedApiKeyPath = "~/.openclaw/workspace/paperclip-claimed-api-key.json"; - const orderedKeys = [ - "PAPERCLIP_RUN_ID", - "PAPERCLIP_AGENT_ID", - "PAPERCLIP_COMPANY_ID", - "PAPERCLIP_API_URL", - "PAPERCLIP_TASK_ID", - "PAPERCLIP_WAKE_REASON", - "PAPERCLIP_WAKE_COMMENT_ID", - "PAPERCLIP_APPROVAL_ID", - "PAPERCLIP_APPROVAL_STATUS", - "PAPERCLIP_LINKED_ISSUE_IDS", - ]; - - const envLines: string[] = []; - for (const key of orderedKeys) { - const value = paperclipEnv[key]; - if (!value) continue; - envLines.push(`${key}=${value}`); - } - - const lines = [ - "Paperclip wake event for a cloud adapter.", - "", - "Set these values in your run context:", - ...envLines, - `PAPERCLIP_API_KEY=`, - "", - `Load PAPERCLIP_API_KEY from ${claimedApiKeyPath} (the token you saved after claim-api-key).`, - "", - `task_id=${payload.taskId ?? ""}`, - `issue_id=${payload.issueId ?? ""}`, - `wake_reason=${payload.wakeReason ?? ""}`, - `wake_comment_id=${payload.wakeCommentId ?? ""}`, - `approval_id=${payload.approvalId ?? ""}`, - `approval_status=${payload.approvalStatus ?? ""}`, - `linked_issue_ids=${payload.issueIds.join(",")}`, - ]; - - lines.push("", "Run your Paperclip heartbeat procedure now."); - return lines.join("\n"); -} - -function appendWakeText(baseText: string, wakeText: string): string { - const trimmedBase = baseText.trim(); - return trimmedBase.length > 0 ? `${trimmedBase}\n\n${wakeText}` : wakeText; -} - -function buildOpenResponsesWakeInputMessage(wakeText: string): Record { - return { - type: "message", - role: "user", - content: [ - { - type: "input_text", - text: wakeText, - }, - ], - }; -} - -function appendWakeTextToOpenResponsesInput(input: unknown, wakeText: string): unknown { - if (typeof input === "string") { - return appendWakeText(input, wakeText); - } - - if (Array.isArray(input)) { - return [...input, buildOpenResponsesWakeInputMessage(wakeText)]; - } - - if (typeof input === "object" && input !== null) { - const parsed = parseObject(input); - const content = parsed.content; - if (typeof content === "string") { - return { - ...parsed, - content: appendWakeText(content, wakeText), - }; - } - if (Array.isArray(content)) { - return { - ...parsed, - content: [ - ...content, - { - type: "input_text", - text: wakeText, - }, - ], - }; - } - return [parsed, buildOpenResponsesWakeInputMessage(wakeText)]; - } - - return wakeText; -} - -function isTextRequiredResponse(responseText: string): boolean { - const parsed = parseOpenClawResponse(responseText); - const parsedError = parsed && typeof parsed.error === "string" ? parsed.error : null; - if (parsedError && parsedError.toLowerCase().includes("text required")) { - return true; - } - return responseText.toLowerCase().includes("text required"); -} - -async function sendJsonRequest(params: { - url: string; - method: string; - headers: Record; - payload: Record; - signal: AbortSignal; -}): Promise { - return fetch(params.url, { - method: params.method, - headers: params.headers, - body: JSON.stringify(params.payload), - signal: params.signal, - }); -} - -async function readAndLogResponseText(params: { - response: Response; - onLog: AdapterExecutionContext["onLog"]; -}): Promise { - const responseText = await params.response.text(); - if (responseText.trim().length > 0) { - await params.onLog( - "stdout", - `[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`, - ); - } else { - await params.onLog("stdout", `[openclaw] response (${params.response.status}) \n`); - } - return responseText; -} - -type ConsumedSse = { - eventCount: number; - lastEventType: string | null; - lastData: string | null; - lastPayload: Record | null; - terminal: boolean; - failed: boolean; - errorMessage: string | null; -}; - -function inferSseTerminal(input: { - eventType: string; - data: string; - parsedPayload: Record | null; -}): { terminal: boolean; failed: boolean; errorMessage: string | null } { - const normalizedType = input.eventType.trim().toLowerCase(); - const trimmedData = input.data.trim(); - const payload = input.parsedPayload; - const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null; - const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null; - - if (trimmedData === "[DONE]") { - return { terminal: true, failed: false, errorMessage: null }; - } - - const failType = - normalizedType.includes("error") || - normalizedType.includes("failed") || - normalizedType.includes("cancel"); - if (failType) { - return { - terminal: true, - failed: true, - errorMessage: - nonEmpty(payload?.error) ?? - nonEmpty(payload?.message) ?? - (trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"), - }; - } - - const doneType = - normalizedType === "done" || - normalizedType.endsWith(".completed") || - normalizedType === "completed"; - if (doneType) { - return { terminal: true, failed: false, errorMessage: null }; - } - - if (payloadStatus) { - if ( - payloadStatus === "completed" || - payloadStatus === "succeeded" || - payloadStatus === "done" - ) { - return { terminal: true, failed: false, errorMessage: null }; - } - if ( - payloadStatus === "failed" || - payloadStatus === "cancelled" || - payloadStatus === "error" - ) { - return { - terminal: true, - failed: true, - errorMessage: - nonEmpty(payload?.error) ?? - nonEmpty(payload?.message) ?? - `OpenClaw SSE status ${payloadStatus}`, - }; - } - } - - if (payloadType) { - if (payloadType.endsWith(".completed")) { - return { terminal: true, failed: false, errorMessage: null }; - } - if ( - payloadType.endsWith(".failed") || - payloadType.endsWith(".cancelled") || - payloadType.endsWith(".error") - ) { - return { - terminal: true, - failed: true, - errorMessage: - nonEmpty(payload?.error) ?? - nonEmpty(payload?.message) ?? - `OpenClaw SSE type ${payloadType}`, - }; - } - } - - if (payload?.done === true) { - return { terminal: true, failed: false, errorMessage: null }; - } - - return { terminal: false, failed: false, errorMessage: null }; -} - -async function consumeSseResponse(params: { - response: Response; - onLog: AdapterExecutionContext["onLog"]; -}): Promise { - const reader = params.response.body?.getReader(); - if (!reader) { - throw new Error("OpenClaw SSE response body is missing"); - } - - const decoder = new TextDecoder(); - let buffer = ""; - let eventType = "message"; - let dataLines: string[] = []; - let eventCount = 0; - let lastEventType: string | null = null; - let lastData: string | null = null; - let lastPayload: Record | null = null; - let terminal = false; - let failed = false; - let errorMessage: string | null = null; - - const dispatchEvent = async (): Promise => { - if (dataLines.length === 0) { - eventType = "message"; - return false; - } - - const data = dataLines.join("\n"); - const trimmedData = data.trim(); - const parsedPayload = parseOpenClawResponse(trimmedData); - - eventCount += 1; - lastEventType = eventType; - lastData = data; - if (parsedPayload) lastPayload = parsedPayload; - - const preview = - trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData; - await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`); - - const resolution = inferSseTerminal({ - eventType, - data, - parsedPayload, - }); - - dataLines = []; - eventType = "message"; - - if (resolution.terminal) { - terminal = true; - failed = resolution.failed; - errorMessage = resolution.errorMessage; - return true; - } - - return false; - }; - - let shouldStop = false; - while (!shouldStop) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - - while (!shouldStop) { - const newlineIndex = buffer.indexOf("\n"); - if (newlineIndex === -1) break; - - let line = buffer.slice(0, newlineIndex); - buffer = buffer.slice(newlineIndex + 1); - if (line.endsWith("\r")) line = line.slice(0, -1); - - if (line.length === 0) { - shouldStop = await dispatchEvent(); - continue; - } - - if (line.startsWith(":")) continue; - - const colonIndex = line.indexOf(":"); - const field = colonIndex === -1 ? line : line.slice(0, colonIndex); - const rawValue = - colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); - - if (field === "event") { - eventType = rawValue || "message"; - } else if (field === "data") { - dataLines.push(rawValue); - } - } - } - - buffer += decoder.decode(); - if (!shouldStop && buffer.trim().length > 0) { - for (const rawLine of buffer.split(/\r?\n/)) { - const line = rawLine.trimEnd(); - if (line.length === 0) { - shouldStop = await dispatchEvent(); - if (shouldStop) break; - continue; - } - if (line.startsWith(":")) continue; - - const colonIndex = line.indexOf(":"); - const field = colonIndex === -1 ? line : line.slice(0, colonIndex); - const rawValue = - colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, ""); - - if (field === "event") { - eventType = rawValue || "message"; - } else if (field === "data") { - dataLines.push(rawValue); - } - } - } - - if (!shouldStop && dataLines.length > 0) { - await dispatchEvent(); - } - - return { - eventCount, - lastEventType, - lastData, - lastPayload, - terminal, - failed, - errorMessage, - }; +function normalizeTransport(value: unknown): "sse" | "webhook" | null { + const normalized = asString(value, "sse").trim().toLowerCase(); + if (!normalized || normalized === "sse") return "sse"; + if (normalized === "webhook") return "webhook"; + return null; } export async function execute(ctx: AdapterExecutionContext): Promise { - const { config, runId, agent, context, onLog, onMeta } = ctx; - const url = asString(config.url, "").trim(); + const url = asString(ctx.config.url, "").trim(); if (!url) { return { exitCode: 1, @@ -528,289 +23,31 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? Math.max(1, Math.floor(timeoutSecRaw)) : 0; - const headersConfig = parseObject(config.headers) as Record; - const payloadTemplate = parseObject(config.payloadTemplate); - const webhookAuthHeader = nonEmpty(config.webhookAuthHeader); - const sessionKeyStrategy = normalizeSessionKeyStrategy(config.sessionKeyStrategy); - - const headers: Record = { - "content-type": "application/json", - }; - for (const [key, value] of Object.entries(headersConfig)) { - if (typeof value === "string" && value.trim().length > 0) { - headers[key] = value; - } - } - const openClawAuthHeader = nonEmpty(headers["x-openclaw-auth"] ?? headers["X-OpenClaw-Auth"]); - if (openClawAuthHeader && !headers.authorization && !headers.Authorization) { - headers.authorization = toAuthorizationHeaderValue(openClawAuthHeader); - } - if (webhookAuthHeader && !headers.authorization && !headers.Authorization) { - headers.authorization = webhookAuthHeader; - } - - const wakePayload = { - runId, - agentId: agent.id, - companyId: agent.companyId, - taskId: nonEmpty(context.taskId) ?? nonEmpty(context.issueId), - issueId: nonEmpty(context.issueId), - wakeReason: nonEmpty(context.wakeReason), - wakeCommentId: nonEmpty(context.wakeCommentId) ?? nonEmpty(context.commentId), - approvalId: nonEmpty(context.approvalId), - approvalStatus: nonEmpty(context.approvalStatus), - issueIds: Array.isArray(context.issueIds) - ? context.issueIds.filter( - (value): value is string => typeof value === "string" && value.trim().length > 0, - ) - : [], - }; - - const sessionKey = resolveSessionKey({ - strategy: sessionKeyStrategy, - configuredSessionKey: nonEmpty(config.sessionKey), - runId, - issueId: wakePayload.issueId ?? wakePayload.taskId, - }); - - const templateText = nonEmpty(payloadTemplate.text); - const paperclipApiUrlOverride = resolvePaperclipApiUrlOverride(config.paperclipApiUrl); - const paperclipEnv: Record = { - ...buildPaperclipEnv(agent), - PAPERCLIP_RUN_ID: runId, - }; - if (paperclipApiUrlOverride) { - paperclipEnv.PAPERCLIP_API_URL = paperclipApiUrlOverride; - } - if (wakePayload.taskId) paperclipEnv.PAPERCLIP_TASK_ID = wakePayload.taskId; - if (wakePayload.wakeReason) paperclipEnv.PAPERCLIP_WAKE_REASON = wakePayload.wakeReason; - if (wakePayload.wakeCommentId) paperclipEnv.PAPERCLIP_WAKE_COMMENT_ID = wakePayload.wakeCommentId; - if (wakePayload.approvalId) paperclipEnv.PAPERCLIP_APPROVAL_ID = wakePayload.approvalId; - if (wakePayload.approvalStatus) paperclipEnv.PAPERCLIP_APPROVAL_STATUS = wakePayload.approvalStatus; - if (wakePayload.issueIds.length > 0) { - paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = wakePayload.issueIds.join(","); - } - - const wakeText = buildWakeText(wakePayload, paperclipEnv); - const payloadText = templateText ? `${templateText}\n\n${wakeText}` : wakeText; - const isOpenResponses = isOpenResponsesEndpoint(url); - const openResponsesInput = Object.prototype.hasOwnProperty.call(payloadTemplate, "input") - ? appendWakeTextToOpenResponsesInput(payloadTemplate.input, wakeText) - : payloadText; - - const paperclipBody: Record = isOpenResponses - ? { - ...payloadTemplate, - stream: true, - model: - nonEmpty(payloadTemplate.model) ?? - nonEmpty(config.model) ?? - "openclaw", - input: openResponsesInput, - metadata: { - ...toStringRecord(payloadTemplate.metadata), - ...paperclipEnv, - paperclip_session_key: sessionKey, - }, - } - : { - ...payloadTemplate, - stream: true, - sessionKey, - text: payloadText, - paperclip: { - ...wakePayload, - sessionKey, - streamTransport: "sse", - env: paperclipEnv, - context, - }, - }; - - if (isOpenResponses) { - delete paperclipBody.text; - delete paperclipBody.sessionKey; - delete paperclipBody.paperclip; - if (!headers["x-openclaw-session-key"] && !headers["X-OpenClaw-Session-Key"]) { - headers["x-openclaw-session-key"] = sessionKey; - } - } - - if (onMeta) { - await onMeta({ - adapterType: "openclaw", - command: "sse", - commandArgs: [method, url], - context, - }); - } - - const outboundHeaderKeys = Array.from(new Set([...Object.keys(headers), "accept"])).sort(); - await onLog( - "stdout", - `[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`, - ); - await onLog( - "stdout", - `[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(paperclipBody), 12_000)}\n`, - ); - await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`); - await onLog("stdout", `[openclaw] invoking ${method} ${url} (transport=sse)\n`); - - const controller = new AbortController(); - const timeout = timeoutSec > 0 ? setTimeout(() => controller.abort(), timeoutSec * 1000) : null; - - try { - const response = await sendJsonRequest({ - url, - method, - headers: { - ...headers, - accept: "text/event-stream", - }, - payload: paperclipBody, - signal: controller.signal, - }); - - if (!response.ok) { - const responseText = await readAndLogResponseText({ response, onLog }); - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: - isTextRequiredResponse(responseText) - ? "OpenClaw endpoint rejected the payload as text-required." - : `OpenClaw SSE request failed with status ${response.status}`, - errorCode: isTextRequiredResponse(responseText) - ? "openclaw_text_required" - : "openclaw_http_error", - resultJson: { - status: response.status, - statusText: response.statusText, - response: parseOpenClawResponse(responseText) ?? responseText, - }, - }; - } - - const contentType = (response.headers.get("content-type") ?? "").toLowerCase(); - if (!contentType.includes("text/event-stream")) { - const responseText = await readAndLogResponseText({ response, onLog }); - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: "OpenClaw SSE endpoint did not return text/event-stream", - errorCode: "openclaw_sse_expected_event_stream", - resultJson: { - status: response.status, - statusText: response.statusText, - contentType, - response: parseOpenClawResponse(responseText) ?? responseText, - }, - }; - } - - const consumed = await consumeSseResponse({ response, onLog }); - if (consumed.failed) { - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed", - errorCode: "openclaw_sse_stream_failed", - resultJson: { - eventCount: consumed.eventCount, - terminal: consumed.terminal, - lastEventType: consumed.lastEventType, - lastData: consumed.lastData, - response: consumed.lastPayload ?? consumed.lastData, - }, - }; - } - - if (!consumed.terminal) { - return { - exitCode: 1, - signal: null, - timedOut: false, - errorMessage: "OpenClaw SSE stream closed without a terminal event", - errorCode: "openclaw_sse_stream_incomplete", - resultJson: { - eventCount: consumed.eventCount, - terminal: consumed.terminal, - lastEventType: consumed.lastEventType, - lastData: consumed.lastData, - response: consumed.lastPayload ?? consumed.lastData, - }, - }; - } - - return { - exitCode: 0, - signal: null, - timedOut: false, - provider: "openclaw", - model: null, - summary: `OpenClaw SSE ${method} ${url}`, - resultJson: { - eventCount: consumed.eventCount, - terminal: consumed.terminal, - lastEventType: consumed.lastEventType, - lastData: consumed.lastData, - response: consumed.lastPayload ?? consumed.lastData, - }, - }; - } catch (err) { - if (err instanceof Error && err.name === "AbortError") { - const timeoutMessage = - timeoutSec > 0 - ? `[openclaw] SSE request timed out after ${timeoutSec}s\n` - : "[openclaw] SSE request aborted\n"; - await onLog("stderr", timeoutMessage); - return { - exitCode: null, - signal: null, - timedOut: true, - errorMessage: timeoutSec > 0 ? `Timed out after ${timeoutSec}s` : "Request aborted", - errorCode: "openclaw_sse_timeout", - }; - } - - const message = err instanceof Error ? err.message : String(err); - await onLog("stderr", `[openclaw] request failed: ${message}\n`); + if (transport === "sse" && isWakeCompatibilityEndpoint(url)) { return { exitCode: 1, signal: null, timedOut: false, - errorMessage: message, - errorCode: "openclaw_request_failed", + errorMessage: "OpenClaw /hooks/wake is not stream-capable. Use SSE transport with a streaming endpoint.", + errorCode: "openclaw_sse_incompatible_endpoint", }; - } finally { - if (timeout) clearTimeout(timeout); } + + if (transport === "webhook") { + return executeWebhook(ctx, url); + } + + return executeSse(ctx, url); } diff --git a/packages/adapters/openclaw/src/server/test.ts b/packages/adapters/openclaw/src/server/test.ts index 83c02f3f..00e252ad 100644 --- a/packages/adapters/openclaw/src/server/test.ts +++ b/packages/adapters/openclaw/src/server/test.ts @@ -34,6 +34,13 @@ function isWakePath(pathname: string): boolean { return value === "/hooks/wake" || value.endsWith("/hooks/wake"); } +function normalizeTransport(value: unknown): "sse" | "webhook" | null { + const normalized = asString(value, "sse").trim().toLowerCase(); + if (!normalized || normalized === "sse") return "sse"; + if (normalized === "webhook") return "webhook"; + return null; +} + function pushDeploymentDiagnostics( checks: AdapterEnvironmentCheck[], ctx: AdapterEnvironmentTestContext, @@ -102,13 +109,15 @@ export async function testEnvironment( const checks: AdapterEnvironmentCheck[] = []; const config = parseObject(ctx.config); const urlValue = asString(config.url, ""); + const streamTransportValue = config.streamTransport ?? config.transport; + const streamTransport = normalizeTransport(streamTransportValue); if (!urlValue) { checks.push({ code: "openclaw_url_missing", level: "error", - message: "OpenClaw adapter requires a streaming endpoint URL.", - hint: "Set adapterConfig.url to your OpenClaw SSE endpoint.", + message: "OpenClaw adapter requires an endpoint URL.", + hint: "Set adapterConfig.url to your OpenClaw transport endpoint.", }); return { adapterType: ctx.adapterType, @@ -154,23 +163,28 @@ export async function testEnvironment( }); } - if (isWakePath(url.pathname)) { + if (streamTransport === "sse" && isWakePath(url.pathname)) { checks.push({ code: "openclaw_wake_endpoint_incompatible", level: "error", - message: "Endpoint targets /hooks/wake, which is not stream-capable for strict SSE mode.", + message: "Endpoint targets /hooks/wake, which is not stream-capable for SSE transport.", hint: "Use an endpoint that returns text/event-stream for the full run duration.", }); } } - const streamTransport = asString(config.streamTransport, "sse").trim().toLowerCase(); - if (streamTransport && streamTransport !== "sse") { + if (!streamTransport) { checks.push({ code: "openclaw_stream_transport_unsupported", level: "error", - message: `Unsupported streamTransport: ${streamTransport}`, - hint: "OpenClaw adapter now requires streamTransport=sse.", + message: `Unsupported streamTransport: ${String(streamTransportValue)}`, + hint: "Use streamTransport=sse or streamTransport=webhook.", + }); + } else { + checks.push({ + code: "openclaw_stream_transport_configured", + level: "info", + message: `Configured stream transport: ${streamTransport}`, }); } diff --git a/server/src/__tests__/openclaw-adapter.test.ts b/server/src/__tests__/openclaw-adapter.test.ts index 00bef759..aa5d4999 100644 --- a/server/src/__tests__/openclaw-adapter.test.ts +++ b/server/src/__tests__/openclaw-adapter.test.ts @@ -159,7 +159,7 @@ describe("openclaw ui stdout parser", () => { }); describe("openclaw adapter execute", () => { - it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => { + it("uses SSE transport and includes canonical PAPERCLIP context in text payload", async () => { const fetchMock = vi.fn().mockResolvedValue( sseResponse([ "event: response.completed\n", @@ -534,14 +534,109 @@ describe("openclaw adapter execute", () => { expect(result.errorCode).toBe("openclaw_text_required"); }); - it("rejects non-sse transport configuration", async () => { + it("supports webhook transport and sends Paperclip webhook payloads", async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { + status: 200, + statusText: "OK", + headers: { + "content-type": "application/json", + }, + }), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/webhook", + streamTransport: "webhook", + payloadTemplate: { foo: "bar" }, + }), + ); + + expect(result.exitCode).toBe(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; + expect(body.foo).toBe("bar"); + expect(body.stream).toBe(false); + expect(body.sessionKey).toBe("paperclip"); + expect(String(body.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123"); + expect((body.paperclip as Record).streamTransport).toBe("webhook"); + }); + + it("uses wake compatibility payloads for /hooks/wake when transport=webhook", async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { + status: 200, + statusText: "OK", + headers: { + "content-type": "application/json", + }, + }), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/hooks/wake", + streamTransport: "webhook", + }), + ); + + expect(result.exitCode).toBe(0); + const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; + expect(body.mode).toBe("now"); + expect(String(body.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123"); + expect(body.paperclip).toBeUndefined(); + }); + + it("retries webhook payloads with wake compatibility format on text-required errors", async () => { + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + new Response(JSON.stringify({ error: "text required" }), { + status: 400, + statusText: "Bad Request", + headers: { + "content-type": "application/json", + }, + }), + ) + .mockResolvedValueOnce( + new Response(JSON.stringify({ ok: true }), { + status: 200, + statusText: "OK", + headers: { + "content-type": "application/json", + }, + }), + ); + vi.stubGlobal("fetch", fetchMock); + + const result = await execute( + buildContext({ + url: "https://agent.example/v1/responses", + streamTransport: "webhook", + }), + ); + + expect(result.exitCode).toBe(0); + expect(fetchMock).toHaveBeenCalledTimes(2); + const firstBody = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record; + const secondBody = JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body ?? "{}")) as Record; + expect(firstBody.paperclip).toBeTypeOf("object"); + expect(secondBody.mode).toBe("now"); + expect(String(secondBody.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123"); + }); + + it("rejects unsupported transport configuration", async () => { const fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); const result = await execute( buildContext({ url: "https://agent.example/sse", - streamTransport: "webhook", + streamTransport: "invalid", }), ); @@ -550,7 +645,7 @@ describe("openclaw adapter execute", () => { expect(fetchMock).not.toHaveBeenCalled(); }); - it("rejects /hooks/wake compatibility endpoints in strict SSE mode", async () => { + it("rejects /hooks/wake compatibility endpoints in SSE mode", async () => { const fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); @@ -567,7 +662,7 @@ describe("openclaw adapter execute", () => { }); describe("openclaw adapter environment checks", () => { - it("reports /hooks/wake endpoints as incompatible for strict SSE mode", async () => { + it("reports /hooks/wake endpoints as incompatible for SSE mode", async () => { const fetchMock = vi .fn() .mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); @@ -602,13 +697,36 @@ describe("openclaw adapter environment checks", () => { adapterType: "openclaw", config: { url: "https://agent.example/sse", - streamTransport: "webhook", + streamTransport: "invalid", }, }); const check = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported"); expect(check?.level).toBe("error"); }); + + it("accepts webhook streamTransport settings", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" })); + vi.stubGlobal("fetch", fetchMock); + + const result = await testEnvironment({ + companyId: "company-123", + adapterType: "openclaw", + config: { + url: "https://agent.example/hooks/wake", + streamTransport: "webhook", + }, + }); + + const unsupported = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported"); + const configured = result.checks.find((entry) => entry.code === "openclaw_stream_transport_configured"); + const wakeIncompatible = result.checks.find((entry) => entry.code === "openclaw_wake_endpoint_incompatible"); + expect(unsupported).toBeUndefined(); + expect(configured?.level).toBe("info"); + expect(wakeIncompatible).toBeUndefined(); + }); }); describe("onHireApproved", () => { diff --git a/server/src/routes/access.ts b/server/src/routes/access.ts index 58cde03f..9f115299 100644 --- a/server/src/routes/access.ts +++ b/server/src/routes/access.ts @@ -135,6 +135,14 @@ function isWakePath(pathname: string): boolean { return value === "/hooks/wake" || value.endsWith("/hooks/wake"); } +function normalizeOpenClawTransport(value: unknown): "sse" | "webhook" | null { + if (typeof value !== "string") return "sse"; + const normalized = value.trim().toLowerCase(); + if (!normalized || normalized === "sse") return "sse"; + if (normalized === "webhook") return "webhook"; + return null; +} + function normalizeHostname(value: string | null | undefined): string | null { if (!value) return null; const trimmed = value.trim(); @@ -592,13 +600,25 @@ function normalizeAgentDefaultsForJoin(input: { level: "warn", message: "No OpenClaw callback config was provided in agentDefaultsPayload.", - hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw SSE endpoint immediately after approval." + hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw endpoint immediately after approval." }); return { normalized: null as Record | null, diagnostics }; } const defaults = input.defaultsPayload as Record; + const streamTransportInput = defaults.streamTransport ?? defaults.transport; + const streamTransport = normalizeOpenClawTransport(streamTransportInput); const normalized: Record = { streamTransport: "sse" }; + if (!streamTransport) { + diagnostics.push({ + code: "openclaw_stream_transport_unsupported", + level: "warn", + message: `Unsupported streamTransport: ${String(streamTransportInput)}`, + hint: "Use streamTransport=sse or streamTransport=webhook." + }); + } else { + normalized.streamTransport = streamTransport; + } let callbackUrl: URL | null = null; const rawUrl = typeof defaults.url === "string" ? defaults.url.trim() : ""; @@ -607,7 +627,7 @@ function normalizeAgentDefaultsForJoin(input: { code: "openclaw_callback_url_missing", level: "warn", message: "OpenClaw callback URL is missing.", - hint: "Set agentDefaultsPayload.url to your OpenClaw SSE endpoint." + hint: "Set agentDefaultsPayload.url to your OpenClaw endpoint." }); } else { try { @@ -630,12 +650,12 @@ function normalizeAgentDefaultsForJoin(input: { message: `Callback endpoint set to ${callbackUrl.toString()}` }); } - if (isWakePath(callbackUrl.pathname)) { + if ((streamTransport ?? "sse") === "sse" && isWakePath(callbackUrl.pathname)) { diagnostics.push({ code: "openclaw_callback_wake_path_incompatible", level: "warn", message: - "Configured callback path targets /hooks/wake, which is not stream-capable for strict SSE mode.", + "Configured callback path targets /hooks/wake, which is not stream-capable for SSE transport.", hint: "Use an endpoint that returns text/event-stream for the full run duration." }); } @@ -696,7 +716,7 @@ function normalizeAgentDefaultsForJoin(input: { code: "openclaw_auth_header_missing", level: "warn", message: "Gateway auth token is missing from agent defaults.", - hint: "Set agentDefaultsPayload.headers.x-openclaw-auth to the token your OpenClaw /v1/responses endpoint requires." + hint: "Set agentDefaultsPayload.headers.x-openclaw-auth to the token your OpenClaw endpoint requires." }); } @@ -943,10 +963,10 @@ function buildInviteOnboardingManifest( requiredFields: { requestType: "agent", agentName: "Display name for this agent", - adapterType: "Use 'openclaw' for OpenClaw streaming agents", + adapterType: "Use 'openclaw' for OpenClaw agents", capabilities: "Optional capability summary", agentDefaultsPayload: - "Adapter config for OpenClaw SSE endpoint. MUST include headers.x-openclaw-auth; also include url/method/paperclipApiUrl (and optional webhookAuthHeader/timeoutSec/payloadTemplate)." + "Adapter config for OpenClaw endpoint. MUST include headers.x-openclaw-auth; include streamTransport ('sse' or 'webhook') plus url/method/paperclipApiUrl (and optional webhookAuthHeader/timeoutSec/payloadTemplate)." }, registrationEndpoint: { method: "POST", @@ -1103,6 +1123,7 @@ export function buildInviteOnboardingTextDocument( IMPORTANT: You MUST include agentDefaultsPayload.headers.x-openclaw-auth with your gateway token. Without this token, Paperclip callback requests to your OpenClaw endpoint will fail with 401 Unauthorized. + Set "streamTransport" to "sse" for streaming /v1/responses endpoints, or "webhook" for wake-style callbacks. Body (JSON): {