Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using async/await #282

Merged
merged 2 commits into from
Jun 26, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Using async/await
dolezel committed Jun 25, 2018
commit c4272f29ad85bfaedd2a7c18e895f6453f5cc122
6 changes: 2 additions & 4 deletions bin/node-pg-migrate
Original file line number Diff line number Diff line change
@@ -6,10 +6,8 @@
const util = require("util");
const path = require("path");
const yargs = require("yargs");
const Migration = require("../dist/migration").default; // eslint-disable-line import/no-unresolved,import/extensions
const runner = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions

const migrationRunner = runner.default;
const { default: Migration } = require("../dist/migration"); // eslint-disable-line import/no-unresolved,import/extensions
const { default: migrationRunner } = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions

process.on("uncaughtException", err => {
console.log(err.stack);
76 changes: 40 additions & 36 deletions lib/db.js
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import pg from "pg";

export default (connectionString, log = console.error) => {
const client = new pg.Client(connectionString);

let clientActive = false;
const beforeCloseListeners = [];

@@ -26,38 +27,43 @@ export default (connectionString, log = console.error) => {
})
);

const query = (...args) =>
createConnection()
.then(() => client.query(...args))
.catch(err => {
const { message, position } = err;
const string = args[0].text || args[0];
if (message && position >= 1) {
const endLineWrapIndexOf = string.indexOf("\n", position);
const endLineWrapPos =
endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length;
const stringStart = string.substring(0, endLineWrapPos);
const stringEnd = string.substr(endLineWrapPos);
const startLineWrapPos = stringStart.lastIndexOf("\n") + 1;
const padding = " ".repeat(position - startLineWrapPos - 1);
log(`Error executing:
const query = async (...args) => {
await createConnection();
try {
return await client.query(...args);
} catch (err) {
const { message, position } = err;
const string = args[0].text || args[0];
if (message && position >= 1) {
const endLineWrapIndexOf = string.indexOf("\n", position);
const endLineWrapPos =
endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length;
const stringStart = string.substring(0, endLineWrapPos);
const stringEnd = string.substr(endLineWrapPos);
const startLineWrapPos = stringStart.lastIndexOf("\n") + 1;
const padding = " ".repeat(position - startLineWrapPos - 1);
log(`Error executing:
${stringStart}
${padding}^^^^${stringEnd}
${message}
`);
} else {
log(`Error executing:
} else {
log(`Error executing:
${string}
${err}
`);
}
throw err;
});
}
throw err;
}
};

const select = string => query(string).then(result => result && result.rows);
const column = (string, columnName) =>
select(string).then(rows => rows.map(r => r[columnName]));
const select = async string => {
const { rows } = await query(string);
return rows;
};
const column = async (string, columnName) =>
(await select(string)).map(r => r[columnName]);

return {
query,
@@ -66,18 +72,16 @@ ${err}

addBeforeCloseListener: listener => beforeCloseListeners.push(listener),

close: () =>
beforeCloseListeners
.reduce(
(promise, listener) =>
promise.then(listener).catch(err => log(err.stack || err)),
Promise.resolve()
)
.then(() => {
clientActive = false;
if (client) {
client.end();
}
})
close: async () => {
await beforeCloseListeners.reduce(
(promise, listener) =>
promise.then(listener).catch(err => log(err.stack || err)),
Promise.resolve()
);
clientActive = false;
if (client) {
client.end();
}
}
};
};
106 changes: 46 additions & 60 deletions lib/migration.js
Original file line number Diff line number Diff line change
@@ -53,69 +53,55 @@ class Migration {
this.log = log;
}

_apply(action, pgm) {
return new Promise((resolve, reject) => {
if (action.length === 2) {
action(pgm, resolve);
} else {
const result = action(pgm);
// result conforms to Promises/A+ spec
if (
result &&
typeof result === "object" &&
typeof result.then === "function"
) {
result.then(resolve).catch(reject);
} else {
resolve();
}
}
}).then(() => {
const sqlSteps = pgm.getSqlSteps();

const schema = getMigrationTableSchema(this.options);
const { migrationsTable } = this.options;
const { name } = this;
switch (action) {
case this.down:
this.log(`### MIGRATION ${this.name} (DOWN) ###`);
sqlSteps.push(
`DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';`
);
break;
case this.up:
this.log(`### MIGRATION ${this.name} (UP) ###`);
sqlSteps.push(
`INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());`
);
break;
default:
throw new Error("Unknown direction");
}

if (!this.options.singleTransaction && pgm.isUsingTransaction()) {
// if not in singleTransaction mode we need to create our own transaction
sqlSteps.unshift("BEGIN;");
sqlSteps.push("COMMIT;");
} else if (this.options.singleTransaction && !pgm.isUsingTransaction()) {
// in singleTransaction mode we are already wrapped in a global transaction
this.log("#> WARNING: Need to break single transaction! <");
sqlSteps.unshift("COMMIT;");
sqlSteps.push("BEGIN;");
} else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) {
this.log(
"#> WARNING: This migration is not wrapped in a transaction! <"
async _apply(action, pgm) {
if (action.length === 2) {
await new Promise(resolve => action(pgm, resolve));
} else {
await action(pgm);
}

const sqlSteps = pgm.getSqlSteps();

const schema = getMigrationTableSchema(this.options);
const { migrationsTable } = this.options;
const { name } = this;
switch (action) {
case this.down:
this.log(`### MIGRATION ${this.name} (DOWN) ###`);
sqlSteps.push(
`DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';`
);
break;
case this.up:
this.log(`### MIGRATION ${this.name} (UP) ###`);
sqlSteps.push(
`INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());`
);
}
break;
default:
throw new Error("Unknown direction");
}

if (!this.options.singleTransaction && pgm.isUsingTransaction()) {
// if not in singleTransaction mode we need to create our own transaction
sqlSteps.unshift("BEGIN;");
sqlSteps.push("COMMIT;");
} else if (this.options.singleTransaction && !pgm.isUsingTransaction()) {
// in singleTransaction mode we are already wrapped in a global transaction
this.log("#> WARNING: Need to break single transaction! <");
sqlSteps.unshift("COMMIT;");
sqlSteps.push("BEGIN;");
} else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) {
this.log("#> WARNING: This migration is not wrapped in a transaction! <");
}

this.log(`${sqlSteps.join("\n")}\n\n`);
this.log(`${sqlSteps.join("\n")}\n\n`);

return sqlSteps.reduce(
(promise, sql) =>
promise.then(() => this.options.dryRun || this.db.query(sql)),
Promise.resolve()
);
});
return sqlSteps.reduce(
(promise, sql) =>
promise.then(() => this.options.dryRun || this.db.query(sql)),
Promise.resolve()
);
}

applyUp() {
4 changes: 2 additions & 2 deletions lib/operations/other.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { t } from "../utils";

// eslint-disable-next-line import/prefer-default-export
export const sql = (...args) => {
export function sql(...args) {
// applies some very basic templating using the utils.p
let s = t(...args);
// add trailing ; if not present
if (s.lastIndexOf(";") !== s.length - 1) {
s += ";";
}
return s;
};
}
339 changes: 158 additions & 181 deletions lib/runner.js
Original file line number Diff line number Diff line change
@@ -2,129 +2,115 @@ import path from "path";
import fs from "fs";
import Db from "./db";
import Migration from "./migration";
import { getMigrationTableSchema, finallyPromise, template } from "./utils";
import { getMigrationTableSchema, template, promisify } from "./utils";

export { PgLiteral } from "./utils";

// Random but well-known identifier shared by all instances of node-pg-migrate
const PG_MIGRATE_LOCK_ID = 7241865325823964;

const readdir = (...args) =>
new Promise((resolve, reject) =>
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.readdir(...args, (err, files) => (err ? reject(err) : resolve(files)))
);
const readdir = promisify(fs.readdir);
const lstat = promisify(fs.lstat);

const idColumn = "id";
const nameColumn = "name";
const runOnColumn = "run_on";

const loadMigrationFiles = (db, options, log) =>
readdir(`${options.dir}/`)
.then(files =>
Promise.all(
files.map(
file =>
new Promise((resolve, reject) =>
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.lstat(
`${options.dir}/${file}`,
(err, stats) =>
err ? reject(err) : resolve(stats.isFile() ? file : null)
)
)
)
const loadMigrationFiles = async (db, options, log) => {
try {
const files = await Promise.all(
(await readdir(`${options.dir}/`)).map(async file => {
const stats = await lstat(`${options.dir}/${file}`);
return stats.isFile() ? file : null;
})
);
const filter = new RegExp(`^(${options.ignorePattern})$`); // eslint-disable-line security/detect-non-literal-regexp
let shorthands = {};
return files
.filter(i => i && !filter.test(i))
.sort(
(f1, f2) =>
f1 < f2 // eslint-disable-line no-nested-ternary
? -1
: f1 > f2
? 1
: 0
)
)
.then(files => {
const filter = new RegExp(`^(${options.ignorePattern})$`); // eslint-disable-line security/detect-non-literal-regexp
let shorthands = {};
return files
.filter(i => i && !filter.test(i))
.sort(
(f1, f2) =>
f1 < f2 // eslint-disable-line no-nested-ternary
? -1
: f1 > f2
? 1
: 0
)
.map(file => {
const filePath = `${options.dir}/${file}`;
const actions =
path.extname(filePath) === ".sql"
? // eslint-disable-next-line security/detect-non-literal-fs-filename
{ up: pgm => pgm.sql(fs.readFileSync(filePath, "utf8")) }
: // eslint-disable-next-line global-require,import/no-dynamic-require,security/detect-non-literal-require
require(path.relative(__dirname, filePath));
shorthands = { ...shorthands, ...actions.shorthands };
return new Migration(
db,
filePath,
actions,
options,
{
...shorthands
},
log
);
});
})
.catch(err => {
throw new Error(`Can't get migration files: ${err.stack}`);
});
.map(file => {
const filePath = `${options.dir}/${file}`;
const actions =
path.extname(filePath) === ".sql"
? // eslint-disable-next-line security/detect-non-literal-fs-filename
{ up: pgm => pgm.sql(fs.readFileSync(filePath, "utf8")) }
: // eslint-disable-next-line global-require,import/no-dynamic-require,security/detect-non-literal-require
require(path.relative(__dirname, filePath));
shorthands = { ...shorthands, ...actions.shorthands };
return new Migration(
db,
filePath,
actions,
options,
{
...shorthands
},
log
);
});
} catch (err) {
throw new Error(`Can't get migration files: ${err.stack}`);
}
};

const lock = db =>
db
.query(
`select pg_try_advisory_lock(${PG_MIGRATE_LOCK_ID}) as lock_obtained`
)
.then(result => {
if (!result.rows[0].lock_obtained) {
throw new Error("Another migration is already running");
}
});
const lock = async db => {
const {
rows: [lockObtained]
} = await db.query(
`select pg_try_advisory_lock(${PG_MIGRATE_LOCK_ID}) as "lockObtained"`
);
if (!lockObtained) {
throw new Error("Another migration is already running");
}
};

const getRunMigrations = async (db, options) => {
try {
const schema = getMigrationTableSchema(options);
const { migrationsTable } = options;
const fullTableName = {
schema,
name: migrationsTable
};

const getRunMigrations = (db, options) => {
const schema = getMigrationTableSchema(options);
const { migrationsTable } = options;
const fullTableName = {
schema,
name: migrationsTable
};
return db
.select(
const migrationTables = await db.select(
`SELECT table_name FROM information_schema.tables WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}'`
)
.then(
migrationTables =>
migrationTables && migrationTables.length === 1
? db
.select(
`SELECT constraint_name FROM information_schema.table_constraints WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}' AND constraint_type = 'PRIMARY KEY'`
)
.then(
primaryKeyConstraints =>
(primaryKeyConstraints &&
primaryKeyConstraints.length === 1) ||
db.query(
template`ALTER TABLE "${fullTableName}" ADD PRIMARY KEY (${idColumn})`
)
)
: db.query(
template`CREATE TABLE "${fullTableName}" ( ${idColumn} SERIAL PRIMARY KEY, ${nameColumn} varchar(255) NOT NULL, ${runOnColumn} timestamp NOT NULL)`
)
)
.then(() => (!options.noLock ? lock(db, options) : null))
.then(() =>
db.column(
template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`,
nameColumn
)
)
.catch(err => {
throw new Error(`Unable to fetch migrations: ${err.stack}`);
});
);

if (migrationTables && migrationTables.length === 1) {
const primaryKeyConstraints = await db.select(
`SELECT constraint_name FROM information_schema.table_constraints WHERE table_schema = '${schema}' AND table_name = '${migrationsTable}' AND constraint_type = 'PRIMARY KEY'`
);
if (!primaryKeyConstraints || primaryKeyConstraints.length !== 1) {
await db.query(
template`ALTER TABLE "${fullTableName}" ADD PRIMARY KEY (${idColumn})`
);
}
} else {
await db.query(
template`CREATE TABLE "${fullTableName}" ( ${idColumn} SERIAL PRIMARY KEY, ${nameColumn} varchar(255) NOT NULL, ${runOnColumn} timestamp NOT NULL)`
);
}

if (!options.noLock) {
await lock(db, options);
}

return await db.column(
template`SELECT ${nameColumn} FROM "${fullTableName}" ORDER BY ${runOnColumn}, ${idColumn}`,
nameColumn
);
} catch (err) {
throw new Error(`Unable to fetch migrations: ${err.stack}`);
}
};

const getMigrationsToRun = (options, runNames, migrations) => {
@@ -164,87 +150,78 @@ const getMigrationsToRun = (options, runNames, migrations) => {
);
};

const ifSingleTransaction = (operation, options, db) =>
options.singleTransaction ? db.query(operation) : Promise.resolve();

export default options => {
export default async options => {
const log = options.log || console.log;
const db = Db(options.databaseUrl, log);
return Promise.resolve()
.then(() => {
let promise = Promise.resolve();
if (options.schema) {
if (options.createSchema) {
promise = promise.then(() =>
db.query(`CREATE SCHEMA IF NOT EXISTS "${options.schema}"`)
);
}
promise = promise.then(() =>
db.query(`SET SCHEMA '${options.schema}'`)
);
try {
if (options.schema) {
if (options.createSchema) {
await db.query(`CREATE SCHEMA IF NOT EXISTS "${options.schema}"`);
}
if (options.migrationsSchema && options.createMigrationsSchema) {
promise = promise.then(() =>
db.query(`CREATE SCHEMA IF NOT EXISTS "${options.migrationsSchema}"`)
);
}
return promise;
})
.then(() =>
Promise.all([
loadMigrationFiles(db, options, log),
getRunMigrations(db, options)
])
)
.then(([migrations, runNames]) => {
if (options.checkOrder) {
const len = Math.min(runNames.length, migrations.length);
for (let i = 0; i < len; i += 1) {
const runName = runNames[i];
const migrationName = migrations[i].name;
if (runName !== migrationName) {
throw new Error(
`Not run migration ${migrationName} is preceding already run migration ${runName}`
);
}
await db.query(`SET SCHEMA '${options.schema}'`);
}
if (options.migrationsSchema && options.createMigrationsSchema) {
await db.query(
`CREATE SCHEMA IF NOT EXISTS "${options.migrationsSchema}"`
);
}
const [migrations, runNames] = await Promise.all([
loadMigrationFiles(db, options, log),
getRunMigrations(db, options)
]);
if (options.checkOrder) {
const len = Math.min(runNames.length, migrations.length);
for (let i = 0; i < len; i += 1) {
const runName = runNames[i];
const migrationName = migrations[i].name;
if (runName !== migrationName) {
throw new Error(
`Not run migration ${migrationName} is preceding already run migration ${runName}`
);
}
}
}

const toRun = getMigrationsToRun(options, runNames, migrations);
const toRun = getMigrationsToRun(options, runNames, migrations);

if (!toRun.length) {
log("No migrations to run!");
return null;
}
if (!toRun.length) {
log("No migrations to run!");
return;
}

// TODO: add some fancy colors to logging
log("> Migrating files:");
toRun.forEach(m => {
log(`> - ${m.name}`);
});
// TODO: add some fancy colors to logging
log("> Migrating files:");
toRun.forEach(m => {
log(`> - ${m.name}`);
});

return ifSingleTransaction("BEGIN", options, db)
.then(() =>
toRun.reduce(
(promise, migration) =>
promise.then(
() =>
options.direction === "up"
? migration.applyUp()
: migration.applyDown()
),
Promise.resolve()
)
)
.then(() => ifSingleTransaction("COMMIT", options, db));
})
.catch(e => {
log("> Rolling back attempted migration ...");
return db.query("ROLLBACK").then(
...finallyPromise(() => {
throw e;
})
if (options.singleTransaction) {
await db.query("BEGIN");
}

try {
await toRun.reduce(
(promise, migration) =>
promise.then(
() =>
options.direction === "up"
? migration.applyUp()
: migration.applyDown()
),
Promise.resolve()
);
})
.then(...finallyPromise(db.close));

if (options.singleTransaction) {
await db.query("COMMIT");
}
} catch (err) {
if (options.singleTransaction) {
log("> Rolling back attempted migration ...");
await db.query("ROLLBACK");
}
throw err;
}
} finally {
db.close();
}
};
26 changes: 9 additions & 17 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -91,23 +91,6 @@ export const getMigrationTableSchema = options =>
? options.schema
: "public";

export const finallyPromise = func => [
func,
err => {
const errHandler = innerErr => {
console.error(innerErr.stack ? innerErr.stack : innerErr);
throw err;
};
try {
return Promise.resolve(func()).then(() => {
throw err;
}, errHandler);
} catch (innerErr) {
return errHandler(innerErr);
}
}
];

export const quote = array => array.map(item => template`"${item}"`);

const typeAdapters = {
@@ -175,3 +158,12 @@ export const formatLines = (lines, replace = " ", separator = ",") =>
.map(line => line.replace(/(?:\r\n|\r|\n)+/g, " "))
.join(`${separator}\n`)
.replace(/^/gm, replace);

export const promisify = fn => (...args) =>
new Promise((resolve, reject) =>
fn.call(
this,
...args,
(err, ...result) => (err ? reject(err) : resolve(...result))
)
);