From c4272f29ad85bfaedd2a7c18e895f6453f5cc122 Mon Sep 17 00:00:00 2001 From: Jan Dolezel Date: Mon, 25 Jun 2018 11:19:35 +0200 Subject: [PATCH 1/2] Using async/await --- bin/node-pg-migrate | 6 +- lib/db.js | 76 ++++----- lib/migration.js | 106 ++++++------- lib/operations/other.js | 4 +- lib/runner.js | 339 +++++++++++++++++++--------------------- lib/utils.js | 26 ++- 6 files changed, 257 insertions(+), 300 deletions(-) diff --git a/bin/node-pg-migrate b/bin/node-pg-migrate index b6ffc4a5..db663e8b 100755 --- a/bin/node-pg-migrate +++ b/bin/node-pg-migrate @@ -6,10 +6,8 @@ const util = require("util"); const path = require("path"); const yargs = require("yargs"); -const Migration = require("../dist/migration").default; // eslint-disable-line import/no-unresolved,import/extensions -const runner = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions - -const migrationRunner = runner.default; +const { default: Migration } = require("../dist/migration"); // eslint-disable-line import/no-unresolved,import/extensions +const { default: migrationRunner } = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions process.on("uncaughtException", err => { console.log(err.stack); diff --git a/lib/db.js b/lib/db.js index 32aa78d1..df45b4f5 100644 --- a/lib/db.js +++ b/lib/db.js @@ -8,6 +8,7 @@ import pg from "pg"; export default (connectionString, log = console.error) => { const client = new pg.Client(connectionString); + let clientActive = false; const beforeCloseListeners = []; @@ -26,38 +27,43 @@ export default (connectionString, log = console.error) => { }) ); - const query = (...args) => - createConnection() - .then(() => client.query(...args)) - .catch(err => { - const { message, position } = err; - const string = args[0].text || args[0]; - if (message && position >= 1) { - const endLineWrapIndexOf = string.indexOf("\n", position); - const endLineWrapPos = - endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length; - const stringStart = string.substring(0, endLineWrapPos); - const stringEnd = string.substr(endLineWrapPos); - const startLineWrapPos = stringStart.lastIndexOf("\n") + 1; - const padding = " ".repeat(position - startLineWrapPos - 1); - log(`Error executing: + const query = async (...args) => { + await createConnection(); + try { + return await client.query(...args); + } catch (err) { + const { message, position } = err; + const string = args[0].text || args[0]; + if (message && position >= 1) { + const endLineWrapIndexOf = string.indexOf("\n", position); + const endLineWrapPos = + endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length; + const stringStart = string.substring(0, endLineWrapPos); + const stringEnd = string.substr(endLineWrapPos); + const startLineWrapPos = stringStart.lastIndexOf("\n") + 1; + const padding = " ".repeat(position - startLineWrapPos - 1); + log(`Error executing: ${stringStart} ${padding}^^^^${stringEnd} ${message} `); - } else { - log(`Error executing: + } else { + log(`Error executing: ${string} ${err} `); - } - throw err; - }); + } + throw err; + } + }; - const select = string => query(string).then(result => result && result.rows); - const column = (string, columnName) => - select(string).then(rows => rows.map(r => r[columnName])); + const select = async string => { + const { rows } = await query(string); + return rows; + }; + const column = async (string, columnName) => + (await select(string)).map(r => r[columnName]); return { query, @@ -66,18 +72,16 @@ ${err} addBeforeCloseListener: listener => beforeCloseListeners.push(listener), - close: () => - beforeCloseListeners - .reduce( - (promise, listener) => - promise.then(listener).catch(err => log(err.stack || err)), - Promise.resolve() - ) - .then(() => { - clientActive = false; - if (client) { - client.end(); - } - }) + close: async () => { + await beforeCloseListeners.reduce( + (promise, listener) => + promise.then(listener).catch(err => log(err.stack || err)), + Promise.resolve() + ); + clientActive = false; + if (client) { + client.end(); + } + } }; }; diff --git a/lib/migration.js b/lib/migration.js index 580d8974..a5f52e2c 100644 --- a/lib/migration.js +++ b/lib/migration.js @@ -53,69 +53,55 @@ class Migration { this.log = log; } - _apply(action, pgm) { - return new Promise((resolve, reject) => { - if (action.length === 2) { - action(pgm, resolve); - } else { - const result = action(pgm); - // result conforms to Promises/A+ spec - if ( - result && - typeof result === "object" && - typeof result.then === "function" - ) { - result.then(resolve).catch(reject); - } else { - resolve(); - } - } - }).then(() => { - const sqlSteps = pgm.getSqlSteps(); - - const schema = getMigrationTableSchema(this.options); - const { migrationsTable } = this.options; - const { name } = this; - switch (action) { - case this.down: - this.log(`### MIGRATION ${this.name} (DOWN) ###`); - sqlSteps.push( - `DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';` - ); - break; - case this.up: - this.log(`### MIGRATION ${this.name} (UP) ###`); - sqlSteps.push( - `INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());` - ); - break; - default: - throw new Error("Unknown direction"); - } - - if (!this.options.singleTransaction && pgm.isUsingTransaction()) { - // if not in singleTransaction mode we need to create our own transaction - sqlSteps.unshift("BEGIN;"); - sqlSteps.push("COMMIT;"); - } else if (this.options.singleTransaction && !pgm.isUsingTransaction()) { - // in singleTransaction mode we are already wrapped in a global transaction - this.log("#> WARNING: Need to break single transaction! <"); - sqlSteps.unshift("COMMIT;"); - sqlSteps.push("BEGIN;"); - } else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) { - this.log( - "#> WARNING: This migration is not wrapped in a transaction! <" + async _apply(action, pgm) { + if (action.length === 2) { + await new Promise(resolve => action(pgm, resolve)); + } else { + await action(pgm); + } + + const sqlSteps = pgm.getSqlSteps(); + + const schema = getMigrationTableSchema(this.options); + const { migrationsTable } = this.options; + const { name } = this; + switch (action) { + case this.down: + this.log(`### MIGRATION ${this.name} (DOWN) ###`); + sqlSteps.push( + `DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';` + ); + break; + case this.up: + this.log(`### MIGRATION ${this.name} (UP) ###`); + sqlSteps.push( + `INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());` ); - } + break; + default: + throw new Error("Unknown direction"); + } + + if (!this.options.singleTransaction && pgm.isUsingTransaction()) { + // if not in singleTransaction mode we need to create our own transaction + sqlSteps.unshift("BEGIN;"); + sqlSteps.push("COMMIT;"); + } else if (this.options.singleTransaction && !pgm.isUsingTransaction()) { + // in singleTransaction mode we are already wrapped in a global transaction + this.log("#> WARNING: Need to break single transaction! <"); + sqlSteps.unshift("COMMIT;"); + sqlSteps.push("BEGIN;"); + } else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) { + this.log("#> WARNING: This migration is not wrapped in a transaction! <"); + } - this.log(`${sqlSteps.join("\n")}\n\n`); + this.log(`${sqlSteps.join("\n")}\n\n`); - return sqlSteps.reduce( - (promise, sql) => - promise.then(() => this.options.dryRun || this.db.query(sql)), - Promise.resolve() - ); - }); + return sqlSteps.reduce( + (promise, sql) => + promise.then(() => this.options.dryRun || this.db.query(sql)), + Promise.resolve() + ); } applyUp() { diff --git a/lib/operations/other.js b/lib/operations/other.js index 87deb4fc..feaf335b 100644 --- a/lib/operations/other.js +++ b/lib/operations/other.js @@ -1,7 +1,7 @@ import { t } from "../utils"; // eslint-disable-next-line import/prefer-default-export -export const sql = (...args) => { +export function sql(...args) { // applies some very basic templating using the utils.p let s = t(...args); // add trailing ; if not present @@ -9,4 +9,4 @@ export const sql = (...args) => { s += ";"; } return s; -}; +} diff --git a/lib/runner.js b/lib/runner.js index c2ee4da5..93618eb6 100644 --- a/lib/runner.js +++ b/lib/runner.js @@ -2,129 +2,115 @@ import path from "path"; import fs from "fs"; import Db from "./db"; import Migration from "./migration"; -import { getMigrationTableSchema, finallyPromise, template } from "./utils"; +import { getMigrationTableSchema, template, promisify } from "./utils"; export { PgLiteral } from "./utils"; // Random but well-known identifier shared by all instances of node-pg-migrate const PG_MIGRATE_LOCK_ID = 7241865325823964; -const readdir = (...args) => - new Promise((resolve, reject) => - // eslint-disable-next-line security/detect-non-literal-fs-filename - fs.readdir(...args, (err, files) => (err ? reject(err) : resolve(files))) - ); +const readdir = promisify(fs.readdir); +const lstat = promisify(fs.lstat); const idColumn = "id"; const nameColumn = "name"; const runOnColumn = "run_on"; -const loadMigrationFiles = (db, options, log) => - readdir(`${options.dir}/`) - .then(files => - Promise.all( - files.map( - file => - new Promise((resolve, reject) => - // eslint-disable-next-line security/detect-non-literal-fs-filename - fs.lstat( - `${options.dir}/${file}`, - (err, stats) => - err ? reject(err) : resolve(stats.isFile() ? file : null) - ) - ) - ) +const loadMigrationFiles = async (db, options, log) => { + try { + const files = await Promise.all( + (await readdir(`${options.dir}/`)).map(async file => { + const stats = await lstat(`${options.dir}/${file}`); + return stats.isFile() ? file : null; + }) + ); + const filter = new RegExp(`^(${options.ignorePattern})$`); // eslint-disable-line security/detect-non-literal-regexp + let shorthands = {}; + return files + .filter(i => i && !filter.test(i)) + .sort( + (f1, f2) => + f1 < f2 // eslint-disable-line no-nested-ternary + ? -1 + : f1 > f2 + ? 1 + : 0 ) - ) - .then(files => { - const filter = new RegExp(`^(${options.ignorePattern})$`); // eslint-disable-line security/detect-non-literal-regexp - let shorthands = {}; - return files - .filter(i => i && !filter.test(i)) - .sort( - (f1, f2) => - f1 < f2 // eslint-disable-line no-nested-ternary - ? -1 - : f1 > f2 - ? 1 - : 0 - ) - .map(file => { - const filePath = `${options.dir}/${file}`; - const actions = - path.extname(filePath) === ".sql" - ? // eslint-disable-next-line security/detect-non-literal-fs-filename - { up: pgm => pgm.sql(fs.readFileSync(filePath, "utf8")) } - : // eslint-disable-next-line global-require,import/no-dynamic-require,security/detect-non-literal-require - require(path.relative(__dirname, filePath)); - shorthands = { ...shorthands, ...actions.shorthands }; - return new Migration( - db, - filePath, - actions, - options, - { - ...shorthands - }, - log - ); - }); - }) - .catch(err => { - throw new Error(`Can't get migration files: ${err.stack}`); - }); + .map(file => { + const filePath = `${options.dir}/${file}`; + const actions = + path.extname(filePath) === ".sql" + ? // eslint-disable-next-line security/detect-non-literal-fs-filename + { up: pgm => pgm.sql(fs.readFileSync(filePath, "utf8")) } + : // eslint-disable-next-line global-require,import/no-dynamic-require,security/detect-non-literal-require + require(path.relative(__dirname, filePath)); + shorthands = { ...shorthands, ...actions.shorthands }; + return new Migration( + db, + filePath, + actions, + options, + { + ...shorthands + }, + log + ); + }); + } catch (err) { + throw new Error(`Can't get migration files: ${err.stack}`); + } +}; -const lock = db => - db - .query( - `select pg_try_advisory_lock(${PG_MIGRATE_LOCK_ID}) as lock_obtained` - ) - .then(result => { - if (!result.rows[0].lock_obtained) { - throw new Error("Another migration is already running"); - } - }); +const lock = async db => { + const { + rows: [lockObtained] + } = await db.query( + `select pg_try_advisory_lock(${PG_MIGRATE_LOCK_ID}) as "lockObtained"` + ); + if (!lockObtained) { + throw new Error("Another migration is already running"); + } +}; + +const getRunMigrations = async (db, options) => { + try { + const schema = getMigrationTableSchema(options); + const { migrationsTable } = options; + const fullTableName = { + schema, + name: migrationsTable + }; -const getRunMigrations = (db, options) => { - const schema = getMigrationTableSchema(options); - const { migrationsTable } = options; - const fullTableName = { - schema, - name: migrationsTable - }; - return db - .select( + const migrationTables = await db.select( `SELECT table_name FROM information_schema.tables WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}'` - ) - .then( - migrationTables => - migrationTables && migrationTables.length === 1 - ? db - .select( - `SELECT constraint_name FROM information_schema.table_constraints WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}' AND constraint_type = 'PRIMARY KEY'` - ) - .then( - primaryKeyConstraints => - (primaryKeyConstraints && - primaryKeyConstraints.length === 1) || - db.query( - template`ALTER TABLE "${fullTableName}" ADD PRIMARY KEY (${idColumn})` - ) - ) - : db.query( - template`CREATE TABLE "${fullTableName}" ( ${idColumn} SERIAL PRIMARY KEY, ${nameColumn} varchar(255) NOT NULL, ${runOnColumn} timestamp NOT NULL)` - ) - ) - .then(() => (!options.noLock ? lock(db, options) : null)) - .then(() => - db.column( - template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`, - nameColumn - ) - ) - .catch(err => { - throw new Error(`Unable to fetch migrations: ${err.stack}`); - }); + ); + + if (migrationTables && migrationTables.length === 1) { + const primaryKeyConstraints = await db.select( + `SELECT constraint_name FROM information_schema.table_constraints WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}' AND constraint_type = 'PRIMARY KEY'` + ); + if (!primaryKeyConstraints || primaryKeyConstraints.length !== 1) { + await db.query( + template`ALTER TABLE "${fullTableName}" ADD PRIMARY KEY (${idColumn})` + ); + } + } else { + await db.query( + template`CREATE TABLE "${fullTableName}" ( ${idColumn} SERIAL PRIMARY KEY, ${nameColumn} varchar(255) NOT NULL, ${runOnColumn} timestamp NOT NULL)` + ); + } + + if (!options.noLock) { + await lock(db, options); + } + + return await db.column( + template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`, + nameColumn + ); + } catch (err) { + throw new Error(`Unable to fetch migrations: ${err.stack}`); + } }; const getMigrationsToRun = (options, runNames, migrations) => { @@ -164,87 +150,78 @@ const getMigrationsToRun = (options, runNames, migrations) => { ); }; -const ifSingleTransaction = (operation, options, db) => - options.singleTransaction ? db.query(operation) : Promise.resolve(); - -export default options => { +export default async options => { const log = options.log || console.log; const db = Db(options.databaseUrl, log); - return Promise.resolve() - .then(() => { - let promise = Promise.resolve(); - if (options.schema) { - if (options.createSchema) { - promise = promise.then(() => - db.query(`CREATE SCHEMA IF NOT EXISTS "${options.schema}"`) - ); - } - promise = promise.then(() => - db.query(`SET SCHEMA '${options.schema}'`) - ); + try { + if (options.schema) { + if (options.createSchema) { + await db.query(`CREATE SCHEMA IF NOT EXISTS "${options.schema}"`); } - if (options.migrationsSchema && options.createMigrationsSchema) { - promise = promise.then(() => - db.query(`CREATE SCHEMA IF NOT EXISTS "${options.migrationsSchema}"`) - ); - } - return promise; - }) - .then(() => - Promise.all([ - loadMigrationFiles(db, options, log), - getRunMigrations(db, options) - ]) - ) - .then(([migrations, runNames]) => { - if (options.checkOrder) { - const len = Math.min(runNames.length, migrations.length); - for (let i = 0; i < len; i += 1) { - const runName = runNames[i]; - const migrationName = migrations[i].name; - if (runName !== migrationName) { - throw new Error( - `Not run migration ${migrationName} is preceding already run migration ${runName}` - ); - } + await db.query(`SET SCHEMA '${options.schema}'`); + } + if (options.migrationsSchema && options.createMigrationsSchema) { + await db.query( + `CREATE SCHEMA IF NOT EXISTS "${options.migrationsSchema}"` + ); + } + const [migrations, runNames] = await Promise.all([ + loadMigrationFiles(db, options, log), + getRunMigrations(db, options) + ]); + if (options.checkOrder) { + const len = Math.min(runNames.length, migrations.length); + for (let i = 0; i < len; i += 1) { + const runName = runNames[i]; + const migrationName = migrations[i].name; + if (runName !== migrationName) { + throw new Error( + `Not run migration ${migrationName} is preceding already run migration ${runName}` + ); } } + } - const toRun = getMigrationsToRun(options, runNames, migrations); + const toRun = getMigrationsToRun(options, runNames, migrations); - if (!toRun.length) { - log("No migrations to run!"); - return null; - } + if (!toRun.length) { + log("No migrations to run!"); + return; + } - // TODO: add some fancy colors to logging - log("> Migrating files:"); - toRun.forEach(m => { - log(`> - ${m.name}`); - }); + // TODO: add some fancy colors to logging + log("> Migrating files:"); + toRun.forEach(m => { + log(`> - ${m.name}`); + }); - return ifSingleTransaction("BEGIN", options, db) - .then(() => - toRun.reduce( - (promise, migration) => - promise.then( - () => - options.direction === "up" - ? migration.applyUp() - : migration.applyDown() - ), - Promise.resolve() - ) - ) - .then(() => ifSingleTransaction("COMMIT", options, db)); - }) - .catch(e => { - log("> Rolling back attempted migration ..."); - return db.query("ROLLBACK").then( - ...finallyPromise(() => { - throw e; - }) + if (options.singleTransaction) { + await db.query("BEGIN"); + } + + try { + await toRun.reduce( + (promise, migration) => + promise.then( + () => + options.direction === "up" + ? migration.applyUp() + : migration.applyDown() + ), + Promise.resolve() ); - }) - .then(...finallyPromise(db.close)); + + if (options.singleTransaction) { + await db.query("COMMIT"); + } + } catch (err) { + if (options.singleTransaction) { + log("> Rolling back attempted migration ..."); + await db.query("ROLLBACK"); + } + throw err; + } + } finally { + db.close(); + } }; diff --git a/lib/utils.js b/lib/utils.js index 56896261..5afc5dc7 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -91,23 +91,6 @@ export const getMigrationTableSchema = options => ? options.schema : "public"; -export const finallyPromise = func => [ - func, - err => { - const errHandler = innerErr => { - console.error(innerErr.stack ? innerErr.stack : innerErr); - throw err; - }; - try { - return Promise.resolve(func()).then(() => { - throw err; - }, errHandler); - } catch (innerErr) { - return errHandler(innerErr); - } - } -]; - export const quote = array => array.map(item => template`"${item}"`); const typeAdapters = { @@ -175,3 +158,12 @@ export const formatLines = (lines, replace = " ", separator = ",") => .map(line => line.replace(/(?:\r\n|\r|\n)+/g, " ")) .join(`${separator}\n`) .replace(/^/gm, replace); + +export const promisify = fn => (...args) => + new Promise((resolve, reject) => + fn.call( + this, + ...args, + (err, ...result) => (err ? reject(err) : resolve(...result)) + ) + ); From ed5a350446139411ffc1f599455276df3e34131e Mon Sep 17 00:00:00 2001 From: Jan Dolezel Date: Mon, 25 Jun 2018 12:20:27 +0200 Subject: [PATCH 2/2] Refactored --- lib/runner.js | 92 +++++++++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/lib/runner.js b/lib/runner.js index 93618eb6..6758b2ed 100644 --- a/lib/runner.js +++ b/lib/runner.js @@ -72,7 +72,7 @@ const lock = async db => { } }; -const getRunMigrations = async (db, options) => { +const ensureMigrationsTable = async (db, options) => { try { const schema = getMigrationTableSchema(options); const { migrationsTable } = options; @@ -99,20 +99,24 @@ const getRunMigrations = async (db, options) => { template`CREATE TABLE "${fullTableName}" ( ${idColumn} SERIAL PRIMARY KEY, ${nameColumn} varchar(255) NOT NULL, ${runOnColumn} timestamp NOT NULL)` ); } - - if (!options.noLock) { - await lock(db, options); - } - - return await db.column( - template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`, - nameColumn - ); } catch (err) { - throw new Error(`Unable to fetch migrations: ${err.stack}`); + throw new Error(`Unable to ensure migrations table: ${err.stack}`); } }; +const getRunMigrations = async (db, options) => { + const schema = getMigrationTableSchema(options); + const { migrationsTable } = options; + const fullTableName = { + schema, + name: migrationsTable + }; + return db.column( + template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`, + nameColumn + ); +}; + const getMigrationsToRun = (options, runNames, migrations) => { if (options.direction === "down") { const downMigrations = runNames @@ -150,6 +154,28 @@ const getMigrationsToRun = (options, runNames, migrations) => { ); }; +const checkOrder = (runNames, migrations) => { + const len = Math.min(runNames.length, migrations.length); + for (let i = 0; i < len; i += 1) { + const runName = runNames[i]; + const migrationName = migrations[i].name; + if (runName !== migrationName) { + throw new Error( + `Not run migration ${migrationName} is preceding already run migration ${runName}` + ); + } + } +}; + +const runMigrations = (toRun, direction) => + toRun.reduce( + (promise, migration) => + promise.then( + () => (direction === "up" ? migration.applyUp() : migration.applyDown()) + ), + Promise.resolve() + ); + export default async options => { const log = options.log || console.log; const db = Db(options.databaseUrl, log); @@ -165,21 +191,20 @@ export default async options => { `CREATE SCHEMA IF NOT EXISTS "${options.migrationsSchema}"` ); } + + await ensureMigrationsTable(db, options); + + if (!options.noLock) { + await lock(db, options); + } + const [migrations, runNames] = await Promise.all([ loadMigrationFiles(db, options, log), getRunMigrations(db, options) ]); + if (options.checkOrder) { - const len = Math.min(runNames.length, migrations.length); - for (let i = 0; i < len; i += 1) { - const runName = runNames[i]; - const migrationName = migrations[i].name; - if (runName !== migrationName) { - throw new Error( - `Not run migration ${migrationName} is preceding already run migration ${runName}` - ); - } - } + checkOrder(runNames, migrations); } const toRun = getMigrationsToRun(options, runNames, migrations); @@ -197,29 +222,16 @@ export default async options => { if (options.singleTransaction) { await db.query("BEGIN"); - } - - try { - await toRun.reduce( - (promise, migration) => - promise.then( - () => - options.direction === "up" - ? migration.applyUp() - : migration.applyDown() - ), - Promise.resolve() - ); - - if (options.singleTransaction) { + try { + await runMigrations(toRun, options.direction); await db.query("COMMIT"); - } - } catch (err) { - if (options.singleTransaction) { + } catch (err) { log("> Rolling back attempted migration ..."); await db.query("ROLLBACK"); + throw err; } - throw err; + } else { + await runMigrations(toRun, options.direction); } } finally { db.close();