Server: Added command to migrate content to different storage
laurent22 committed Nov 11, 2021
1 parent 725c79d commit 005f720
Showing 13 changed files with 261 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/server/src/commands/DeleteOldChangesCommand.ts
@@ -9,7 +9,7 @@ interface Argv {
export default class DeleteOldChangesCommand extends BaseCommand {

    public command() {
-       return 'deleteOldChanges';
+       return 'delete-old-changes';
    }

    public description() {
54 changes: 54 additions & 0 deletions packages/server/src/commands/ImportContentCommand.ts
@@ -0,0 +1,54 @@
import { PositionalOptions, Options } from 'yargs';
import Logger from '@joplin/lib/Logger';
import BaseCommand, { RunContext } from './BaseCommand';
import parseStorageConnectionString from '../models/items/storage/parseStorageConnectionString';

const logger = Logger.create('ImportContentCommand');

interface Argv {
    toStorage: string;
    batchSize?: number;
}

export default class ImportContentCommand extends BaseCommand {

    public command() {
        return 'import-content <to-storage>';
    }

    public description() {
        return 'import content to storage';
    }

    public positionals(): Record<string, PositionalOptions> {
        return {
            'to-storage': {
                description: 'storage connection string',
                type: 'string',
            },
        };
    }

    public options(): Record<string, Options> {
        return {
            'batch-size': {
                type: 'number',
                description: 'Item batch size',
            },
        };
    }

    public async run(argv: Argv, runContext: RunContext): Promise<void> {
        const toStorageConfig = parseStorageConnectionString(argv.toStorage);
        const batchSize = argv.batchSize || 1000;

        logger.info('Importing to storage:', toStorageConfig);
        logger.info(`Batch size: ${batchSize}`);

        await runContext.models.item().importContentToStorage(toStorageConfig, {
            batchSize: batchSize || 1000,
            logger: logger as Logger,
        });
    }

}
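For context, the `<to-storage>` positional is a storage connection string. A minimal sketch of how the command's argument is interpreted, assuming the `Key=Value; ...` syntax exercised by the parser tests further down (the example path is illustrative only):

```ts
import parseStorageConnectionString from '../models/items/storage/parseStorageConnectionString';

// Illustrative values; the `Type=...; Path=...` form follows the cases used
// in parseStorageConnectionString's own tests later in this commit.
const toStorageConfig = parseStorageConnectionString('Type=Filesystem; Path=/path/to/dir');
// toStorageConfig is a StorageDriverConfig, e.g. { type: StorageDriverType.Filesystem, path: '/path/to/dir' }
```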
2 changes: 1 addition & 1 deletion packages/server/src/config.ts
@@ -3,7 +3,7 @@ import { Config, DatabaseConfig, DatabaseConfigClient, Env, MailerConfig, RouteT
import * as pathUtils from 'path';
import { loadStripeConfig, StripePublicConfig } from '@joplin/lib/utils/joplinCloud';
import { EnvVariables } from './env';
-import parseStorageDriverConnectionString from './models/items/storage/parseStorageDriverConnectionString';
+import parseStorageDriverConnectionString from './models/items/storage/parseStorageConnectionString';

interface PackageJson {
version: string;
14 changes: 14 additions & 0 deletions packages/server/src/migrations/20211111134329_storage_index.ts
@@ -0,0 +1,14 @@
import { Knex } from 'knex';
import { DbConnection } from '../db';

export async function up(db: DbConnection): Promise<any> {
    await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => {
        table.index('content_storage_id');
    });
}

export async function down(db: DbConnection): Promise<any> {
    await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => {
        table.dropIndex('content_storage_id');
    });
}
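The new index exists to support the migration command: `ItemModel.importContentToStorage()` (further down in this commit) repeatedly filters `items` on `content_storage_id` to find content that still lives on the old storage. A sketch of that query shape, using the same Knex API as the migration above (the function and variable names here are illustrative only):

```ts
import { Knex } from 'knex';

// Illustrative only: the kind of query the new `content_storage_id` index speeds up.
async function itemsNotYetOnStorage(db: Knex, toStorageId: number, batchSize: number) {
    return db('items')
        .select(['id', 'content_storage_id', 'updated_time'])
        .where('content_storage_id', '!=', toStorageId)
        .limit(batchSize);
}
```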
74 changes: 73 additions & 1 deletion packages/server/src/models/ItemModel.test.ts
@@ -1,6 +1,11 @@
-import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3 } from '../utils/testing/testUtils';
+import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3, db, tempDir } from '../utils/testing/testUtils';
import { shareFolderWithUser } from '../utils/testing/shareApiUtils';
import { resourceBlobPath } from '../utils/joplinUtils';
import newModelFactory from './factory';
import { StorageDriverType } from '../utils/types';
import config from '../config';
import { msleep } from '../utils/time';
import loadStorageDriver from './items/storage/loadStorageDriver';

describe('ItemModel', function() {

@@ -265,4 +270,71 @@ describe('ItemModel', function() {
expect((await models().user().load(user3.id)).total_item_size).toBe(expected3);
});

    test('should allow importing content to item storage', async function() {
        const { user: user1 } = await createUserAndSession(1);

        const tempDir1 = await tempDir('storage1');
        const tempDir2 = await tempDir('storage2');

        const fromStorageConfig = {
            type: StorageDriverType.Filesystem,
            path: tempDir1,
        };

        const models = newModelFactory(db(), {
            ...config(),
            storageDriver: fromStorageConfig,
        });

        await models.item().saveFromRawContent(user1, {
            body: Buffer.from(JSON.stringify({ 'version': 1 })),
            name: 'info.json',
        });

        const itemBefore = (await models.item().all())[0];

        const fromDriver = await loadStorageDriver(fromStorageConfig, db());
        const fromContent = await fromDriver.read(itemBefore.id, { models });

        expect(fromContent.toString()).toBe('{"version":1}');

        expect(itemBefore.content_storage_id).toBe(1);

        await msleep(2);

        const toStorageConfig = {
            type: StorageDriverType.Filesystem,
            path: tempDir2,
        };

        const toModels = newModelFactory(db(), {
            ...config(),
            storageDriver: toStorageConfig,
        });

        const result = await toModels.item().saveFromRawContent(user1, {
            body: Buffer.from(JSON.stringify({ 'version': 2 })),
            name: 'info2.json',
        });

        const itemBefore2 = result['info2.json'].item;

        await models.item().importContentToStorage(toStorageConfig);

        const itemAfter = (await models.item().all()).find(it => it.id === itemBefore.id);
        expect(itemAfter.content_storage_id).toBe(2);
        expect(itemAfter.updated_time).toBe(itemBefore.updated_time);

        // Just check the second item has not been processed since it was
        // already on the right storage
        const itemAfter2 = (await models.item().all()).find(it => it.id === itemBefore2.id);
        expect(itemAfter2.content_storage_id).toBe(2);
        expect(itemAfter2.updated_time).toBe(itemBefore2.updated_time);

        const toDriver = await loadStorageDriver(toStorageConfig, db());
        const toContent = await toDriver.read(itemAfter.id, { models });

        expect(toContent.toString()).toBe(fromContent.toString());
    });

});
76 changes: 75 additions & 1 deletion packages/server/src/models/ItemModel.ts
@@ -8,16 +8,23 @@ import { Knex } from 'knex';
import { ChangePreviousItem } from './ChangeModel';
import { unique } from '../utils/array';
import StorageDriverBase, { Context } from './items/storage/StorageDriverBase';
-import { DbConnection } from '../db';
+import { DbConnection, returningSupported } from '../db';
import { Config, StorageDriverConfig, StorageDriverMode } from '../utils/types';
import { NewModelFactoryHandler } from './factory';
import loadStorageDriver from './items/storage/loadStorageDriver';
import { msleep } from '../utils/time';
import Logger from '@joplin/lib/Logger';

const mimeUtils = require('@joplin/lib/mime-utils.js').mime;

// Converts "root:/myfile.txt:" to "myfile.txt"
const extractNameRegex = /^root:\/(.*):$/;

export interface ImportContentToStorageOptions {
    batchSize?: number;
    logger?: Logger;
}

export interface SaveFromRawContentItem {
    name: string;
    body: Buffer;
@@ -268,6 +275,73 @@ export default class ItemModel extends BaseModel<Item> {
}
}

    private async atomicMoveContent(item: Item, toDriver: StorageDriverBase, drivers: Record<number, StorageDriverBase>) {
        for (let i = 0; i < 10; i++) {
            let fromDriver: StorageDriverBase = drivers[item.content_storage_id];

            if (!fromDriver) {
                fromDriver = await loadStorageDriver(item.content_storage_id, this.db);
                drivers[item.content_storage_id] = fromDriver;
            }

            const content = await fromDriver.read(item.id, { models: this.models() });
            await toDriver.write(item.id, content, { models: this.models() });

            const updatedRows = await this
                .db(this.tableName)
                .where('id', '=', item.id)
                .where('updated_time', '=', item.updated_time) // Check that the row hasn't changed while we were transferring the content
                .update({ content_storage_id: toDriver.storageId }, returningSupported(this.db) ? ['id'] : undefined);

            if (!returningSupported(this.db) || updatedRows.length) return;

            await msleep(1000 + 1000 * i);
        }

        throw new Error(`Could not atomically update content for item: ${JSON.stringify(item)}`);
    }
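The `updated_time` condition above is an optimistic-concurrency guard: the item row is only switched to the new storage if nothing modified it while its content was being copied. A stripped-down sketch of the pattern (names are illustrative and this is not code from the commit):

```ts
import { Knex } from 'knex';

// Compare-and-set style update: succeeds only if the row still has the
// updated_time that was read before the content was copied.
async function trySwitchStorage(db: Knex, itemId: string, expectedUpdatedTime: number, toStorageId: number): Promise<boolean> {
    const affectedRows = await db('items')
        .where('id', '=', itemId)
        .where('updated_time', '=', expectedUpdatedTime)
        .update({ content_storage_id: toStorageId });
    return affectedRows > 0; // 0 means the item changed mid-copy; the caller can retry
}
```

Note that when the backend does not support `RETURNING` (`returningSupported()` is false), the method above returns after the first attempt without verifying the guard.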

    // Loop through the items in the database and import their content to the
    // target storage. Only items not already in that storage will be processed.
    public async importContentToStorage(toStorageConfig: StorageDriverConfig, options: ImportContentToStorageOptions = null) {
        options = {
            batchSize: 1000,
            logger: new Logger(),
            ...options,
        };

        const toStorageDriver = await this.loadStorageDriver(toStorageConfig);
        const fromDrivers: Record<number, StorageDriverBase> = {};

        const itemCount = (await this.db(this.tableName)
            .count('id', { as: 'total' })
            .where('content_storage_id', '!=', toStorageDriver.storageId)
            .first())['total'];

        let totalDone = 0;

        while (true) {
            const items: Item[] = await this
                .db(this.tableName)
                .select(['id', 'content_storage_id', 'updated_time'])
                .where('content_storage_id', '!=', toStorageDriver.storageId)
                .limit(options.batchSize);

            options.logger.info(`Processing items ${totalDone} / ${itemCount}`);

            if (!items.length) {
                options.logger.info(`All items have been processed. Total: ${totalDone}`);
                return;
            }

            for (const item of items) {
                await this.atomicMoveContent(item, toStorageDriver, fromDrivers);
            }

            totalDone += items.length;
        }
    }

public async sharedFolderChildrenItems(shareUserIds: Uuid[], folderId: string, includeResources: boolean = true): Promise<Item[]> {
if (!shareUserIds.length) throw new Error('User IDs must be specified');

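For reference, a minimal programmatic equivalent of what the new `import-content` command does, assuming a database connection and base config are already available (a sketch, not part of the commit; the function name and batch size are arbitrary):

```ts
import { DbConnection } from '../db';
import { Config } from '../utils/types';
import newModelFactory from './factory';
import parseStorageConnectionString from './items/storage/parseStorageConnectionString';

// Parse the target connection string, build the model factory, and hand the
// resulting config to ItemModel.importContentToStorage() shown above.
async function migrateContent(db: DbConnection, baseConfig: Config, connectionString: string) {
    const toStorageConfig = parseStorageConnectionString(connectionString);
    const models = newModelFactory(db, baseConfig);
    await models.item().importContentToStorage(toStorageConfig, { batchSize: 500 });
}
```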
4 changes: 4 additions & 0 deletions packages/server/src/models/StorageModel.ts
@@ -15,4 +15,8 @@ export default class StorageModel extends BaseModel<Storage> {
        return this.db(this.tableName).where('connection_string', connectionString).first();
    }

    public async byId(id: number): Promise<Storage> {
        return this.db(this.tableName).where('id', id).first();
    }

}
33 changes: 22 additions & 11 deletions packages/server/src/models/items/storage/loadStorageDriver.ts
@@ -2,6 +2,7 @@ import globalConfig from '../../../config';
import { clientType, DbConnection } from '../../../db';
import { StorageDriverConfig, StorageDriverType } from '../../../utils/types';
import newModelFactory from '../../factory';
import parseStorageDriverConnectionString from './parseStorageConnectionString';
import serializeStorageConfig from './serializeStorageConfig';
import StorageDriverBase from './StorageDriverBase';
import StorageDriverDatabase from './StorageDriverDatabase';
@@ -12,7 +13,7 @@ export interface Options {
assignDriverId?: boolean;
}

-export default async function(config: StorageDriverConfig, db: DbConnection, options: Options = null): Promise<StorageDriverBase | null> {
+export default async function(config: StorageDriverConfig | number, db: DbConnection, options: Options = null): Promise<StorageDriverBase | null> {
if (!config) return null;

options = {
Expand All @@ -22,20 +23,30 @@ export default async function(config: StorageDriverConfig, db: DbConnection, opt

    let storageId: number = 0;

-   if (options.assignDriverId) {
+   if (typeof config === 'number') {
+       storageId = config;

        const models = newModelFactory(db, globalConfig());
+       const storage = await models.storage().byId(storageId);
+       if (!storage) throw new Error(`No such storage ID: ${storageId}`);

-       const connectionString = serializeStorageConfig(config);
-       let storage = await models.storage().byConnectionString(connectionString);
+       config = parseStorageDriverConnectionString(storage.connection_string);
+   } else {
+       if (options.assignDriverId) {
+           const models = newModelFactory(db, globalConfig());

-       if (!storage) {
-           await models.storage().save({
-               connection_string: connectionString,
-           });
-           storage = await models.storage().byConnectionString(connectionString);
-       }
+           const connectionString = serializeStorageConfig(config);
+           let storage = await models.storage().byConnectionString(connectionString);

-       storageId = storage.id;
+           if (!storage) {
+               await models.storage().save({
+                   connection_string: connectionString,
+               });
+               storage = await models.storage().byConnectionString(connectionString);
+           }

+           storageId = storage.id;
+       }
    }

if (config.type === StorageDriverType.Database) {
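With the widened signature, `loadStorageDriver()` now accepts either a full `StorageDriverConfig` or a numeric storage ID, which is what lets `ItemModel.atomicMoveContent()` earlier in this diff load a source driver straight from an item's `content_storage_id`. A sketch of the two call forms (illustrative wrapper function, example path only):

```ts
import { DbConnection } from '../../../db';
import { StorageDriverType } from '../../../utils/types';
import loadStorageDriver from './loadStorageDriver';

async function loadBothWays(db: DbConnection, contentStorageId: number) {
    // 1. From an explicit config, as before.
    const fromConfig = await loadStorageDriver({ type: StorageDriverType.Filesystem, path: '/path/to/dir' }, db);

    // 2. From a storage ID: the connection string is looked up via
    //    StorageModel.byId() and parsed back into a config.
    const fromId = await loadStorageDriver(contentStorageId, db);

    return { fromConfig, fromId };
}
```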
@@ -1,7 +1,7 @@
import { StorageDriverConfig, StorageDriverType } from '../../../utils/types';
-import parseStorageDriverConnectionString from './parseStorageDriverConnectionString';
+import parseStorageConnectionString from './parseStorageConnectionString';

-describe('parseStorageDriverConnectionString', function() {
+describe('parseStorageConnectionString', function() {

    test('should parse a connection string', async function() {
        const testCases: Record<string, StorageDriverConfig> = {
@@ -26,17 +26,17 @@ };
};

        for (const [connectionString, config] of Object.entries(testCases)) {
-           const actual = parseStorageDriverConnectionString(connectionString);
+           const actual = parseStorageConnectionString(connectionString);
            expect(actual).toEqual(config);
        }
});

    test('should detect errors', async function() {
-       expect(() => parseStorageDriverConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing
-       expect(() => parseStorageDriverConnectionString('Type=')).toThrow();
-       expect(() => parseStorageDriverConnectionString('Type;')).toThrow();
-       expect(() => parseStorageDriverConnectionString('Type=DoesntExist')).toThrow();
-       expect(() => parseStorageDriverConnectionString('Type=Filesystem')).toThrow();
+       expect(() => parseStorageConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing
+       expect(() => parseStorageConnectionString('Type=')).toThrow();
+       expect(() => parseStorageConnectionString('Type;')).toThrow();
+       expect(() => parseStorageConnectionString('Type=DoesntExist')).toThrow();
+       expect(() => parseStorageConnectionString('Type=Filesystem')).toThrow();
    });

});
@@ -6,6 +6,7 @@ const parseType = (type: string): StorageDriverType => {
    if (type === 'Database') return StorageDriverType.Database;
    if (type === 'Filesystem') return StorageDriverType.Filesystem;
    if (type === 'Memory') return StorageDriverType.Memory;
    if (type === 'S3') return StorageDriverType.S3;
    throw new Error(`Invalid type: "${type}"`);
};

@@ -4,6 +4,7 @@ const serializeType = (type: StorageDriverType): string => {
    if (type === StorageDriverType.Database) return 'Database';
    if (type === StorageDriverType.Filesystem) return 'Filesystem';
    if (type === StorageDriverType.Memory) return 'Memory';
    if (type === StorageDriverType.S3) return 'S3';
    throw new Error(`Invalid type: "${type}"`);
};
