Fix legacy migration reconciliation
This commit is contained in:
157
packages/db/src/client.test.ts
Normal file
157
packages/db/src/client.test.ts
Normal file
@@ -0,0 +1,157 @@
|
|||||||
|
import { createHash } from "node:crypto";
|
||||||
|
import fs from "node:fs";
|
||||||
|
import net from "node:net";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
import { afterEach, describe, expect, it } from "vitest";
|
||||||
|
import postgres from "postgres";
|
||||||
|
import {
|
||||||
|
applyPendingMigrations,
|
||||||
|
ensurePostgresDatabase,
|
||||||
|
inspectMigrations,
|
||||||
|
} from "./client.js";
|
||||||
|
|
||||||
|
type EmbeddedPostgresInstance = {
|
||||||
|
initialise(): Promise<void>;
|
||||||
|
start(): Promise<void>;
|
||||||
|
stop(): Promise<void>;
|
||||||
|
};
|
||||||
|
|
||||||
|
type EmbeddedPostgresCtor = new (opts: {
|
||||||
|
databaseDir: string;
|
||||||
|
user: string;
|
||||||
|
password: string;
|
||||||
|
port: number;
|
||||||
|
persistent: boolean;
|
||||||
|
initdbFlags?: string[];
|
||||||
|
onLog?: (message: unknown) => void;
|
||||||
|
onError?: (message: unknown) => void;
|
||||||
|
}) => EmbeddedPostgresInstance;
|
||||||
|
|
||||||
|
const tempPaths: string[] = [];
|
||||||
|
const runningInstances: EmbeddedPostgresInstance[] = [];
|
||||||
|
|
||||||
|
async function getEmbeddedPostgresCtor(): Promise<EmbeddedPostgresCtor> {
|
||||||
|
const mod = await import("embedded-postgres");
|
||||||
|
return mod.default as EmbeddedPostgresCtor;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getAvailablePort(): Promise<number> {
|
||||||
|
return await new Promise((resolve, reject) => {
|
||||||
|
const server = net.createServer();
|
||||||
|
server.unref();
|
||||||
|
server.on("error", reject);
|
||||||
|
server.listen(0, "127.0.0.1", () => {
|
||||||
|
const address = server.address();
|
||||||
|
if (!address || typeof address === "string") {
|
||||||
|
server.close(() => reject(new Error("Failed to allocate test port")));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const { port } = address;
|
||||||
|
server.close((error) => {
|
||||||
|
if (error) reject(error);
|
||||||
|
else resolve(port);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createTempDatabase(): Promise<string> {
|
||||||
|
const dataDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-db-client-"));
|
||||||
|
tempPaths.push(dataDir);
|
||||||
|
const port = await getAvailablePort();
|
||||||
|
const EmbeddedPostgres = await getEmbeddedPostgresCtor();
|
||||||
|
const instance = new EmbeddedPostgres({
|
||||||
|
databaseDir: dataDir,
|
||||||
|
user: "paperclip",
|
||||||
|
password: "paperclip",
|
||||||
|
port,
|
||||||
|
persistent: true,
|
||||||
|
initdbFlags: ["--encoding=UTF8", "--locale=C"],
|
||||||
|
onLog: () => {},
|
||||||
|
onError: () => {},
|
||||||
|
});
|
||||||
|
await instance.initialise();
|
||||||
|
await instance.start();
|
||||||
|
runningInstances.push(instance);
|
||||||
|
|
||||||
|
const adminUrl = `postgres://paperclip:paperclip@127.0.0.1:${port}/postgres`;
|
||||||
|
await ensurePostgresDatabase(adminUrl, "paperclip");
|
||||||
|
return `postgres://paperclip:paperclip@127.0.0.1:${port}/paperclip`;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function migrationHash(migrationFile: string): Promise<string> {
|
||||||
|
const content = await fs.promises.readFile(
|
||||||
|
new URL(`./migrations/${migrationFile}`, import.meta.url),
|
||||||
|
"utf8",
|
||||||
|
);
|
||||||
|
return createHash("sha256").update(content).digest("hex");
|
||||||
|
}
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
while (runningInstances.length > 0) {
|
||||||
|
const instance = runningInstances.pop();
|
||||||
|
if (!instance) continue;
|
||||||
|
await instance.stop();
|
||||||
|
}
|
||||||
|
while (tempPaths.length > 0) {
|
||||||
|
const tempPath = tempPaths.pop();
|
||||||
|
if (!tempPath) continue;
|
||||||
|
fs.rmSync(tempPath, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("applyPendingMigrations", () => {
|
||||||
|
it(
|
||||||
|
"applies an inserted earlier migration without replaying later legacy migrations",
|
||||||
|
async () => {
|
||||||
|
const connectionString = await createTempDatabase();
|
||||||
|
|
||||||
|
await applyPendingMigrations(connectionString);
|
||||||
|
|
||||||
|
const sql = postgres(connectionString, { max: 1, onnotice: () => {} });
|
||||||
|
try {
|
||||||
|
const richMagnetoHash = await migrationHash("0030_rich_magneto.sql");
|
||||||
|
|
||||||
|
await sql.unsafe(
|
||||||
|
`DELETE FROM "drizzle"."__drizzle_migrations" WHERE hash = '${richMagnetoHash}'`,
|
||||||
|
);
|
||||||
|
await sql.unsafe(`DROP TABLE "company_logos"`);
|
||||||
|
} finally {
|
||||||
|
await sql.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
const pendingState = await inspectMigrations(connectionString);
|
||||||
|
expect(pendingState).toMatchObject({
|
||||||
|
status: "needsMigrations",
|
||||||
|
pendingMigrations: ["0030_rich_magneto.sql"],
|
||||||
|
reason: "pending-migrations",
|
||||||
|
});
|
||||||
|
|
||||||
|
await applyPendingMigrations(connectionString);
|
||||||
|
|
||||||
|
const finalState = await inspectMigrations(connectionString);
|
||||||
|
expect(finalState.status).toBe("upToDate");
|
||||||
|
|
||||||
|
const verifySql = postgres(connectionString, { max: 1, onnotice: () => {} });
|
||||||
|
try {
|
||||||
|
const rows = await verifySql.unsafe<{ table_name: string }[]>(
|
||||||
|
`
|
||||||
|
SELECT table_name
|
||||||
|
FROM information_schema.tables
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name IN ('company_logos', 'execution_workspaces')
|
||||||
|
ORDER BY table_name
|
||||||
|
`,
|
||||||
|
);
|
||||||
|
expect(rows.map((row) => row.table_name)).toEqual([
|
||||||
|
"company_logos",
|
||||||
|
"execution_workspaces",
|
||||||
|
]);
|
||||||
|
} finally {
|
||||||
|
await verifySql.end();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
20_000,
|
||||||
|
);
|
||||||
|
});
|
||||||
@@ -646,13 +646,26 @@ export async function applyPendingMigrations(url: string): Promise<void> {
|
|||||||
const initialState = await inspectMigrations(url);
|
const initialState = await inspectMigrations(url);
|
||||||
if (initialState.status === "upToDate") return;
|
if (initialState.status === "upToDate") return;
|
||||||
|
|
||||||
const sql = createUtilitySql(url);
|
if (initialState.reason === "no-migration-journal-empty-db") {
|
||||||
|
const sql = createUtilitySql(url);
|
||||||
|
try {
|
||||||
|
const db = drizzlePg(sql);
|
||||||
|
await migratePg(db, { migrationsFolder: MIGRATIONS_FOLDER });
|
||||||
|
} finally {
|
||||||
|
await sql.end();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
const bootstrappedState = await inspectMigrations(url);
|
||||||
const db = drizzlePg(sql);
|
if (bootstrappedState.status === "upToDate") return;
|
||||||
await migratePg(db, { migrationsFolder: MIGRATIONS_FOLDER });
|
throw new Error(
|
||||||
} finally {
|
`Failed to bootstrap migrations: ${bootstrappedState.pendingMigrations.join(", ")}`,
|
||||||
await sql.end();
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (initialState.reason === "no-migration-journal-non-empty-db") {
|
||||||
|
throw new Error(
|
||||||
|
"Database has tables but no migration journal; automatic migration is unsafe. Initialize migration history manually.",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let state = await inspectMigrations(url);
|
let state = await inspectMigrations(url);
|
||||||
@@ -665,7 +678,7 @@ export async function applyPendingMigrations(url: string): Promise<void> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (state.status !== "needsMigrations" || state.reason !== "pending-migrations") {
|
if (state.status !== "needsMigrations" || state.reason !== "pending-migrations") {
|
||||||
throw new Error("Migrations are still pending after attempted apply; run inspectMigrations for details.");
|
throw new Error("Migrations are still pending after migration-history reconciliation; run inspectMigrations for details.");
|
||||||
}
|
}
|
||||||
|
|
||||||
await applyPendingMigrationsManually(url, state.pendingMigrations);
|
await applyPendingMigrationsManually(url, state.pendingMigrations);
|
||||||
|
|||||||
Reference in New Issue
Block a user