import { createReadStream, promises as fs } from "node:fs"; import path from "node:path"; import { createHash } from "node:crypto"; import { notFound } from "../errors.js"; import { resolvePaperclipInstanceRoot } from "../home-paths.js"; export type RunLogStoreType = "local_file"; export interface RunLogHandle { store: RunLogStoreType; logRef: string; } export interface RunLogReadOptions { offset?: number; limitBytes?: number; } export interface RunLogReadResult { content: string; nextOffset?: number; } export interface RunLogFinalizeSummary { bytes: number; sha256?: string; compressed: boolean; } export interface RunLogStore { begin(input: { companyId: string; agentId: string; runId: string }): Promise; append( handle: RunLogHandle, event: { stream: "stdout" | "stderr" | "system"; chunk: string; ts: string }, ): Promise; finalize(handle: RunLogHandle): Promise; read(handle: RunLogHandle, opts?: RunLogReadOptions): Promise; } function safeSegments(...segments: string[]) { return segments.map((segment) => segment.replace(/[^a-zA-Z0-9._-]/g, "_")); } function resolveWithin(basePath: string, relativePath: string) { const resolved = path.resolve(basePath, relativePath); const base = path.resolve(basePath) + path.sep; if (!resolved.startsWith(base) && resolved !== path.resolve(basePath)) { throw new Error("Invalid log path"); } return resolved; } function createLocalFileRunLogStore(basePath: string): RunLogStore { async function ensureDir(relativeDir: string) { const dir = resolveWithin(basePath, relativeDir); await fs.mkdir(dir, { recursive: true }); } async function readFileRange(filePath: string, offset: number, limitBytes: number): Promise { const stat = await fs.stat(filePath).catch(() => null); if (!stat) throw notFound("Run log not found"); const start = Math.max(0, Math.min(offset, stat.size)); const end = Math.max(start, Math.min(start + limitBytes - 1, stat.size - 1)); if (start > end) { return { content: "", nextOffset: start }; } const chunks: Buffer[] = []; await new Promise((resolve, reject) => { const stream = createReadStream(filePath, { start, end }); stream.on("data", (chunk) => { chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); }); stream.on("error", reject); stream.on("end", () => resolve()); }); const content = Buffer.concat(chunks).toString("utf8"); const nextOffset = end + 1 < stat.size ? end + 1 : undefined; return { content, nextOffset }; } async function sha256File(filePath: string): Promise { return new Promise((resolve, reject) => { const hash = createHash("sha256"); const stream = createReadStream(filePath); stream.on("data", (chunk) => hash.update(chunk)); stream.on("error", reject); stream.on("end", () => resolve(hash.digest("hex"))); }); } return { async begin(input) { const [companyId, agentId] = safeSegments(input.companyId, input.agentId); const runId = safeSegments(input.runId)[0]!; const relDir = path.join(companyId, agentId); const relPath = path.join(relDir, `${runId}.ndjson`); await ensureDir(relDir); const absPath = resolveWithin(basePath, relPath); await fs.writeFile(absPath, "", "utf8"); return { store: "local_file", logRef: relPath }; }, async append(handle, event) { if (handle.store !== "local_file") return; const absPath = resolveWithin(basePath, handle.logRef); const line = JSON.stringify({ ts: event.ts, stream: event.stream, chunk: event.chunk, }); await fs.appendFile(absPath, `${line}\n`, "utf8"); }, async finalize(handle) { if (handle.store !== "local_file") { return { bytes: 0, compressed: false }; } const absPath = resolveWithin(basePath, handle.logRef); const stat = await fs.stat(absPath).catch(() => null); if (!stat) throw notFound("Run log not found"); const hash = await sha256File(absPath); return { bytes: stat.size, sha256: hash, compressed: false, }; }, async read(handle, opts) { if (handle.store !== "local_file") { throw notFound("Run log not found"); } const absPath = resolveWithin(basePath, handle.logRef); const offset = opts?.offset ?? 0; const limitBytes = opts?.limitBytes ?? 256_000; return readFileRange(absPath, offset, limitBytes); }, }; } let cachedStore: RunLogStore | null = null; export function getRunLogStore() { if (cachedStore) return cachedStore; const basePath = process.env.RUN_LOG_BASE_PATH ?? path.resolve(resolvePaperclipInstanceRoot(), "data", "run-logs"); cachedStore = createLocalFileRunLogStore(basePath); return cachedStore; }