From a90063415e176b3cfa91c94a3bc26115224caca7 Mon Sep 17 00:00:00 2001 From: Forgotten Date: Thu, 19 Feb 2026 09:09:40 -0600 Subject: [PATCH] Server: migration prompts, structured logging, heartbeat reaping, and issue-run tracking Replace auto-migrate-if-empty with interactive migration flow that inspects pending migrations and prompts before applying. Add pino-pretty for structured console + file logging. Add reapOrphanedRuns to clean up stuck heartbeat runs on startup and periodically. Track runId through auth middleware, activity logs, and all mutation routes. Add issue-run cross-reference queries, live-run and active-run endpoints for issues, issue identifier lookup, reopen-via-comment flow, and done/cancelled -> todo status transitions. Co-Authored-By: Claude Opus 4.6 --- pnpm-lock.yaml | 87 +++++++++++++++++++++ server/package.json | 3 +- server/src/index.ts | 113 +++++++++++++++++++++------- server/src/middleware/auth.ts | 5 ++ server/src/middleware/logger.ts | 40 +++++++++- server/src/routes/activity.ts | 32 ++++++++ server/src/routes/agents.ts | 80 +++++++++++++++++++- server/src/routes/authz.ts | 2 + server/src/routes/issues.ts | 88 ++++++++++++++++++++-- server/src/services/activity-log.ts | 3 + server/src/services/activity.ts | 57 +++++++++++++- server/src/services/heartbeat.ts | 102 +++++++++++++++++++++++-- server/src/services/issues.ts | 59 ++++++++++----- server/src/types/express.d.ts | 1 + 14 files changed, 605 insertions(+), 67 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 05f947cc..0793b9d9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -157,6 +157,9 @@ importers: pino-http: specifier: ^10.4.0 version: 10.5.0 + pino-pretty: + specifier: ^13.1.3 + version: 13.1.3 ws: specifier: ^8.19.0 version: 8.19.0 @@ -2027,6 +2030,9 @@ packages: react: ^18 || ^19 || ^19.0.0-rc react-dom: ^18 || ^19 || ^19.0.0-rc + colorette@2.0.20: + resolution: {integrity: sha512-IfEDxwoWIjkeXL1eXcDiow4UbKjhLdq6/EuSVR9GMN7KVH3r9gQ83e73hsz1Nd1T3ijd5xv1wcWRYO+D6kCI2w==} + combined-stream@1.0.8: resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} engines: {node: '>= 0.8'} @@ -2067,6 +2073,9 @@ packages: csstype@3.2.3: resolution: {integrity: sha512-z1HGKcYy2xA8AGQfwrn0PAy+PB7X/GSj3UVJW9qKyn43xWa+gl5nXmU4qqLMRzWVLFC8KusUX8T/0kCiOYpAIQ==} + dateformat@4.6.3: + resolution: {integrity: sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA==} + debug@4.4.3: resolution: {integrity: sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==} engines: {node: '>=6.0'} @@ -2221,6 +2230,9 @@ packages: resolution: {integrity: sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==} engines: {node: '>= 0.8'} + end-of-stream@1.4.5: + resolution: {integrity: sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==} + enhanced-resolve@5.19.0: resolution: {integrity: sha512-phv3E1Xl4tQOShqSte26C7Fl84EwUdZsyOuSSk9qtAGyyQs2s3jJzComh+Abf4g187lUUAvH+H26omrqia2aGg==} engines: {node: '>=10.13.0'} @@ -2286,6 +2298,9 @@ packages: resolution: {integrity: sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==} engines: {node: '>= 18'} + fast-copy@4.0.2: + resolution: {integrity: sha512-ybA6PDXIXOXivLJK/z9e+Otk7ve13I4ckBvGO5I2RRmBU1gMHLVDJYEuJYhGwez7YNlYji2M2DvVU+a9mSFDlw==} + fast-safe-stringify@2.1.1: resolution: {integrity: sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA==} @@ -2368,6 +2383,9 @@ packages: resolution: {integrity: sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==} engines: {node: '>= 0.4'} + help-me@5.0.0: + resolution: {integrity: sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg==} + http-errors@2.0.1: resolution: {integrity: sha512-4FbRdAX+bSdmo4AUFuS0WNiPz8NgFt+r8ThgNWmlrjQjt1Q7ZR9+zTlce2859x4KSXrwIsaeTqDoKQmtP8pLmQ==} engines: {node: '>= 0.8'} @@ -2390,6 +2408,10 @@ packages: resolution: {integrity: sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==} hasBin: true + joycon@3.1.1: + resolution: {integrity: sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw==} + engines: {node: '>=10'} + js-tokens@4.0.0: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} @@ -2527,6 +2549,9 @@ packages: engines: {node: '>=4.0.0'} hasBin: true + minimist@1.2.8: + resolution: {integrity: sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==} + ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} @@ -2615,9 +2640,16 @@ packages: pino-abstract-transport@2.0.0: resolution: {integrity: sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw==} + pino-abstract-transport@3.0.0: + resolution: {integrity: sha512-wlfUczU+n7Hy/Ha5j9a/gZNy7We5+cXp8YL+X+PG8S0KXxw7n/JXA3c46Y0zQznIJ83URJiwy7Lh56WLokNuxg==} + pino-http@10.5.0: resolution: {integrity: sha512-hD91XjgaKkSsdn8P7LaebrNzhGTdB086W3pyPihX0EzGPjq5uBJBXo4N5guqNaK6mUjg9aubMF7wDViYek9dRA==} + pino-pretty@13.1.3: + resolution: {integrity: sha512-ttXRkkOz6WWC95KeY9+xxWL6AtImwbyMHrL1mSwqwW9u+vLp/WIElvHvCSDg0xO/Dzrggz1zv3rN5ovTRVowKg==} + hasBin: true + pino-std-serializers@7.1.0: resolution: {integrity: sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw==} @@ -2656,6 +2688,9 @@ packages: resolution: {integrity: sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==} engines: {node: '>= 0.10'} + pump@3.0.3: + resolution: {integrity: sha512-todwxLMY7/heScKmntwQG8CXVkWUOdYxIvY2s0VWAAMh/nd8SoYiRaKjlr7+iCs984f2P8zvrfWcDDYVb73NfA==} + qs@6.15.0: resolution: {integrity: sha512-mAZTtNCeetKMH+pSjrb76NAM8V9a05I9aBZOHztWy/UqcJdQYNsf59vrRKWnojAT9Y+GbIvoTBC++CPHqpDBhQ==} engines: {node: '>=0.6'} @@ -2770,6 +2805,9 @@ packages: scheduler@0.27.0: resolution: {integrity: sha512-eNv+WrVbKu1f3vbYJT/xtiF5syA5HPIMtf9IgY/nKg0sWqzAUEvqY/xm7OcZc/qafLx/iO9FgOmeSAp4v5ti/Q==} + secure-json-parse@4.1.0: + resolution: {integrity: sha512-l4KnYfEyqYJxDwlNVyRfO2E4NTHfMKAWdUuA8J0yve2Dz/E/PdBepY03RvyJpssIpRFwJoCD55wA+mEDs6ByWA==} + semver@6.3.1: resolution: {integrity: sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==} hasBin: true @@ -2838,6 +2876,10 @@ packages: std-env@3.10.0: resolution: {integrity: sha512-5GS12FdOZNliM5mAOxFRg7Ir0pWz8MdpYm6AY6VPkGpbA7ZzmbzNcBJQ0GPvvyWgcY7QAhCgf9Uy89I03faLkg==} + strip-json-comments@5.0.3: + resolution: {integrity: sha512-1tB5mhVo7U+ETBKNf92xT4hrQa3pm0MZ0PQvuDnWgAAGHDsfp4lPSpiS6psrSiet87wyGPh9ft6wmhOMQ0hDiw==} + engines: {node: '>=14.16'} + strip-literal@3.1.0: resolution: {integrity: sha512-8r3mkIM/2+PpjHoOtiAW8Rg3jJLHaV7xPwG+YRGrv6FP0wwk/toTpATxWYOW0BKdWwl82VT2tFYi5DlROa0Mxg==} @@ -4674,6 +4716,8 @@ snapshots: - '@types/react' - '@types/react-dom' + colorette@2.0.20: {} + combined-stream@1.0.8: dependencies: delayed-stream: 1.0.0 @@ -4698,6 +4742,8 @@ snapshots: csstype@3.2.3: {} + dateformat@4.6.3: {} + debug@4.4.3: dependencies: ms: 2.1.3 @@ -4769,6 +4815,10 @@ snapshots: encodeurl@2.0.0: {} + end-of-stream@1.4.5: + dependencies: + once: 1.4.0 + enhanced-resolve@5.19.0: dependencies: graceful-fs: 4.2.11 @@ -4926,6 +4976,8 @@ snapshots: transitivePeerDependencies: - supports-color + fast-copy@4.0.2: {} + fast-safe-stringify@2.1.1: {} fdir@6.5.0(picomatch@4.0.3): @@ -5008,6 +5060,8 @@ snapshots: dependencies: function-bind: 1.1.2 + help-me@5.0.0: {} + http-errors@2.0.1: dependencies: depd: 2.0.0 @@ -5028,6 +5082,8 @@ snapshots: jiti@2.6.1: {} + joycon@3.1.1: {} + js-tokens@4.0.0: {} js-tokens@9.0.1: {} @@ -5121,6 +5177,8 @@ snapshots: mime@2.6.0: {} + minimist@1.2.8: {} + ms@2.1.3: {} nanoid@3.3.11: {} @@ -5199,6 +5257,10 @@ snapshots: dependencies: split2: 4.2.0 + pino-abstract-transport@3.0.0: + dependencies: + split2: 4.2.0 + pino-http@10.5.0: dependencies: get-caller-file: 2.0.5 @@ -5206,6 +5268,22 @@ snapshots: pino-std-serializers: 7.1.0 process-warning: 5.0.0 + pino-pretty@13.1.3: + dependencies: + colorette: 2.0.20 + dateformat: 4.6.3 + fast-copy: 4.0.2 + fast-safe-stringify: 2.1.1 + help-me: 5.0.0 + joycon: 3.1.1 + minimist: 1.2.8 + on-exit-leak-free: 2.1.2 + pino-abstract-transport: 3.0.0 + pump: 3.0.3 + secure-json-parse: 4.1.0 + sonic-boom: 4.2.1 + strip-json-comments: 5.0.3 + pino-std-serializers@7.1.0: {} pino@9.14.0: @@ -5251,6 +5329,11 @@ snapshots: forwarded: 0.2.0 ipaddr.js: 1.9.1 + pump@3.0.3: + dependencies: + end-of-stream: 1.4.5 + once: 1.4.0 + qs@6.15.0: dependencies: side-channel: 1.1.0 @@ -5430,6 +5513,8 @@ snapshots: scheduler@0.27.0: {} + secure-json-parse@4.1.0: {} + semver@6.3.1: {} send@1.2.1: @@ -5514,6 +5599,8 @@ snapshots: std-env@3.10.0: {} + strip-json-comments@5.0.3: {} + strip-literal@3.1.0: dependencies: js-tokens: 9.0.1 diff --git a/server/package.json b/server/package.json index 095a0b0b..e352e6fd 100644 --- a/server/package.json +++ b/server/package.json @@ -15,12 +15,13 @@ "@paperclip/adapter-utils": "workspace:*", "@paperclip/db": "workspace:*", "@paperclip/shared": "workspace:*", - "dotenv": "^17.0.1", "detect-port": "^2.1.0", + "dotenv": "^17.0.1", "drizzle-orm": "^0.38.4", "express": "^5.1.0", "pino": "^9.6.0", "pino-http": "^10.4.0", + "pino-pretty": "^13.1.3", "ws": "^8.19.0", "zod": "^3.24.2" }, diff --git a/server/src/index.ts b/server/src/index.ts index 71f02c54..fa9c7d10 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,10 +1,13 @@ import { existsSync, readFileSync, rmSync } from "node:fs"; import { createServer } from "node:http"; import { resolve } from "node:path"; +import { createInterface } from "node:readline/promises"; +import { stdin, stdout } from "node:process"; import { createDb, ensurePostgresDatabase, - migratePostgresIfEmpty, + inspectMigrations, + applyPendingMigrations, } from "@paperclip/db"; import detectPort from "detect-port"; import { createApp } from "./app.js"; @@ -30,27 +33,80 @@ type EmbeddedPostgresCtor = new (opts: { const config = loadConfig(); +type MigrationSummary = + | "skipped" + | "already applied" + | "applied (empty database)" + | "applied (pending migrations)" + | "pending migrations skipped"; + +function formatPendingMigrationSummary(migrations: string[]): string { + if (migrations.length === 0) return "none"; + return migrations.length > 3 + ? `${migrations.slice(0, 3).join(", ")} (+${migrations.length - 3} more)` + : migrations.join(", "); +} + +async function promptApplyMigrations(migrations: string[]): Promise { + if (!stdin.isTTY || !stdout.isTTY) return true; + if (process.env.PAPERCLIP_MIGRATION_AUTO_APPLY === "true") return true; + + const prompt = createInterface({ input: stdin, output: stdout }); + try { + const answer = (await prompt.question( + `Apply pending migrations (${formatPendingMigrationSummary(migrations)}) now? (y/N): `, + )).trim().toLowerCase(); + return answer === "y" || answer === "yes"; + } finally { + prompt.close(); + } +} + +async function ensureMigrations(connectionString: string, label: string): Promise { + const state = await inspectMigrations(connectionString); + if (state.status === "upToDate") return "already applied"; + if (state.status === "needsMigrations" && state.reason === "no-migration-journal-non-empty-db") { + logger.warn( + { tableCount: state.tableCount }, + `${label} has existing tables but no migration journal. Run migrations manually to sync schema.`, + ); + const apply = await promptApplyMigrations(state.pendingMigrations); + if (!apply) { + logger.warn( + { pendingMigrations: state.pendingMigrations }, + `${label} has pending migrations; continuing without applying. Run pnpm db:migrate to apply before startup.`, + ); + return "pending migrations skipped"; + } + + logger.info({ pendingMigrations: state.pendingMigrations }, `Applying ${state.pendingMigrations.length} pending migrations for ${label}`); + await applyPendingMigrations(connectionString); + return "applied (pending migrations)"; + } + + const apply = await promptApplyMigrations(state.pendingMigrations); + if (!apply) { + logger.warn( + { pendingMigrations: state.pendingMigrations }, + `${label} has pending migrations; continuing without applying. Run pnpm db:migrate to apply before startup.`, + ); + return "pending migrations skipped"; + } + + logger.info({ pendingMigrations: state.pendingMigrations }, `Applying ${state.pendingMigrations.length} pending migrations for ${label}`); + await applyPendingMigrations(connectionString); + return "applied (pending migrations)"; +} + let db; let embeddedPostgres: EmbeddedPostgresInstance | null = null; let embeddedPostgresStartedByThisProcess = false; -let migrationSummary = "skipped"; +let migrationSummary: MigrationSummary = "skipped"; let startupDbInfo: | { mode: "external-postgres"; connectionString: string } | { mode: "embedded-postgres"; dataDir: string; port: number }; if (config.databaseUrl) { - const migration = await migratePostgresIfEmpty(config.databaseUrl); - if (migration.migrated) { - logger.info("Empty PostgreSQL database detected; applied migrations"); - migrationSummary = "applied (empty database)"; - } else if (migration.reason === "not-empty-no-migration-journal") { - logger.warn( - { tableCount: migration.tableCount }, - "PostgreSQL has existing tables but no migration journal; skipped auto-migrate", - ); - migrationSummary = "skipped (existing schema, no migration journal)"; - } else { - migrationSummary = "already applied"; - } + migrationSummary = await ensureMigrations(config.databaseUrl, "PostgreSQL"); db = createDb(config.databaseUrl); logger.info("Using external PostgreSQL via DATABASE_URL/config"); @@ -131,19 +187,7 @@ if (config.databaseUrl) { } const embeddedConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/paperclip`; - const migration = await migratePostgresIfEmpty(embeddedConnectionString); - if (migration.migrated) { - logger.info("Empty embedded PostgreSQL database detected; applied migrations"); - migrationSummary = "applied (empty database)"; - } else if (migration.reason === "not-empty-no-migration-journal") { - logger.warn( - { tableCount: migration.tableCount }, - "Embedded PostgreSQL has existing tables but no migration journal; skipped auto-migrate", - ); - migrationSummary = "skipped (existing schema, no migration journal)"; - } else { - migrationSummary = "already applied"; - } + migrationSummary = await ensureMigrations(embeddedConnectionString, "Embedded PostgreSQL"); db = createDb(embeddedConnectionString); logger.info("Embedded PostgreSQL ready"); @@ -163,6 +207,12 @@ setupLiveEventsWebSocketServer(server, db as any); if (config.heartbeatSchedulerEnabled) { const heartbeat = heartbeatService(db as any); + + // Reap orphaned runs at startup (no threshold -- runningProcesses is empty) + void heartbeat.reapOrphanedRuns().catch((err) => { + logger.error({ err }, "startup reap of orphaned heartbeat runs failed"); + }); + setInterval(() => { void heartbeat .tickTimers(new Date()) @@ -174,6 +224,13 @@ if (config.heartbeatSchedulerEnabled) { .catch((err) => { logger.error({ err }, "heartbeat timer tick failed"); }); + + // Periodically reap orphaned runs (5-min staleness threshold) + void heartbeat + .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 }) + .catch((err) => { + logger.error({ err }, "periodic reap of orphaned heartbeat runs failed"); + }); }, config.heartbeatSchedulerIntervalMs); } diff --git a/server/src/middleware/auth.ts b/server/src/middleware/auth.ts index 74b07dd3..a3d3d16e 100644 --- a/server/src/middleware/auth.ts +++ b/server/src/middleware/auth.ts @@ -13,8 +13,11 @@ export function actorMiddleware(db: Db): RequestHandler { return async (req, _res, next) => { req.actor = { type: "board", userId: "board" }; + const runIdHeader = req.header("x-paperclip-run-id"); + const authHeader = req.header("authorization"); if (!authHeader?.toLowerCase().startsWith("bearer ")) { + if (runIdHeader) req.actor.runId = runIdHeader; next(); return; } @@ -60,6 +63,7 @@ export function actorMiddleware(db: Db): RequestHandler { agentId: claims.sub, companyId: claims.company_id, keyId: undefined, + runId: runIdHeader || undefined, }; next(); return; @@ -75,6 +79,7 @@ export function actorMiddleware(db: Db): RequestHandler { agentId: key.agentId, companyId: key.companyId, keyId: key.id, + runId: runIdHeader || undefined, }; next(); diff --git a/server/src/middleware/logger.ts b/server/src/middleware/logger.ts index 5f2fbea8..5ccb989b 100644 --- a/server/src/middleware/logger.ts +++ b/server/src/middleware/logger.ts @@ -1,5 +1,41 @@ +import path from "node:path"; +import fs from "node:fs"; import pino from "pino"; import { pinoHttp } from "pino-http"; -export const logger = pino(); -export const httpLogger = pinoHttp({ logger }); +const logDir = path.resolve(process.cwd(), ".paperclip", "logs"); +fs.mkdirSync(logDir, { recursive: true }); + +const logFile = path.join(logDir, "server.log"); + +const sharedOpts = { + translateTime: "HH:MM:ss", + ignore: "pid,hostname", +}; + +export const logger = pino({ + level: "debug", +}, pino.transport({ + targets: [ + { + target: "pino-pretty", + options: { ...sharedOpts, ignore: "pid,hostname,req,res", hideObject: true, colorize: true, destination: 1 }, + level: "info", + }, + { + target: "pino-pretty", + options: { ...sharedOpts, colorize: false, destination: logFile, mkdir: true }, + level: "debug", + }, + ], +})); + +export const httpLogger = pinoHttp({ + logger, + customSuccessMessage(req, res) { + return `${req.method} ${req.url} ${res.statusCode}`; + }, + customErrorMessage(req, res) { + return `${req.method} ${req.url} ${res.statusCode}`; + }, +}); diff --git a/server/src/routes/activity.ts b/server/src/routes/activity.ts index 84a467f0..2c421367 100644 --- a/server/src/routes/activity.ts +++ b/server/src/routes/activity.ts @@ -4,6 +4,7 @@ import type { Db } from "@paperclip/db"; import { validate } from "../middleware/validate.js"; import { activityService } from "../services/activity.js"; import { assertBoard, assertCompanyAccess } from "./authz.js"; +import { issueService } from "../services/index.js"; const createActivitySchema = z.object({ actorType: z.enum(["agent", "user", "system"]).optional().default("system"), @@ -18,6 +19,7 @@ const createActivitySchema = z.object({ export function activityRoutes(db: Db) { const router = Router(); const svc = activityService(db); + const issueSvc = issueService(db); router.get("/companies/:companyId/activity", async (req, res) => { const companyId = req.params.companyId as string; @@ -43,5 +45,35 @@ export function activityRoutes(db: Db) { res.status(201).json(event); }); + router.get("/issues/:id/activity", async (req, res) => { + const id = req.params.id as string; + const issue = await issueSvc.getById(id); + if (!issue) { + res.status(404).json({ error: "Issue not found" }); + return; + } + assertCompanyAccess(req, issue.companyId); + const result = await svc.forIssue(id); + res.json(result); + }); + + router.get("/issues/:id/runs", async (req, res) => { + const id = req.params.id as string; + const issue = await issueSvc.getById(id); + if (!issue) { + res.status(404).json({ error: "Issue not found" }); + return; + } + assertCompanyAccess(req, issue.companyId); + const result = await svc.runsForIssue(id); + res.json(result); + }); + + router.get("/heartbeat-runs/:runId/issues", async (req, res) => { + const runId = req.params.runId as string; + const result = await svc.issuesForRun(runId); + res.json(result); + }); + return router; } diff --git a/server/src/routes/agents.ts b/server/src/routes/agents.ts index dffae2dc..508a6ed8 100644 --- a/server/src/routes/agents.ts +++ b/server/src/routes/agents.ts @@ -1,5 +1,7 @@ import { Router } from "express"; import type { Db } from "@paperclip/db"; +import { agents as agentsTable, heartbeatRuns } from "@paperclip/db"; +import { and, desc, eq, inArray, sql } from "drizzle-orm"; import { createAgentKeySchema, createAgentSchema, @@ -7,7 +9,7 @@ import { updateAgentSchema, } from "@paperclip/shared"; import { validate } from "../middleware/validate.js"; -import { agentService, heartbeatService, logActivity } from "../services/index.js"; +import { agentService, heartbeatService, issueService, logActivity } from "../services/index.js"; import { assertBoard, assertCompanyAccess, getActorInfo } from "./authz.js"; import { listAdapterModels } from "../adapters/index.js"; @@ -160,6 +162,7 @@ export function agentRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "agent.created", entityType: "agent", entityId: agent.id, @@ -195,6 +198,7 @@ export function agentRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "agent.updated", entityType: "agent", entityId: agent.id, @@ -349,6 +353,7 @@ export function agentRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "heartbeat.invoked", entityType: "heartbeat_run", entityId: run.id, @@ -397,6 +402,7 @@ export function agentRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "heartbeat.invoked", entityType: "heartbeat_run", entityId: run.id, @@ -472,5 +478,77 @@ export function agentRoutes(db: Db) { res.json(result); }); + router.get("/issues/:id/live-runs", async (req, res) => { + const id = req.params.id as string; + const issueSvc = issueService(db); + const issue = await issueSvc.getById(id); + if (!issue) { + res.status(404).json({ error: "Issue not found" }); + return; + } + assertCompanyAccess(req, issue.companyId); + + const liveRuns = await db + .select({ + id: heartbeatRuns.id, + status: heartbeatRuns.status, + invocationSource: heartbeatRuns.invocationSource, + triggerDetail: heartbeatRuns.triggerDetail, + startedAt: heartbeatRuns.startedAt, + finishedAt: heartbeatRuns.finishedAt, + createdAt: heartbeatRuns.createdAt, + agentId: heartbeatRuns.agentId, + agentName: agentsTable.name, + adapterType: agentsTable.adapterType, + }) + .from(heartbeatRuns) + .innerJoin(agentsTable, eq(heartbeatRuns.agentId, agentsTable.id)) + .where( + and( + eq(heartbeatRuns.companyId, issue.companyId), + inArray(heartbeatRuns.status, ["queued", "running"]), + sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${id}`, + ), + ) + .orderBy(desc(heartbeatRuns.createdAt)); + + res.json(liveRuns); + }); + + router.get("/issues/:id/active-run", async (req, res) => { + const id = req.params.id as string; + const issueSvc = issueService(db); + const issue = await issueSvc.getById(id); + if (!issue) { + res.status(404).json({ error: "Issue not found" }); + return; + } + assertCompanyAccess(req, issue.companyId); + + if (!issue.assigneeAgentId || issue.status !== "in_progress") { + res.json(null); + return; + } + + const agent = await svc.getById(issue.assigneeAgentId); + if (!agent) { + res.json(null); + return; + } + + const run = await heartbeat.getActiveRunForAgent(issue.assigneeAgentId); + if (!run) { + res.json(null); + return; + } + + res.json({ + ...run, + agentId: agent.id, + agentName: agent.name, + adapterType: agent.adapterType, + }); + }); + return router; } diff --git a/server/src/routes/authz.ts b/server/src/routes/authz.ts index 5f5eef91..048ded48 100644 --- a/server/src/routes/authz.ts +++ b/server/src/routes/authz.ts @@ -19,6 +19,7 @@ export function getActorInfo(req: Request) { actorType: "agent" as const, actorId: req.actor.agentId ?? "unknown-agent", agentId: req.actor.agentId ?? null, + runId: req.actor.runId ?? null, }; } @@ -26,5 +27,6 @@ export function getActorInfo(req: Request) { actorType: "user" as const, actorId: req.actor.userId ?? "board", agentId: null, + runId: req.actor.runId ?? null, }; } diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index ba241254..f7fd3af7 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -29,13 +29,14 @@ export function issueRoutes(db: Db) { router.get("/issues/:id", async (req, res) => { const id = req.params.id as string; - const issue = await svc.getById(id); + const isIdentifier = /^[A-Z]+-\d+$/i.test(id); + const issue = isIdentifier ? await svc.getByIdentifier(id) : await svc.getById(id); if (!issue) { res.status(404).json({ error: "Issue not found" }); return; } assertCompanyAccess(req, issue.companyId); - const ancestors = await svc.getAncestors(id); + const ancestors = await svc.getAncestors(issue.id); res.json({ ...issue, ancestors }); }); @@ -55,6 +56,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.created", entityType: "issue", entityId: issue.id, @@ -100,6 +102,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.updated", entityType: "issue", entityId: issue.id, @@ -118,6 +121,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.comment_added", entityType: "issue", entityId: issue.id, @@ -142,16 +146,20 @@ export function issueRoutes(db: Db) { const assigneeChanged = req.body.assigneeAgentId !== undefined && req.body.assigneeAgentId !== existing.assigneeAgentId; - if (assigneeChanged && issue.assigneeAgentId) { + const reopened = + (existing.status === "done" || existing.status === "cancelled") && + issue.status !== "done" && issue.status !== "cancelled"; + + if ((assigneeChanged || reopened) && issue.assigneeAgentId) { void heartbeat .wakeup(issue.assigneeAgentId, { - source: "assignment", + source: reopened ? "automation" : "assignment", triggerDetail: "system", - reason: "issue_assigned", + reason: reopened ? "issue_reopened" : "issue_assigned", payload: { issueId: issue.id, mutation: "update" }, requestedByActorType: actor.actorType, requestedByActorId: actor.actorId, - contextSnapshot: { issueId: issue.id, source: "issue.update" }, + contextSnapshot: { issueId: issue.id, source: reopened ? "issue.reopen" : "issue.update" }, }) .catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue update")); } @@ -180,6 +188,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.deleted", entityType: "issue", entityId: issue.id, @@ -210,6 +219,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.checked_out", entityType: "issue", entityId: issue.id, @@ -252,6 +262,7 @@ export function issueRoutes(db: Db) { actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.released", entityType: "issue", entityId: released.id, @@ -282,19 +293,54 @@ export function issueRoutes(db: Db) { assertCompanyAccess(req, issue.companyId); const actor = getActorInfo(req); + const reopenRequested = req.body.reopen === true; + const isClosed = issue.status === "done" || issue.status === "cancelled"; + let reopened = false; + let reopenFromStatus: string | null = null; + let currentIssue = issue; + + if (reopenRequested && isClosed) { + const reopenedIssue = await svc.update(id, { status: "todo" }); + if (!reopenedIssue) { + res.status(404).json({ error: "Issue not found" }); + return; + } + reopened = true; + reopenFromStatus = issue.status; + currentIssue = reopenedIssue; + + await logActivity(db, { + companyId: currentIssue.companyId, + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + runId: actor.runId, + action: "issue.updated", + entityType: "issue", + entityId: currentIssue.id, + details: { + status: "todo", + reopened: true, + reopenedFrom: reopenFromStatus, + source: "comment", + }, + }); + } + const comment = await svc.addComment(id, req.body.body, { agentId: actor.agentId ?? undefined, userId: actor.actorType === "user" ? actor.actorId : undefined, }); await logActivity(db, { - companyId: issue.companyId, + companyId: currentIssue.companyId, actorType: actor.actorType, actorId: actor.actorId, agentId: actor.agentId, + runId: actor.runId, action: "issue.comment_added", entityType: "issue", - entityId: issue.id, + entityId: currentIssue.id, details: { commentId: comment.id }, }); @@ -313,6 +359,32 @@ export function issueRoutes(db: Db) { } }).catch((err) => logger.warn({ err, issueId: id }, "failed to resolve @-mentions")); + if (reopened && currentIssue.assigneeAgentId) { + void heartbeat + .wakeup(currentIssue.assigneeAgentId, { + source: "automation", + triggerDetail: "system", + reason: "issue_reopened_via_comment", + payload: { + issueId: currentIssue.id, + commentId: comment.id, + reopenedFrom: reopenFromStatus, + mutation: "comment", + }, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + contextSnapshot: { + issueId: currentIssue.id, + taskId: currentIssue.id, + commentId: comment.id, + source: "issue.comment.reopen", + wakeReason: "issue_reopened_via_comment", + reopenedFrom: reopenFromStatus, + }, + }) + .catch((err) => logger.warn({ err, issueId: currentIssue.id }, "failed to wake assignee on issue reopen comment")); + } + res.status(201).json(comment); }); diff --git a/server/src/services/activity-log.ts b/server/src/services/activity-log.ts index b6f0e6e5..3b37a5e6 100644 --- a/server/src/services/activity-log.ts +++ b/server/src/services/activity-log.ts @@ -10,6 +10,7 @@ export interface LogActivityInput { entityType: string; entityId: string; agentId?: string | null; + runId?: string | null; details?: Record | null; } @@ -22,6 +23,7 @@ export async function logActivity(db: Db, input: LogActivityInput) { entityType: input.entityType, entityId: input.entityId, agentId: input.agentId ?? null, + runId: input.runId ?? null, details: input.details ?? null, }); @@ -35,6 +37,7 @@ export async function logActivity(db: Db, input: LogActivityInput) { entityType: input.entityType, entityId: input.entityId, agentId: input.agentId ?? null, + runId: input.runId ?? null, details: input.details ?? null, }, }); diff --git a/server/src/services/activity.ts b/server/src/services/activity.ts index 760c973a..4ff036c1 100644 --- a/server/src/services/activity.ts +++ b/server/src/services/activity.ts @@ -1,6 +1,6 @@ -import { and, desc, eq } from "drizzle-orm"; +import { and, desc, eq, isNotNull, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; -import { activityLog } from "@paperclip/db"; +import { activityLog, heartbeatRuns, issues } from "@paperclip/db"; export interface ActivityFilters { companyId: string; @@ -10,6 +10,7 @@ export interface ActivityFilters { } export function activityService(db: Db) { + const issueIdAsText = sql`${issues.id}::text`; return { list: (filters: ActivityFilters) => { const conditions = [eq(activityLog.companyId, filters.companyId)]; @@ -27,6 +28,58 @@ export function activityService(db: Db) { return db.select().from(activityLog).where(and(...conditions)).orderBy(desc(activityLog.createdAt)); }, + forIssue: (issueId: string) => + db + .select() + .from(activityLog) + .where( + and( + eq(activityLog.entityType, "issue"), + eq(activityLog.entityId, issueId), + ), + ) + .orderBy(desc(activityLog.createdAt)), + + runsForIssue: (issueId: string) => + db + .selectDistinctOn([activityLog.runId], { + runId: activityLog.runId, + status: heartbeatRuns.status, + agentId: heartbeatRuns.agentId, + startedAt: heartbeatRuns.startedAt, + finishedAt: heartbeatRuns.finishedAt, + createdAt: heartbeatRuns.createdAt, + invocationSource: heartbeatRuns.invocationSource, + }) + .from(activityLog) + .innerJoin(heartbeatRuns, eq(activityLog.runId, heartbeatRuns.id)) + .where( + and( + eq(activityLog.entityType, "issue"), + eq(activityLog.entityId, issueId), + isNotNull(activityLog.runId), + ), + ) + .orderBy(activityLog.runId, desc(heartbeatRuns.createdAt)), + + issuesForRun: (runId: string) => + db + .selectDistinctOn([issueIdAsText], { + issueId: issues.id, + title: issues.title, + status: issues.status, + priority: issues.priority, + }) + .from(activityLog) + .innerJoin(issues, eq(activityLog.entityId, issueIdAsText)) + .where( + and( + eq(activityLog.runId, runId), + eq(activityLog.entityType, "issue"), + ), + ) + .orderBy(issueIdAsText), + create: (data: typeof activityLog.$inferInsert) => db .insert(activityLog) diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 68e562a7..0402d9ce 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -34,6 +34,10 @@ interface WakeupOptions { contextSnapshot?: Record; } +function readNonEmptyString(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); @@ -216,6 +220,56 @@ export function heartbeatService(db: Db) { } } + async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) { + const staleThresholdMs = opts?.staleThresholdMs ?? 0; + const now = new Date(); + + // Find all runs in "queued" or "running" state + const activeRuns = await db + .select() + .from(heartbeatRuns) + .where(inArray(heartbeatRuns.status, ["queued", "running"])); + + const reaped: string[] = []; + + for (const run of activeRuns) { + if (runningProcesses.has(run.id)) continue; + + // Apply staleness threshold to avoid false positives + if (staleThresholdMs > 0) { + const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0; + if (now.getTime() - refTime < staleThresholdMs) continue; + } + + await setRunStatus(run.id, "failed", { + error: "Process lost -- server may have restarted", + errorCode: "process_lost", + finishedAt: now, + }); + await setWakeupStatus(run.wakeupRequestId, "failed", { + finishedAt: now, + error: "Process lost -- server may have restarted", + }); + const updatedRun = await getRun(run.id); + if (updatedRun) { + await appendRunEvent(updatedRun, 1, { + eventType: "lifecycle", + stream: "system", + level: "error", + message: "Process lost -- server may have restarted", + }); + } + await finalizeAgentStatus(run.agentId, "failed"); + runningProcesses.delete(run.id); + reaped.push(run.id); + } + + if (reaped.length > 0) { + logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs"); + } + return { reaped: reaped.length, runIds: reaped }; + } + async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, @@ -543,7 +597,26 @@ export function heartbeatService(db: Db) { async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) { const source = opts.source ?? "on_demand"; const triggerDetail = opts.triggerDetail ?? null; - const contextSnapshot = opts.contextSnapshot ?? {}; + const contextSnapshot: Record = { ...(opts.contextSnapshot ?? {}) }; + const reason = opts.reason ?? null; + const payload = opts.payload ?? null; + const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]); + + if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) { + contextSnapshot.wakeReason = reason; + } + if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) { + contextSnapshot.issueId = issueIdFromPayload; + } + if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) { + contextSnapshot.taskId = issueIdFromPayload; + } + if (!readNonEmptyString(contextSnapshot["wakeSource"])) { + contextSnapshot.wakeSource = source; + } + if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) { + contextSnapshot.wakeTriggerDetail = triggerDetail; + } const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); @@ -560,7 +633,7 @@ export function heartbeatService(db: Db) { source, triggerDetail, reason, - payload: opts.payload ?? null, + payload, status: "skipped", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, @@ -591,8 +664,8 @@ export function heartbeatService(db: Db) { agentId, source, triggerDetail, - reason: opts.reason ?? null, - payload: opts.payload ?? null, + reason, + payload, status: "coalesced", coalescedCount: 1, requestedByActorType: opts.requestedByActorType ?? null, @@ -611,8 +684,8 @@ export function heartbeatService(db: Db) { agentId, source, triggerDetail, - reason: opts.reason ?? null, - payload: opts.payload ?? null, + reason, + payload, status: "queued", requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, @@ -757,6 +830,8 @@ export function heartbeatService(db: Db) { wakeup: enqueueWakeup, + reapOrphanedRuns, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0; @@ -860,5 +935,20 @@ export function heartbeatService(db: Db) { return runs.length; }, + + getActiveRunForAgent: async (agentId: string) => { + const [run] = await db + .select() + .from(heartbeatRuns) + .where( + and( + eq(heartbeatRuns.agentId, agentId), + eq(heartbeatRuns.status, "running"), + ), + ) + .orderBy(desc(heartbeatRuns.startedAt)) + .limit(1); + return run ?? null; + }, }; } diff --git a/server/src/services/issues.ts b/server/src/services/issues.ts index 5955fe35..160c3ddd 100644 --- a/server/src/services/issues.ts +++ b/server/src/services/issues.ts @@ -1,6 +1,6 @@ import { and, asc, desc, eq, inArray, isNull, or, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; -import { agents, issues, issueComments } from "@paperclip/db"; +import { agents, companies, issues, issueComments } from "@paperclip/db"; import { conflict, notFound, unprocessable } from "../errors.js"; const ISSUE_TRANSITIONS: Record = { @@ -9,8 +9,8 @@ const ISSUE_TRANSITIONS: Record = { in_progress: ["in_review", "blocked", "done", "cancelled"], in_review: ["in_progress", "done", "cancelled"], blocked: ["todo", "in_progress", "cancelled"], - done: [], - cancelled: [], + done: ["todo"], + cancelled: ["todo"], }; function assertTransition(from: string, to: string) { @@ -69,23 +69,38 @@ export function issueService(db: Db) { .where(eq(issues.id, id)) .then((rows) => rows[0] ?? null), - create: (companyId: string, data: Omit) => { - const values = { ...data, companyId } as typeof issues.$inferInsert; - if (values.status === "in_progress" && !values.startedAt) { - values.startedAt = new Date(); - } - if (values.status === "done") { - values.completedAt = new Date(); - } - if (values.status === "cancelled") { - values.cancelledAt = new Date(); - } + getByIdentifier: (identifier: string) => + db + .select() + .from(issues) + .where(eq(issues.identifier, identifier.toUpperCase())) + .then((rows) => rows[0] ?? null), - return db - .insert(issues) - .values(values) - .returning() - .then((rows) => rows[0]); + create: async (companyId: string, data: Omit) => { + return db.transaction(async (tx) => { + const [company] = await tx + .update(companies) + .set({ issueCounter: sql`${companies.issueCounter} + 1` }) + .where(eq(companies.id, companyId)) + .returning({ issueCounter: companies.issueCounter, issuePrefix: companies.issuePrefix }); + + const issueNumber = company.issueCounter; + const identifier = `${company.issuePrefix}-${issueNumber}`; + + const values = { ...data, companyId, issueNumber, identifier } as typeof issues.$inferInsert; + if (values.status === "in_progress" && !values.startedAt) { + values.startedAt = new Date(); + } + if (values.status === "done") { + values.completedAt = new Date(); + } + if (values.status === "cancelled") { + values.cancelledAt = new Date(); + } + + const [issue] = await tx.insert(issues).values(values).returning(); + return issue; + }); }, update: async (id: string, data: Partial) => { @@ -110,6 +125,12 @@ export function issueService(db: Db) { } applyStatusSideEffects(data.status, patch); + if (data.status && data.status !== "done") { + patch.completedAt = null; + } + if (data.status && data.status !== "cancelled") { + patch.cancelledAt = null; + } return db .update(issues) diff --git a/server/src/types/express.d.ts b/server/src/types/express.d.ts index fe176623..7401a0c1 100644 --- a/server/src/types/express.d.ts +++ b/server/src/types/express.d.ts @@ -9,6 +9,7 @@ declare global { agentId?: string; companyId?: string; keyId?: string; + runId?: string; }; } }