Skip to content

Commit

Permalink
Using async/await
Browse files Browse the repository at this point in the history
  • Loading branch information
dolezel committed Jun 25, 2018
1 parent a4b6b7b commit 79fc20d
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 299 deletions.
6 changes: 2 additions & 4 deletions bin/node-pg-migrate
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
const util = require("util");
const path = require("path");
const yargs = require("yargs");
const Migration = require("../dist/migration").default; // eslint-disable-line import/no-unresolved,import/extensions
const runner = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions

const migrationRunner = runner.default;
const { default: Migration } = require("../dist/migration"); // eslint-disable-line import/no-unresolved,import/extensions
const { default: migrationRunner } = require("../dist/runner"); // eslint-disable-line import/no-unresolved,import/extensions

process.on("uncaughtException", err => {
console.log(err.stack);
Expand Down
76 changes: 40 additions & 36 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import pg from "pg";

export default (connectionString, log = console.error) => {
const client = new pg.Client(connectionString);

let clientActive = false;
const beforeCloseListeners = [];

Expand All @@ -26,38 +27,43 @@ export default (connectionString, log = console.error) => {
})
);

const query = (...args) =>
createConnection()
.then(() => client.query(...args))
.catch(err => {
const { message, position } = err;
const string = args[0].text || args[0];
if (message && position >= 1) {
const endLineWrapIndexOf = string.indexOf("\n", position);
const endLineWrapPos =
endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length;
const stringStart = string.substring(0, endLineWrapPos);
const stringEnd = string.substr(endLineWrapPos);
const startLineWrapPos = stringStart.lastIndexOf("\n") + 1;
const padding = " ".repeat(position - startLineWrapPos - 1);
log(`Error executing:
const query = async (...args) => {
await createConnection();
try {
return await client.query(...args);
} catch (err) {
const { message, position } = err;
const string = args[0].text || args[0];
if (message && position >= 1) {
const endLineWrapIndexOf = string.indexOf("\n", position);
const endLineWrapPos =
endLineWrapIndexOf >= 0 ? endLineWrapIndexOf : string.length;
const stringStart = string.substring(0, endLineWrapPos);
const stringEnd = string.substr(endLineWrapPos);
const startLineWrapPos = stringStart.lastIndexOf("\n") + 1;
const padding = " ".repeat(position - startLineWrapPos - 1);
log(`Error executing:
${stringStart}
${padding}^^^^${stringEnd}
${message}
`);
} else {
log(`Error executing:
} else {
log(`Error executing:
${string}
${err}
`);
}
throw err;
});
}
throw err;
}
};

const select = string => query(string).then(result => result && result.rows);
const column = (string, columnName) =>
select(string).then(rows => rows.map(r => r[columnName]));
const select = async string => {
const { rows } = await query(string);
return rows;
};
const column = async (string, columnName) =>
(await select(string)).map(r => r[columnName]);

return {
query,
Expand All @@ -66,18 +72,16 @@ ${err}

addBeforeCloseListener: listener => beforeCloseListeners.push(listener),

close: () =>
beforeCloseListeners
.reduce(
(promise, listener) =>
promise.then(listener).catch(err => log(err.stack || err)),
Promise.resolve()
)
.then(() => {
clientActive = false;
if (client) {
client.end();
}
})
close: async () => {
await beforeCloseListeners.reduce(
(promise, listener) =>
promise.then(listener).catch(err => log(err.stack || err)),
Promise.resolve()
);
clientActive = false;
if (client) {
client.end();
}
}
};
};
106 changes: 46 additions & 60 deletions lib/migration.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,69 +53,55 @@ class Migration {
this.log = log;
}

_apply(action, pgm) {
return new Promise((resolve, reject) => {
if (action.length === 2) {
action(pgm, resolve);
} else {
const result = action(pgm);
// result conforms to Promises/A+ spec
if (
result &&
typeof result === "object" &&
typeof result.then === "function"
) {
result.then(resolve).catch(reject);
} else {
resolve();
}
}
}).then(() => {
const sqlSteps = pgm.getSqlSteps();

const schema = getMigrationTableSchema(this.options);
const { migrationsTable } = this.options;
const { name } = this;
switch (action) {
case this.down:
this.log(`### MIGRATION ${this.name} (DOWN) ###`);
sqlSteps.push(
`DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';`
);
break;
case this.up:
this.log(`### MIGRATION ${this.name} (UP) ###`);
sqlSteps.push(
`INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());`
);
break;
default:
throw new Error("Unknown direction");
}

if (!this.options.singleTransaction && pgm.isUsingTransaction()) {
// if not in singleTransaction mode we need to create our own transaction
sqlSteps.unshift("BEGIN;");
sqlSteps.push("COMMIT;");
} else if (this.options.singleTransaction && !pgm.isUsingTransaction()) {
// in singleTransaction mode we are already wrapped in a global transaction
this.log("#> WARNING: Need to break single transaction! <");
sqlSteps.unshift("COMMIT;");
sqlSteps.push("BEGIN;");
} else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) {
this.log(
"#> WARNING: This migration is not wrapped in a transaction! <"
async _apply(action, pgm) {
if (action.length === 2) {
await new Promise(resolve => action(pgm, resolve));
} else {
await action(pgm);
}

const sqlSteps = pgm.getSqlSteps();

const schema = getMigrationTableSchema(this.options);
const { migrationsTable } = this.options;
const { name } = this;
switch (action) {
case this.down:
this.log(`### MIGRATION ${this.name} (DOWN) ###`);
sqlSteps.push(
`DELETE FROM "${schema}"."${migrationsTable}" WHERE name='${name}';`
);
break;
case this.up:
this.log(`### MIGRATION ${this.name} (UP) ###`);
sqlSteps.push(
`INSERT INTO "${schema}"."${migrationsTable}" (name, run_on) VALUES ('${name}', NOW());`
);
}
break;
default:
throw new Error("Unknown direction");
}

if (!this.options.singleTransaction && pgm.isUsingTransaction()) {
// if not in singleTransaction mode we need to create our own transaction
sqlSteps.unshift("BEGIN;");
sqlSteps.push("COMMIT;");
} else if (this.options.singleTransaction && !pgm.isUsingTransaction()) {
// in singleTransaction mode we are already wrapped in a global transaction
this.log("#> WARNING: Need to break single transaction! <");
sqlSteps.unshift("COMMIT;");
sqlSteps.push("BEGIN;");
} else if (!this.options.singleTransaction || !pgm.isUsingTransaction()) {
this.log("#> WARNING: This migration is not wrapped in a transaction! <");
}

this.log(`${sqlSteps.join("\n")}\n\n`);
this.log(`${sqlSteps.join("\n")}\n\n`);

return sqlSteps.reduce(
(promise, sql) =>
promise.then(() => this.options.dryRun || this.db.query(sql)),
Promise.resolve()
);
});
return sqlSteps.reduce(
(promise, sql) =>
promise.then(() => this.options.dryRun || this.db.query(sql)),
Promise.resolve()
);
}

applyUp() {
Expand Down
4 changes: 2 additions & 2 deletions lib/operations/other.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { t } from "../utils";

// eslint-disable-next-line import/prefer-default-export
export const sql = (...args) => {
export function sql(...args) {
// applies some very basic templating using the utils.p
let s = t(...args);
// add trailing ; if not present
if (s.lastIndexOf(";") !== s.length - 1) {
s += ";";
}
return s;
};
}
Loading

0 comments on commit 79fc20d

Please sign in to comment.