Skip to content

Commit

Permalink
feat(mongo): support migrate virtualKey (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hieuzest authored Aug 21, 2023
1 parent 0bdc47e commit 4e4472f
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 14 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class Model<S = any> {

this.primary = primary || this.primary
this.autoInc = autoInc || this.autoInc
this.unique.push(...unique)
unique.forEach(key => this.unique.includes(key) || this.unique.push(key))
Object.assign(this.foreign, foreign)

if (callback) this.migrations.set(callback, Object.keys(fields))
Expand Down
78 changes: 65 additions & 13 deletions packages/mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BSONType, Collection, Db, IndexDescription, MongoClient, MongoError } from 'mongodb'
import { Dict, makeArray, noop, omit, pick } from 'cosmokit'
import { Dict, isNullable, makeArray, noop, omit, pick } from 'cosmokit'
import { Database, Driver, Eval, executeEval, executeUpdate, Query, RuntimeError, Selection } from '@minatojs/core'
import { URLSearchParams } from 'url'
import { Transformer } from './utils'
Expand Down Expand Up @@ -125,7 +125,9 @@ export class MongoDriver extends Driver {
const { fields } = this.model(table)
const coll = this.db.collection(table)
const bulk = coll.initializeOrderedBulkOp()
const virtualKey = this.getVirtualKey(table)
for (const key in fields) {
if (virtualKey === key) continue
const { initial, legacy = [], deprecated } = fields[key]!
if (deprecated) continue
const filter = { [key]: { $exists: false } }
Expand All @@ -144,26 +146,75 @@ export class MongoDriver extends Driver {
if (bulk.batches.length) await bulk.execute()
}

private async _migrateVirtual(table: string) {
const { primary } = this.model(table)
if (Array.isArray(primary)) return
const fields = this.db.collection('_fields')
const meta: Dict = { table, field: primary }
const found = await fields.findOne(meta)
let virtual = !!found?.virtual
// If _fields table was missing for any reason
// Test the type of _id to get its possible preference
if (!found) {
const doc = await this.db.collection(table).findOne()
if (doc) virtual = typeof doc._id !== 'object'
else {
// Empty collection, just set meta and return
fields.updateOne(meta, { $set: { virtual: this.config.optimizeIndex } }, { upsert: true })
logger.info('Successfully reconfigured table %s', table)
return
}
}

if (virtual === !!this.config.optimizeIndex) return
logger.info('Start migrating table %s', table)

if (found?.migrate && await this.db.listCollections({ name: '_migrate_' + table }).hasNext()) {
logger.info('Last time crashed, recover')
} else {
await this.db.dropCollection('_migrate_' + table).catch(noop)
await this.db.collection(table).aggregate([
{ $addFields: { _temp_id: '$_id' } },
{ $unset: ['_id'] },
{ $addFields: this.config.optimizeIndex ? { _id: '$' + primary } : { [primary]: '$_temp_id' } },
{ $unset: ['_temp_id', ...this.config.optimizeIndex ? [primary] : []] },
{ $out: '_migrate_' + table },
]).toArray()
await fields.updateOne(meta, { $set: { migrate: true } }, { upsert: true })
}
await this.db.dropCollection(table).catch(noop)
await this.db.renameCollection('_migrate_' + table, table)
await fields.updateOne(meta,
{ $set: { virtual: this.config.optimizeIndex, migrate: false } },
{ upsert: true },
)
logger.info('Successfully migrated table %s', table)
}

private async _migratePrimary(table: string) {
const { primary, autoInc } = this.model(table)
if (Array.isArray(primary) || !autoInc) return
const fields = this.db.collection('_fields')
const meta: Dict = { table, field: primary }
const found = await fields.findOne(meta)
if (found) return
if (!isNullable(found?.autoInc)) return

const coll = this.db.collection(table)
const bulk = coll.initializeOrderedBulkOp()
await coll.find().forEach((data) => {
bulk
.find({ [primary]: data[primary] })
.update({ $set: { [primary]: +data[primary] } })
})
if (bulk.batches.length) await bulk.execute()
// Primary _id cannot be modified thus should always meet the requirements
if (!this.config.optimizeIndex) {
const bulk = coll.initializeOrderedBulkOp()
await coll.find().forEach((data) => {
bulk
.find({ [primary]: data[primary] })
.update({ $set: { [primary]: +data[primary] } })
})
if (bulk.batches.length) await bulk.execute()
}

const [latest] = await coll.find().sort(primary, -1).limit(1).toArray()
meta.autoInc = latest ? +latest[primary] : 0
await fields.insertOne(meta)
const [latest] = await coll.find().sort(this.config.optimizeIndex ? '_id' : primary, -1).limit(1).toArray()
await fields.updateOne(meta, {
$set: { autoInc: latest ? +latest[this.config.optimizeIndex ? '_id' : primary] : 0 },
}, { upsert: true })
}

private _internalTableTask?: Promise<Collection<Document>>
Expand All @@ -179,6 +230,7 @@ export class MongoDriver extends Driver {
this.db.createCollection(table).catch(noop),
])

await this._migrateVirtual(table)
await Promise.all([
this._createIndexes(table),
this._createFields(table),
Expand Down Expand Up @@ -427,7 +479,7 @@ export class MongoDriver extends Driver {
{ upsert: true },
)
for (let i = 1; i <= missing.length; i++) {
missing[i - 1][primary] = value!.autoInc + i
missing[i - 1][primary] = (value!.autoInc ?? 0) + i
}
}
}
Expand Down
95 changes: 95 additions & 0 deletions packages/mongo/tests/migration.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { Database } from 'minato'
import Logger from 'reggol'
import { expect } from 'chai'
import { } from 'chai-shape'
import { MongoDriver } from '@minatojs/driver-mongo'

const logger = new Logger('mongo')

interface Foo {
id?: number
text?: string
value?: number
bool?: boolean
list?: number[]
timestamp?: Date
date?: Date
time?: Date
regex?: string
}

interface Tables {
temp1: Foo
}

describe('@minatojs/driver-mongo/migrate-virtualKey', () => {
const database: Database<Tables> = new Database()

const initialize = async (optimizeIndex: boolean) => {
logger.level = 3
await database.connect('mongo', {
host: 'localhost',
port: 27017,
database: 'test',
optimizeIndex: optimizeIndex,
})
}

const finalize = async () => {
await database.stopAll()
logger.level = 2
}

after(async () => {
await database.dropAll()
await database.stopAll()
logger.level = 2
})

it('reset optimizeIndex', async () => {
await initialize(false)

database.extend('temp1', {
id: 'unsigned',
text: 'string',
value: 'integer',
bool: 'boolean',
list: 'list',
timestamp: 'timestamp',
date: 'date',
time: 'time',
regex: 'string',
}, {
autoInc: true,
})

const table: Foo[] = []
table.push(await database.create('temp1', {
text: 'awesome foo',
timestamp: new Date('2000-01-01'),
date: new Date('2020-01-01'),
time: new Date('2020-01-01 12:00:00'),
}))
table.push(await database.create('temp1', { text: 'awesome bar' }))
table.push(await database.create('temp1', { text: 'awesome baz' }))
await expect(database.get('temp1', {})).to.eventually.have.shape(table)

await finalize()
await initialize(true)
await expect(database.get('temp1', {})).to.eventually.have.shape(table)

await finalize()
await initialize(false)
await expect(database.get('temp1', {})).to.eventually.have.shape(table)

await (Object.values(database.drivers)[0] as MongoDriver).drop('_fields')
await finalize()
await initialize(true)
await expect(database.get('temp1', {})).to.eventually.have.shape(table)

await (Object.values(database.drivers)[0] as MongoDriver).drop('_fields')
await finalize()
await initialize(false)
await expect(database.get('temp1', {})).to.eventually.have.shape(table)
})
})

0 comments on commit 4e4472f

Please sign in to comment.