Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the migration script more robust #675

Merged
merged 6 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bin/u-wave-core.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const envSchema = {
type: 'number',
default: 6042,
},
MONGODB_URL: {
type: 'string',
format: 'uri',
},
REDIS_URL: {
type: 'string',
format: 'uri',
Expand Down Expand Up @@ -108,6 +112,8 @@ const uw = uwave({
redis: config.REDIS_URL,
sqlite: config.SQLITE_PATH,
secret,
// This property is untyped, it is propagated to the also-untyped MongoDB -> SQL migration
mongo: config.MONGODB_URL,
});

uw.on('redisError', (err) => {
Expand Down
112 changes: 84 additions & 28 deletions src/migrations/003-populate-sql.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,42 @@ const userSchema = new mongoose.Schema({
minimize: false,
});

async function* asyncChunks(iter, chunkSize) {
let chunk = [];
for await (const element of iter) {
chunk.push(element);
if (chunk.length >= chunkSize) {
yield chunk;
chunk = [];
}
}
if (chunk.length > 0) {
yield chunk;
}
}

function zip(a, b) {
const iterA = a[Symbol.iterator]();
const iterB = b[Symbol.iterator]();
const iter = {
next() {
const itemA = iterA.next();
const itemB = iterB.next();

if (itemA.done !== itemB.done) {
throw new Error('zip: iterators have different lengths');
}

return {
value: [itemA.value, itemB.value],
done: itemA.done,
};
},
};
iter[Symbol.iterator] = () => iter;
return iter;
}

/**
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
*/
Expand Down Expand Up @@ -427,7 +463,13 @@ async function up({ context: uw }) {
const motd = await uw.redis.get('motd');

/** @type {Map<string, string>} */
const idMap = new Map();
const mediaIDs = new Map();
/** @type {Map<string, string>} */
const userIDs = new Map();
/** @type {Map<string, string>} */
const playlistIDs = new Map();
/** @type {Map<string, string>} */
const playlistItemIDs = new Map();

await db.transaction().execute(async (tx) => {
for await (const config of models.Config.find().lean()) {
Expand All @@ -443,27 +485,29 @@ async function up({ context: uw }) {
.execute();
}

for await (const media of models.Media.find().lean()) {
const id = randomUUID();
await tx.insertInto('media')
.values({
id,
for await (const medias of asyncChunks(models.Media.find().lean(), 50)) {
const rows = await tx.insertInto('media')
.values(medias.map((media) => ({
id: randomUUID(),
sourceType: media.sourceType,
sourceID: media.sourceID,
sourceData: jsonb(media.sourceData),
artist: media.artist,
title: media.title,
title: media.title ?? '',
duration: media.duration,
thumbnail: media.thumbnail,
createdAt: media.createdAt.toISOString(),
updatedAt: media.updatedAt.toISOString(),
})
createdAt: (media.createdAt ?? media.updatedAt ?? new Date()).toISOString(),
updatedAt: (media.updatedAt ?? new Date()).toISOString(),
})))
.onConflict((conflict) => conflict.columns(['sourceType', 'sourceID']).doUpdateSet({
updatedAt: (eb) => eb.ref('excluded.updatedAt'),
}))
.returning('id')
.execute();

idMap.set(media._id.toString(), id);
for (const [media, row] of zip(medias, rows)) {
mediaIDs.set(media._id.toString(), row.id);
}
}

const roles = await models.AclRole.find().lean();
Expand Down Expand Up @@ -491,15 +535,15 @@ async function up({ context: uw }) {

for await (const user of models.User.find().lean()) {
const userID = randomUUID();
idMap.set(user._id.toString(), userID);
userIDs.set(user._id.toString(), userID);

await tx.insertInto('users')
.values({
id: userID,
username: user.username,
slug: user.slug,
createdAt: user.createdAt.toISOString(),
updatedAt: user.updatedAt.toISOString(),
updatedAt: (user.updatedAt ?? user.createdAt).toISOString(),
})
.execute();

Expand All @@ -511,35 +555,38 @@ async function up({ context: uw }) {

for await (const playlist of models.Playlist.where('author', user._id).lean()) {
const playlistID = randomUUID();
idMap.set(playlist._id.toString(), playlistID);
playlistIDs.set(playlist._id.toString(), playlistID);

await tx.insertInto('playlists')
.values({
id: playlistID,
name: playlist.name,
userID,
createdAt: playlist.createdAt.toISOString(),
updatedAt: playlist.updatedAt.toISOString(),
// Old objects use the `.created` property
createdAt: (playlist.createdAt ?? playlist.created).toISOString(),
updatedAt: (playlist.updatedAt ?? playlist.created).toISOString(),
})
.execute();

const items = [];
for (const itemMongoID of playlist.media) {
const itemID = randomUUID();
idMap.set(itemMongoID.toString(), itemID);
playlistItemIDs.set(itemMongoID.toString(), itemID);

const item = await models.PlaylistItem.findById(itemMongoID).lean();
const mediaID = mediaIDs.get(item.media.toString());

await tx.insertInto('playlistItems')
.values({
id: itemID,
playlistID,
mediaID: idMap.get(item.media.toString()),
mediaID,
artist: item.artist,
title: item.title,
start: item.start,
end: item.end,
createdAt: item.createdAt.toISOString(),
updatedAt: item.updatedAt.toISOString(),
end: item.end ?? 0, // Not ideal, but what can we do
createdAt: (item.createdAt ?? item.updatedAt ?? new Date()).toISOString(),
updatedAt: (item.updatedAt ?? new Date()).toISOString(),
})
.execute();

Expand All @@ -551,10 +598,20 @@ async function up({ context: uw }) {
.set({ items: jsonb(items) })
.execute();
}

if (user.activePlaylist != null) {
const activePlaylistID = playlistIDs.get(user.activePlaylist.toString());
if (activePlaylistID != null) {
await tx.updateTable('users')
.where('id', '=', userID)
.set({ activePlaylistID })
.execute();
}
}
}

for await (const entry of models.Authentication.find().lean()) {
const userID = idMap.get(entry.user.toString());
const userID = userIDs.get(entry.user.toString());
if (userID == null) {
throw new Error('Migration failure: unknown user ID');
}
Expand All @@ -576,9 +633,8 @@ async function up({ context: uw }) {

for await (const entry of models.HistoryEntry.find().lean()) {
const entryID = randomUUID();
idMap.set(entry._id.toString(), entryID);
const userID = idMap.get(entry.user.toString());
const mediaID = idMap.get(entry.media.media.toString());
const userID = userIDs.get(entry.user.toString());
const mediaID = mediaIDs.get(entry.media.media.toString());
await tx.insertInto('historyEntries')
.values({
id: entryID,
Expand All @@ -597,14 +653,14 @@ async function up({ context: uw }) {
for (const id of entry.upvotes) {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
vote: 1,
});
}
for (const id of entry.downvotes) {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
vote: -1,
});
}
Expand All @@ -615,7 +671,7 @@ async function up({ context: uw }) {
} else {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
favorite: 1,
});
}
Expand Down