From 81bc8c7313cd860f05506a738546f78224e63ac2 Mon Sep 17 00:00:00 2001 From: Dotta Date: Thu, 5 Mar 2026 17:26:00 -0600 Subject: [PATCH] Improve OpenClaw SSE transcript parsing and stream readability --- packages/adapter-utils/src/types.ts | 2 +- .../adapters/openclaw/src/ui/parse-stdout.ts | 114 ++++++++++++++++++ server/src/__tests__/openclaw-adapter.test.ts | 73 +++++++++++ ui/src/adapters/transcript.ts | 4 +- ui/src/components/LiveRunWidget.tsx | 17 ++- 5 files changed, 206 insertions(+), 4 deletions(-) diff --git a/packages/adapter-utils/src/types.ts b/packages/adapter-utils/src/types.ts index 95143b51..5170d9cd 100644 --- a/packages/adapter-utils/src/types.ts +++ b/packages/adapter-utils/src/types.ts @@ -135,7 +135,7 @@ export interface ServerAdapterModule { // --------------------------------------------------------------------------- export type TranscriptEntry = - | { kind: "assistant"; ts: string; text: string } + | { kind: "assistant"; ts: string; text: string; delta?: boolean } | { kind: "thinking"; ts: string; text: string; delta?: boolean } | { kind: "user"; ts: string; text: string } | { kind: "tool_call"; ts: string; name: string; input: unknown } diff --git a/packages/adapters/openclaw/src/ui/parse-stdout.ts b/packages/adapters/openclaw/src/ui/parse-stdout.ts index 4be215e8..c462027b 100644 --- a/packages/adapters/openclaw/src/ui/parse-stdout.ts +++ b/packages/adapters/openclaw/src/ui/parse-stdout.ts @@ -1,5 +1,119 @@ import type { TranscriptEntry } from "@paperclipai/adapter-utils"; +function safeJsonParse(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return null; + } +} + +function asRecord(value: unknown): Record | null { + if (typeof value !== "object" || value === null || Array.isArray(value)) return null; + return value as Record; +} + +function asString(value: unknown, fallback = ""): string { + return typeof value === "string" ? value : fallback; +} + +function asNumber(value: unknown, fallback = 0): number { + return typeof value === "number" && Number.isFinite(value) ? value : fallback; +} + +function extractResponseOutputText(response: Record | null): string { + if (!response) return ""; + + const output = Array.isArray(response.output) ? response.output : []; + const parts: string[] = []; + for (const itemRaw of output) { + const item = asRecord(itemRaw); + if (!item) continue; + const content = Array.isArray(item.content) ? item.content : []; + for (const partRaw of content) { + const part = asRecord(partRaw); + if (!part) continue; + const type = asString(part.type).trim().toLowerCase(); + if (type !== "output_text" && type !== "text" && type !== "refusal") continue; + const text = asString(part.text).trim(); + if (text) parts.push(text); + } + } + return parts.join("\n\n").trim(); +} + +function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] { + const match = line.match(/^\[openclaw:sse\]\s+event=([^\s]+)\s+data=(.*)$/s); + if (!match) return [{ kind: "stdout", ts, text: line }]; + + const eventType = (match[1] ?? "").trim(); + const dataText = (match[2] ?? "").trim(); + const parsed = asRecord(safeJsonParse(dataText)); + const normalizedEventType = eventType.toLowerCase(); + + if (dataText === "[DONE]") { + return []; + } + + const delta = asString(parsed?.delta); + if (normalizedEventType.endsWith(".delta") && delta) { + return [{ kind: "assistant", ts, text: delta, delta: true }]; + } + + if ( + normalizedEventType.includes("error") || + normalizedEventType.includes("failed") || + normalizedEventType.includes("cancel") + ) { + const message = + asString(parsed?.error).trim() || + asString(parsed?.message).trim() || + dataText; + return message ? [{ kind: "stderr", ts, text: message }] : []; + } + + if (normalizedEventType === "response.completed" || normalizedEventType.endsWith(".completed")) { + const response = asRecord(parsed?.response); + const usage = asRecord(response?.usage); + const status = asString(response?.status, asString(parsed?.status, eventType)); + const statusLower = status.trim().toLowerCase(); + const errorText = + asString(response?.error).trim() || + asString(parsed?.error).trim() || + asString(parsed?.message).trim(); + const isError = + statusLower === "failed" || + statusLower === "error" || + statusLower === "cancelled"; + + return [{ + kind: "result", + ts, + text: extractResponseOutputText(response), + inputTokens: asNumber(usage?.input_tokens), + outputTokens: asNumber(usage?.output_tokens), + cachedTokens: asNumber(usage?.cached_input_tokens), + costUsd: asNumber(usage?.cost_usd, asNumber(usage?.total_cost_usd)), + subtype: status || eventType, + isError, + errors: errorText ? [errorText] : [], + }]; + } + + return []; +} + export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEntry[] { + const trimmed = line.trim(); + if (!trimmed) return []; + + if (trimmed.startsWith("[openclaw:sse]")) { + return parseOpenClawSseLine(trimmed, ts); + } + + if (trimmed.startsWith("[openclaw]")) { + return [{ kind: "system", ts, text: trimmed.replace(/^\[openclaw\]\s*/, "") }]; + } + return [{ kind: "stdout", ts, text: line }]; } diff --git a/server/src/__tests__/openclaw-adapter.test.ts b/server/src/__tests__/openclaw-adapter.test.ts index b11b1ffb..f0d9246c 100644 --- a/server/src/__tests__/openclaw-adapter.test.ts +++ b/server/src/__tests__/openclaw-adapter.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { execute, testEnvironment } from "@paperclipai/adapter-openclaw/server"; +import { parseOpenClawStdoutLine } from "@paperclipai/adapter-openclaw/ui"; import type { AdapterExecutionContext } from "@paperclipai/adapter-utils"; function buildContext( @@ -57,6 +58,78 @@ afterEach(() => { vi.unstubAllGlobals(); }); +describe("openclaw ui stdout parser", () => { + it("parses SSE deltas into assistant streaming entries", () => { + const ts = "2026-03-05T23:07:16.296Z"; + const line = + '[openclaw:sse] event=response.output_text.delta data={"type":"response.output_text.delta","delta":"hello"}'; + + expect(parseOpenClawStdoutLine(line, ts)).toEqual([ + { + kind: "assistant", + ts, + text: "hello", + delta: true, + }, + ]); + }); + + it("parses response.completed into usage-aware result entries", () => { + const ts = "2026-03-05T23:07:20.269Z"; + const line = JSON.stringify({ + type: "response.completed", + response: { + status: "completed", + usage: { + input_tokens: 12, + output_tokens: 34, + cached_input_tokens: 5, + }, + output: [ + { + type: "message", + content: [ + { + type: "output_text", + text: "All done", + }, + ], + }, + ], + }, + }); + + expect(parseOpenClawStdoutLine(`[openclaw:sse] event=response.completed data=${line}`, ts)).toEqual([ + { + kind: "result", + ts, + text: "All done", + inputTokens: 12, + outputTokens: 34, + cachedTokens: 5, + costUsd: 0, + subtype: "completed", + isError: false, + errors: [], + }, + ]); + }); + + it("maps SSE errors to stderr entries", () => { + const ts = "2026-03-05T23:07:20.269Z"; + const line = + '[openclaw:sse] event=response.failed data={"type":"response.failed","error":"timeout"}'; + + expect(parseOpenClawStdoutLine(line, ts)).toEqual([ + { + kind: "stderr", + ts, + text: "timeout", + }, + ]); + }); +}); + describe("openclaw adapter execute", () => { it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => { const fetchMock = vi.fn().mockResolvedValue( diff --git a/ui/src/adapters/transcript.ts b/ui/src/adapters/transcript.ts index 58cff1b5..143f472a 100644 --- a/ui/src/adapters/transcript.ts +++ b/ui/src/adapters/transcript.ts @@ -3,9 +3,9 @@ import type { TranscriptEntry, StdoutLineParser } from "./types"; type RunLogChunk = { ts: string; stream: "stdout" | "stderr" | "system"; chunk: string }; function appendTranscriptEntry(entries: TranscriptEntry[], entry: TranscriptEntry) { - if (entry.kind === "thinking" && entry.delta) { + if ((entry.kind === "thinking" || entry.kind === "assistant") && entry.delta) { const last = entries[entries.length - 1]; - if (last && last.kind === "thinking" && last.delta) { + if (last && last.kind === entry.kind && last.delta) { last.text += entry.text; last.ts = entry.ts; return; diff --git a/ui/src/components/LiveRunWidget.tsx b/ui/src/components/LiveRunWidget.tsx index 253152d0..cb4a7dcf 100644 --- a/ui/src/components/LiveRunWidget.tsx +++ b/ui/src/components/LiveRunWidget.tsx @@ -108,8 +108,20 @@ function parseStdoutChunk( pendingByRun.set(pendingKey, split.pop() ?? ""); const adapter = getUIAdapter(run.adapterType); - const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean }> = []; + const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = []; const appendSummary = (entry: TranscriptEntry) => { + if (entry.kind === "assistant" && entry.delta) { + const text = entry.text; + if (!text.trim()) return; + const last = summarized[summarized.length - 1]; + if (last && last.assistantDelta) { + last.text += text; + } else { + summarized.push({ text, tone: "assistant", assistantDelta: true }); + } + return; + } + if (entry.kind === "thinking" && entry.delta) { const text = entry.text; if (!text.trim()) return; @@ -133,6 +145,9 @@ function parseStdoutChunk( if (!trimmed) continue; const parsed = adapter.parseStdoutLine(trimmed, ts); if (parsed.length === 0) { + if (run.adapterType === "openclaw") { + continue; + } const fallback = createFeedItem(run, ts, trimmed, "info", nextIdRef.current++); if (fallback) items.push(fallback); continue;