From 005f720f7b990964832281aeca685b1ad2c022f3 Mon Sep 17 00:00:00 2001 From: Laurent Cozic Date: Thu, 11 Nov 2021 13:59:05 +0000 Subject: [PATCH] Server: Added command to migrate content to different storage --- .../src/commands/DeleteOldChangesCommand.ts | 2 +- .../src/commands/ImportContentCommand.ts | 54 +++++++++++++ packages/server/src/config.ts | 2 +- .../20211111134329_storage_index.ts | 14 ++++ packages/server/src/models/ItemModel.test.ts | 74 +++++++++++++++++- packages/server/src/models/ItemModel.ts | 76 ++++++++++++++++++- packages/server/src/models/StorageModel.ts | 4 + .../models/items/storage/loadStorageDriver.ts | 33 +++++--- ...s => parseStorageConnectionString.test.ts} | 16 ++-- ...ing.ts => parseStorageConnectionString.ts} | 1 + .../items/storage/serializeStorageConfig.ts | 1 + packages/server/src/utils/setupCommands.ts | 2 + .../server/src/utils/testing/testUtils.ts | 10 +-- 13 files changed, 261 insertions(+), 28 deletions(-) create mode 100644 packages/server/src/commands/ImportContentCommand.ts create mode 100644 packages/server/src/migrations/20211111134329_storage_index.ts rename packages/server/src/models/items/storage/{parseStorageDriverConnectionString.test.ts => parseStorageConnectionString.test.ts} (57%) rename packages/server/src/models/items/storage/{parseStorageDriverConnectionString.ts => parseStorageConnectionString.ts} (97%) diff --git a/packages/server/src/commands/DeleteOldChangesCommand.ts b/packages/server/src/commands/DeleteOldChangesCommand.ts index 6378d27fa41..87ceae92521 100644 --- a/packages/server/src/commands/DeleteOldChangesCommand.ts +++ b/packages/server/src/commands/DeleteOldChangesCommand.ts @@ -9,7 +9,7 @@ interface Argv { export default class DeleteOldChangesCommand extends BaseCommand { public command() { - return 'deleteOldChanges'; + return 'delete-old-changes'; } public description() { diff --git a/packages/server/src/commands/ImportContentCommand.ts b/packages/server/src/commands/ImportContentCommand.ts new file mode 100644 index 00000000000..05b7d036dd0 --- /dev/null +++ b/packages/server/src/commands/ImportContentCommand.ts @@ -0,0 +1,54 @@ +import { PositionalOptions, Options } from 'yargs'; +import Logger from '@joplin/lib/Logger'; +import BaseCommand, { RunContext } from './BaseCommand'; +import parseStorageConnectionString from '../models/items/storage/parseStorageConnectionString'; + +const logger = Logger.create('ImportContentCommand'); + +interface Argv { + toStorage: string; + batchSize?: number; +} + +export default class ImportContentCommand extends BaseCommand { + + public command() { + return 'import-content '; + } + + public description() { + return 'import content to storage'; + } + + public positionals(): Record { + return { + 'to-storage': { + description: 'storage connection string', + type: 'string', + }, + }; + } + + public options(): Record { + return { + 'batch-size': { + type: 'number', + description: 'Item batch size', + }, + }; + } + + public async run(argv: Argv, runContext: RunContext): Promise { + const toStorageConfig = parseStorageConnectionString(argv.toStorage); + const batchSize = argv.batchSize || 1000; + + logger.info('Importing to storage:', toStorageConfig); + logger.info(`Batch size: ${batchSize}`); + + await runContext.models.item().importContentToStorage(toStorageConfig, { + batchSize: batchSize || 1000, + logger: logger as Logger, + }); + } + +} diff --git a/packages/server/src/config.ts b/packages/server/src/config.ts index 89a03744957..1fc4bb339ae 100644 --- a/packages/server/src/config.ts +++ b/packages/server/src/config.ts @@ -3,7 +3,7 @@ import { Config, DatabaseConfig, DatabaseConfigClient, Env, MailerConfig, RouteT import * as pathUtils from 'path'; import { loadStripeConfig, StripePublicConfig } from '@joplin/lib/utils/joplinCloud'; import { EnvVariables } from './env'; -import parseStorageDriverConnectionString from './models/items/storage/parseStorageDriverConnectionString'; +import parseStorageDriverConnectionString from './models/items/storage/parseStorageConnectionString'; interface PackageJson { version: string; diff --git a/packages/server/src/migrations/20211111134329_storage_index.ts b/packages/server/src/migrations/20211111134329_storage_index.ts new file mode 100644 index 00000000000..8a4215be9b1 --- /dev/null +++ b/packages/server/src/migrations/20211111134329_storage_index.ts @@ -0,0 +1,14 @@ +import { Knex } from 'knex'; +import { DbConnection } from '../db'; + +export async function up(db: DbConnection): Promise { + await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => { + table.index('content_storage_id'); + }); +} + +export async function down(db: DbConnection): Promise { + await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => { + table.dropIndex('content_storage_id'); + }); +} diff --git a/packages/server/src/models/ItemModel.test.ts b/packages/server/src/models/ItemModel.test.ts index b4efbfd311a..dd673583790 100644 --- a/packages/server/src/models/ItemModel.test.ts +++ b/packages/server/src/models/ItemModel.test.ts @@ -1,6 +1,11 @@ -import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3 } from '../utils/testing/testUtils'; +import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3, db, tempDir } from '../utils/testing/testUtils'; import { shareFolderWithUser } from '../utils/testing/shareApiUtils'; import { resourceBlobPath } from '../utils/joplinUtils'; +import newModelFactory from './factory'; +import { StorageDriverType } from '../utils/types'; +import config from '../config'; +import { msleep } from '../utils/time'; +import loadStorageDriver from './items/storage/loadStorageDriver'; describe('ItemModel', function() { @@ -265,4 +270,71 @@ describe('ItemModel', function() { expect((await models().user().load(user3.id)).total_item_size).toBe(expected3); }); + test('should allow importing content to item storage', async function() { + const { user: user1 } = await createUserAndSession(1); + + const tempDir1 = await tempDir('storage1'); + const tempDir2 = await tempDir('storage2'); + + const fromStorageConfig = { + type: StorageDriverType.Filesystem, + path: tempDir1, + }; + + const models = newModelFactory(db(), { + ...config(), + storageDriver: fromStorageConfig, + }); + + await models.item().saveFromRawContent(user1, { + body: Buffer.from(JSON.stringify({ 'version': 1 })), + name: 'info.json', + }); + + const itemBefore = (await models.item().all())[0]; + + const fromDriver = await loadStorageDriver(fromStorageConfig, db()); + const fromContent = await fromDriver.read(itemBefore.id, { models }); + + expect(fromContent.toString()).toBe('{"version":1}'); + + expect(itemBefore.content_storage_id).toBe(1); + + await msleep(2); + + const toStorageConfig = { + type: StorageDriverType.Filesystem, + path: tempDir2, + }; + + const toModels = newModelFactory(db(), { + ...config(), + storageDriver: toStorageConfig, + }); + + const result = await toModels.item().saveFromRawContent(user1, { + body: Buffer.from(JSON.stringify({ 'version': 2 })), + name: 'info2.json', + }); + + const itemBefore2 = result['info2.json'].item; + + await models.item().importContentToStorage(toStorageConfig); + + const itemAfter = (await models.item().all()).find(it => it.id === itemBefore.id); + expect(itemAfter.content_storage_id).toBe(2); + expect(itemAfter.updated_time).toBe(itemBefore.updated_time); + + // Just check the second item has not been processed since it was + // already on the right storage + const itemAfter2 = (await models.item().all()).find(it => it.id === itemBefore2.id); + expect(itemAfter2.content_storage_id).toBe(2); + expect(itemAfter2.updated_time).toBe(itemBefore2.updated_time); + + const toDriver = await loadStorageDriver(toStorageConfig, db()); + const toContent = await toDriver.read(itemAfter.id, { models }); + + expect(toContent.toString()).toBe(fromContent.toString()); + }); + }); diff --git a/packages/server/src/models/ItemModel.ts b/packages/server/src/models/ItemModel.ts index a5f4bee4f79..e57739f0225 100644 --- a/packages/server/src/models/ItemModel.ts +++ b/packages/server/src/models/ItemModel.ts @@ -8,16 +8,23 @@ import { Knex } from 'knex'; import { ChangePreviousItem } from './ChangeModel'; import { unique } from '../utils/array'; import StorageDriverBase, { Context } from './items/storage/StorageDriverBase'; -import { DbConnection } from '../db'; +import { DbConnection, returningSupported } from '../db'; import { Config, StorageDriverConfig, StorageDriverMode } from '../utils/types'; import { NewModelFactoryHandler } from './factory'; import loadStorageDriver from './items/storage/loadStorageDriver'; +import { msleep } from '../utils/time'; +import Logger from '@joplin/lib/Logger'; const mimeUtils = require('@joplin/lib/mime-utils.js').mime; // Converts "root:/myfile.txt:" to "myfile.txt" const extractNameRegex = /^root:\/(.*):$/; +export interface ImportContentToStorageOptions { + batchSize?: number; + logger?: Logger; +} + export interface SaveFromRawContentItem { name: string; body: Buffer; @@ -268,6 +275,73 @@ export default class ItemModel extends BaseModel { } } + private async atomicMoveContent(item: Item, toDriver: StorageDriverBase, drivers: Record) { + for (let i = 0; i < 10; i++) { + let fromDriver: StorageDriverBase = drivers[item.content_storage_id]; + + if (!fromDriver) { + fromDriver = await loadStorageDriver(item.content_storage_id, this.db); + drivers[item.content_storage_id] = fromDriver; + } + + const content = await fromDriver.read(item.id, { models: this.models() }); + await toDriver.write(item.id, content, { models: this.models() }); + + const updatedRows = await this + .db(this.tableName) + .where('id', '=', item.id) + .where('updated_time', '=', item.updated_time) // Check that the row hasn't changed while we were transferring the content + .update({ content_storage_id: toDriver.storageId }, returningSupported(this.db) ? ['id'] : undefined); + + if (!returningSupported(this.db) || updatedRows.length) return; + + await msleep(1000 + 1000 * i); + } + + throw new Error(`Could not atomically update content for item: ${JSON.stringify(item)}`); + } + + // Loop throught the items in the database and import their content to the + // target storage. Only items not already in that storage will be processed. + public async importContentToStorage(toStorageConfig: StorageDriverConfig, options: ImportContentToStorageOptions = null) { + options = { + batchSize: 1000, + logger: new Logger(), + ...options, + }; + + const toStorageDriver = await this.loadStorageDriver(toStorageConfig); + const fromDrivers: Record = {}; + + const itemCount = (await this.db(this.tableName) + .count('id', { as: 'total' }) + .where('content_storage_id', '!=', toStorageDriver.storageId) + .first())['total']; + + let totalDone = 0; + + while (true) { + const items: Item[] = await this + .db(this.tableName) + .select(['id', 'content_storage_id', 'updated_time']) + .where('content_storage_id', '!=', toStorageDriver.storageId) + .limit(options.batchSize); + + options.logger.info(`Processing items ${totalDone} / ${itemCount}`); + + if (!items.length) { + options.logger.info(`All items have been processed. Total: ${totalDone}`); + return; + } + + for (const item of items) { + await this.atomicMoveContent(item, toStorageDriver, fromDrivers); + } + + totalDone += items.length; + } + } + public async sharedFolderChildrenItems(shareUserIds: Uuid[], folderId: string, includeResources: boolean = true): Promise { if (!shareUserIds.length) throw new Error('User IDs must be specified'); diff --git a/packages/server/src/models/StorageModel.ts b/packages/server/src/models/StorageModel.ts index ae0ac1f853d..70e82ddccac 100644 --- a/packages/server/src/models/StorageModel.ts +++ b/packages/server/src/models/StorageModel.ts @@ -15,4 +15,8 @@ export default class StorageModel extends BaseModel { return this.db(this.tableName).where('connection_string', connectionString).first(); } + public async byId(id: number): Promise { + return this.db(this.tableName).where('id', id).first(); + } + } diff --git a/packages/server/src/models/items/storage/loadStorageDriver.ts b/packages/server/src/models/items/storage/loadStorageDriver.ts index 0c168a8feb4..853d3ec4aad 100644 --- a/packages/server/src/models/items/storage/loadStorageDriver.ts +++ b/packages/server/src/models/items/storage/loadStorageDriver.ts @@ -2,6 +2,7 @@ import globalConfig from '../../../config'; import { clientType, DbConnection } from '../../../db'; import { StorageDriverConfig, StorageDriverType } from '../../../utils/types'; import newModelFactory from '../../factory'; +import parseStorageDriverConnectionString from './parseStorageConnectionString'; import serializeStorageConfig from './serializeStorageConfig'; import StorageDriverBase from './StorageDriverBase'; import StorageDriverDatabase from './StorageDriverDatabase'; @@ -12,7 +13,7 @@ export interface Options { assignDriverId?: boolean; } -export default async function(config: StorageDriverConfig, db: DbConnection, options: Options = null): Promise { +export default async function(config: StorageDriverConfig | number, db: DbConnection, options: Options = null): Promise { if (!config) return null; options = { @@ -22,20 +23,30 @@ export default async function(config: StorageDriverConfig, db: DbConnection, opt let storageId: number = 0; - if (options.assignDriverId) { + if (typeof config === 'number') { + storageId = config; + const models = newModelFactory(db, globalConfig()); + const storage = await models.storage().byId(storageId); + if (!storage) throw new Error(`No such storage ID: ${storageId}`); - const connectionString = serializeStorageConfig(config); - let storage = await models.storage().byConnectionString(connectionString); + config = parseStorageDriverConnectionString(storage.connection_string); + } else { + if (options.assignDriverId) { + const models = newModelFactory(db, globalConfig()); - if (!storage) { - await models.storage().save({ - connection_string: connectionString, - }); - storage = await models.storage().byConnectionString(connectionString); - } + const connectionString = serializeStorageConfig(config); + let storage = await models.storage().byConnectionString(connectionString); - storageId = storage.id; + if (!storage) { + await models.storage().save({ + connection_string: connectionString, + }); + storage = await models.storage().byConnectionString(connectionString); + } + + storageId = storage.id; + } } if (config.type === StorageDriverType.Database) { diff --git a/packages/server/src/models/items/storage/parseStorageDriverConnectionString.test.ts b/packages/server/src/models/items/storage/parseStorageConnectionString.test.ts similarity index 57% rename from packages/server/src/models/items/storage/parseStorageDriverConnectionString.test.ts rename to packages/server/src/models/items/storage/parseStorageConnectionString.test.ts index eab6af335b9..f1a73277162 100644 --- a/packages/server/src/models/items/storage/parseStorageDriverConnectionString.test.ts +++ b/packages/server/src/models/items/storage/parseStorageConnectionString.test.ts @@ -1,7 +1,7 @@ import { StorageDriverConfig, StorageDriverType } from '../../../utils/types'; -import parseStorageDriverConnectionString from './parseStorageDriverConnectionString'; +import parseStorageConnectionString from './parseStorageConnectionString'; -describe('parseStorageDriverConnectionString', function() { +describe('parseStorageConnectionString', function() { test('should parse a connection string', async function() { const testCases: Record = { @@ -26,17 +26,17 @@ describe('parseStorageDriverConnectionString', function() { }; for (const [connectionString, config] of Object.entries(testCases)) { - const actual = parseStorageDriverConnectionString(connectionString); + const actual = parseStorageConnectionString(connectionString); expect(actual).toEqual(config); } }); test('should detect errors', async function() { - expect(() => parseStorageDriverConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing - expect(() => parseStorageDriverConnectionString('Type=')).toThrow(); - expect(() => parseStorageDriverConnectionString('Type;')).toThrow(); - expect(() => parseStorageDriverConnectionString('Type=DoesntExist')).toThrow(); - expect(() => parseStorageDriverConnectionString('Type=Filesystem')).toThrow(); + expect(() => parseStorageConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing + expect(() => parseStorageConnectionString('Type=')).toThrow(); + expect(() => parseStorageConnectionString('Type;')).toThrow(); + expect(() => parseStorageConnectionString('Type=DoesntExist')).toThrow(); + expect(() => parseStorageConnectionString('Type=Filesystem')).toThrow(); }); }); diff --git a/packages/server/src/models/items/storage/parseStorageDriverConnectionString.ts b/packages/server/src/models/items/storage/parseStorageConnectionString.ts similarity index 97% rename from packages/server/src/models/items/storage/parseStorageDriverConnectionString.ts rename to packages/server/src/models/items/storage/parseStorageConnectionString.ts index 06abd7ca692..5dd3e1b7553 100644 --- a/packages/server/src/models/items/storage/parseStorageDriverConnectionString.ts +++ b/packages/server/src/models/items/storage/parseStorageConnectionString.ts @@ -6,6 +6,7 @@ const parseType = (type: string): StorageDriverType => { if (type === 'Database') return StorageDriverType.Database; if (type === 'Filesystem') return StorageDriverType.Filesystem; if (type === 'Memory') return StorageDriverType.Memory; + if (type === 'S3') return StorageDriverType.S3; throw new Error(`Invalid type: "${type}"`); }; diff --git a/packages/server/src/models/items/storage/serializeStorageConfig.ts b/packages/server/src/models/items/storage/serializeStorageConfig.ts index a3ba55b3176..6eae315ea0e 100644 --- a/packages/server/src/models/items/storage/serializeStorageConfig.ts +++ b/packages/server/src/models/items/storage/serializeStorageConfig.ts @@ -4,6 +4,7 @@ const serializeType = (type: StorageDriverType): string => { if (type === StorageDriverType.Database) return 'Database'; if (type === StorageDriverType.Filesystem) return 'Filesystem'; if (type === StorageDriverType.Memory) return 'Memory'; + if (type === StorageDriverType.S3) return 'S3'; throw new Error(`Invalid type: "${type}"`); }; diff --git a/packages/server/src/utils/setupCommands.ts b/packages/server/src/utils/setupCommands.ts index b5f87d6df43..27ab72ab1f0 100644 --- a/packages/server/src/utils/setupCommands.ts +++ b/packages/server/src/utils/setupCommands.ts @@ -2,6 +2,7 @@ import yargs = require('yargs'); import BaseCommand from '../commands/BaseCommand'; import DbCommand from '../commands/DbCommand'; import DeleteOldChangesCommand from '../commands/DeleteOldChangesCommand'; +import ImportContentCommand from '../commands/ImportContentCommand'; import MigrateCommand from '../commands/MigrateCommand'; export interface Commands { @@ -16,6 +17,7 @@ export default async function setupCommands(): Promise { new MigrateCommand(), new DbCommand(), new DeleteOldChangesCommand(), + new ImportContentCommand(), ]; for (const cmd of commands) { diff --git a/packages/server/src/utils/testing/testUtils.ts b/packages/server/src/utils/testing/testUtils.ts index f4113aa05ae..3e9236740a5 100644 --- a/packages/server/src/utils/testing/testUtils.ts +++ b/packages/server/src/utils/testing/testUtils.ts @@ -42,11 +42,11 @@ export function tempDirPath(): string { } let tempDir_: string = null; -export async function tempDir(): Promise { - if (tempDir_) return tempDir_; - tempDir_ = tempDirPath(); - await fs.mkdirp(tempDir_); - return tempDir_; +export async function tempDir(subDir: string = null): Promise { + if (!tempDir_) tempDir_ = tempDirPath(); + const fullDir = tempDir_ + (subDir ? `/${subDir}` : ''); + await fs.mkdirp(fullDir); + return fullDir; } export async function makeTempFileWithContent(content: string | Buffer): Promise {