-
Notifications
You must be signed in to change notification settings - Fork 0
/
cli.js
269 lines (234 loc) · 9.08 KB
/
cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env node
const {resolve} = require("path");
const fs = require("fs");
// parse cli arguments
const args = require("./parse-argv");
/*
* Run the program in async
* */
(async function run() {
// get the full path to the migrations folder
const pathToMigrations = resolve(process.cwd(), args.migrations || "postgres_migrations");
// check if the migrations directory path is valid
if(!fs.existsSync(pathToMigrations)) {
console.error("Directory could not be found. Invalid path: %s", pathToMigrations)
return process.exit(1);
}
switch(args.cmd) {
case "create":
await create(pathToMigrations);
break;
case "commit":
await commitOrRollback({
pathToMigrations,
prompt: "Please select a migration point to commit up to:",
});
break;
case "rollback":
await commitOrRollback({
pathToMigrations,
prompt: "Please select a migration point to roll back past:",
});
break;
// process-argv will throw before we get here so no need to throw again
default: return;
}
})();
/*
* Create a new migration
* */
async function create(pathToMigrations) {
// prompt the user for a migration and timestamp it
process.stdout.write("Enter migration name: ");
let migrationName = (await promptUser()) || "new_migration";
migrationName = JSON.parse(JSON.stringify(migrationName))
.replace(/[\r\n]/g, "")
.replace(/\s/g, "_");
migrationName = migrationName || "new_migration";
migrationName = `${Date.now()}_${migrationName}`;
// check if the proposed new dir already exists
const newDir = resolve(pathToMigrations, migrationName);
if(fs.existsSync(newDir)) {
console.error("Cannot create %s: already exists", newDir);
process.exit(1);
}
try {
// create the root migrations directory if it doesn't exist yet
!fs.existsSync(pathToMigrations) && fs.mkdirSync(pathToMigrations);
// make the directory and files
fs.mkdirSync(newDir);
fs.writeFileSync(resolve(newDir, `${migrationName}_commit.sql`), "");
fs.writeFileSync(resolve(newDir, `${migrationName}_rollback.sql`), "");
console.info("Successfully created migration: %s", migrationName);
} catch (e) {
console.error("Error when attempting to create migration files: %s", e.toString());
process.exit(1);
} finally {
// pause stdin to cease execution
process.stdin.pause();
}
}
/*
* Commit or roll back all migrations up to a migration selected by the user
* */
async function commitOrRollback({ pathToMigrations, prompt }) {
const {Pool} = require("pg");
const pool = new Pool();
// initialise the dotenv config if there is one
const pathToEnv = resolve(process.cwd(), args.env || ".env");
require("dotenv").config({ path: pathToEnv });
// test the database connection
try { await pool.query("SELECT 1") }
catch (e) {
const reasons = verifyEnv(pathToEnv).join("\n\t");
console.error("\nCould not connect to database. %s\n\nPossible reasons:\n\t%s", e.toString(), reasons);
process.exit(1);
}
// create the migrations table and a cursor if it doesn't exist
await pool.query(
"CREATE TABLE IF NOT EXISTS _migrations(cursor int, completed date DEFAULT now());" +
"INSERT INTO _migrations(cursor) SELECT -1 WHERE NOT exists(SELECT 1 FROM _migrations)"
);
// get the migration directories
let dirNames;
try {
dirNames = fs
.readdirSync(pathToMigrations)
.filter(subDir => !subDir.startsWith("."));
} catch (e) {
console.error("Reading migrations directory failed. %s", e.toString());
process.exit(1);
}
// get the current migration cursor from the _migrations table
const {rows} = await pool.query("SELECT cursor FROM _migrations");
const cursor = rows && rows[0] ? rows[0].cursor : -1;
console.info("\nCursor currently at: %s", dirNames[cursor] || "clean");
// get the subset of applicable migrations for this database
const subset = args.cmd === "commit"
? dirNames.slice(cursor + 1)
: dirNames.slice(0).reverse().slice(dirNames.length - (cursor + 1));
// return early if the database is already at the desired point
if(subset < 1) {
const reason = args.cmd === "commit"
? "the database is already fully migrated"
: "the database is already clean";
console.info("Cannot perform %s: %s", args.cmd, reason);
return process.exit();
}
// if the user isn't just choosing to fast forward the migration
// default the migration point to all migrations in the subset
let migrateTo = subset.length - 1;
if(!args.all) {
try {
// prompt the user to select a migration
console.log(prompt);
[{value: migrateTo}] = await spawnSelection(subset);
} catch (e) {
console.error("Nothing was selected: You must select a migration to go to", e.toString());
process.exit(1);
}
}
// get the new cursor which will be written to the database
const newCursor = args.cmd === "commit"
? dirNames.findIndex(d => d === subset[migrateTo])
: dirNames.findIndex(d => d === subset[migrateTo]) - 1;
// select the migration sql files up to the specified point
const migrationDirPaths = subset
.slice(0, migrateTo + 1)
.map(p => resolve(pathToMigrations, p));
// attempt to read the sql files
let sql;
try {
// create the migration file map
const fileArrays = await Promise.all(migrationDirPaths.map(readDirAsync));
const migrations = fileArrays
.map(files => args.cmd === "commit" ? files[0] : files[1])
.map((file, i) => resolve(migrationDirPaths[i], file));
// concatenate the files together
sql = await Promise.all(migrations.map(readFileAsync));
sql = sql.reduce((sql, content) => sql.concat(content, "\n"), "");
} catch(e) {
console.error("Error reading SQL migration files. %s", e.toString());
process.exit(1);
}
// declare the transaction client ahead of the try
const transaction = await debugClient(pool);
try {
console.info("\nStarting %s...", args.cmd);
// execute the migration transaction, updating the cursor
await transaction.query("BEGIN");
await transaction.query(sql);
await transaction.query("UPDATE _migrations SET cursor = $1, completed = now()", [newCursor]);
await transaction.query("COMMIT");
console.info("\nMigration successful. Cursor now at: %s", dirNames[newCursor] || "clean");
} catch(e) {
await transaction.query("ROLLBACK");
console.info("\nMigration unsuccessful - view the log above for details. Cursor remains unchanged");
console.error(`Error during migration transaction: ${e.toString()}`);
process.exit(1);
} finally {
transaction.release();
process.exit();
}
}
/*
* Helper functions
* */
// Prompt user for input
function promptUser() {
return new Promise(res => {
process.stdin.resume();
process.stdin.setEncoding("utf8");
process.stdin.on("data", res);
});
}
// Generate selection config from migration directory names
function spawnSelection(dirs) {
return new Promise((res, rej) => {
const select = require("select-shell")({ multiSelect: false });
select.on("select", res);
select.on("cancel", rej);
dirs.forEach((d, i) => select.option(d, i));
select.list();
});
}
// Promisifed filesystem helpers
function readFileAsync(path) {
return new Promise((res, rej) => {
fs.readFile(path, "utf8", (err, data) => {
if(err) return rej(err);
return res(data);
});
});
}
function readDirAsync(dir) {
return new Promise((res, rej) => {
fs.readdir(dir, "utf8", (err, files) => {
if(err) return rej(err);
return res(files);
});
});
}
// Postgres client which logs all messages it receives
async function debugClient(pool) {
const client = await pool.connect();
client.connection.on("message", m => {
if(m instanceof Error)
return console.error("postgres[error:%s] %s", m.code, m.toString());
if(!m.text) return;
console.info("postgres[message:%s]", m.name, m.text || "message");
});
return client;
}
// Verify env vars are correct
function verifyEnv(pathToEnv) {
let reasons = [];
const pgVars = ["PGDATABASE", "PGHOST", "PGPASSWORD", "PGPORT", "PGUSER"];
const varsNotPresent = pgVars.filter(pgv => !process.env[pgv] );
if(varsNotPresent.length > 0)
reasons.push(`Some pg environment variables are not present. Missing variables: ${varsNotPresent.join(", ")}`);
const envPathExists = fs.existsSync(pathToEnv);
if(!envPathExists)
reasons.push(`No env file was found at path ${pathToEnv}`);
return reasons;
}