Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FABN-1524: Default file checkpointer implementation #201

Merged
merged 2 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion fabric-network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@

/**
* Persists the current block and transactions within that block to enable event listening to be resumed following an
* application outage.
* application outage. Default implementations can be obtained from
* [DefaultCheckpointers]{@link module:fabric-network.DefaultCheckpointers}. Application developers are encouraged to
* build their own implementations that use a persistent store suitable to their environment.
* @interface Checkpointer
* @memberof module:fabric-network
*/
Expand Down Expand Up @@ -489,6 +491,7 @@ module.exports.Wallet = require('./lib/impl/wallet/wallet').Wallet;
module.exports.Wallets = require('./lib/impl/wallet/wallets').Wallets;
module.exports.IdentityProviderRegistry = require('./lib/impl/wallet/identityproviderregistry').IdentityProviderRegistry;
module.exports.HsmX509Provider = require('./lib/impl/wallet/hsmx509identity').HsmX509Provider;
module.exports.DefaultCheckpointers = require('./lib/defaultcheckpointers').DefaultCheckpointers;
module.exports.DefaultEventHandlerStrategies = require('./lib/impl/event/defaulteventhandlerstrategies');
module.exports.DefaultQueryHandlerStrategies = require('./lib/impl/query/defaultqueryhandlerstrategies');
module.exports.TimeoutError = require('./lib/errors/timeouterror').TimeoutError;
13 changes: 6 additions & 7 deletions fabric-network/src/defaultcheckpointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,24 @@
*/

import { Checkpointer } from './checkpointer';

// TODO: Implement, remove @private tag, and export in both index.js and index.d.ts
import { FileCheckpointer } from './impl/filecheckpointer';

/**
* Provides static factory functions used to create instances of default
* {@link module:fabric-network.Checkpointer|Checkpointer} implementations.
* @private
* @memberof module:fabric-network
*/
export class DefaultCheckpointers {
/**
* Create a checkpointer that uses the specified file to store persistent state. If the file does not exist, it will
* be created and the checkpointer will have an uninitialized state that will accept any events. If the file does
* exist, it must contain valid checkpointer state.
* exist, it must contain valid checkpoint state.
* @param {string} path Path to a file holding persistent checkpoint state.
* @returns {Promise<module:fabric-network.Checkpointer>} A checkpointer.
*/
static async file(path: string): Promise<Checkpointer> {
throw new Error('Not yet implemented');
// TODO: Create file system checkpointer and call async initialization to load initial state asynchronously
// from the file.
const checkpointer = new FileCheckpointer(path);
await checkpointer.init();
return checkpointer;
}
}
86 changes: 86 additions & 0 deletions fabric-network/src/impl/filecheckpointer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2020 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

import { Checkpointer } from '../checkpointer';
import Long = require('long');
import fs = require('fs');

const encoding = 'utf8';

interface PersistentState {
blockNumber?: string;
transactionIds?: string[];
}

export class FileCheckpointer implements Checkpointer {
private readonly path: string;
private blockNumber?: Long;
private transactionIds: Set<string> = new Set();

constructor(path: string) {
this.path = path;
}

async init(): Promise<void> {
await this.load();
await this.save();
}

async addTransactionId(transactionId: string): Promise<void> {
this.transactionIds.add(transactionId);
await this.save();
}

async getBlockNumber(): Promise<Long | undefined> {
return this.blockNumber;
}

async getTransactionIds(): Promise<Set<string>> {
return this.transactionIds;
}

async setBlockNumber(blockNumber: Long): Promise<void> {
this.blockNumber = blockNumber;
this.transactionIds.clear();
await this.save();
}

private async load(): Promise<void> {
const data = await this.readFile();
if (data) {
const json = data.toString(encoding);
const state = JSON.parse(json);
this.setState(state);
}
}

private async readFile(): Promise<Buffer | undefined> {
try {
return await fs.promises.readFile(this.path);
} catch (err) {
// Ignore error on non-existent file
}
}

private setState(state: PersistentState): void {
this.blockNumber = state.blockNumber ? Long.fromString(state.blockNumber) : undefined;
this.transactionIds = new Set(state.transactionIds);
}

private async save(): Promise<void> {
const state = this.getState();
const json = JSON.stringify(state);
const data = Buffer.from(json, encoding);
await fs.promises.writeFile(this.path, data);
}

private getState(): PersistentState {
return {
blockNumber: this.blockNumber?.toString(),
transactionIds: Array.from(this.transactionIds)
};
}
}
136 changes: 136 additions & 0 deletions fabric-network/test/impl/filecheckpointer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright 2020 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

import { Checkpointer } from '../../src/checkpointer';
import { DefaultCheckpointers } from '../../src/defaultcheckpointers';
import * as testUtils from '../testutils';
import Long = require('long');
import path = require('path');
import fs = require('fs');
import chai = require('chai');
import chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const expect = chai.expect;

// tslint:disable: no-unused-expression

describe('FileCheckpointer', () => {
let dir: string;
let file: string;
let checkpointer: Checkpointer;

beforeEach(async () => {
dir = await testUtils.createTempDir();
file = path.join(dir, 'checkpoint.json');
checkpointer = await DefaultCheckpointers.file(file);
});

afterEach(async () => {
await testUtils.rmdir(dir);
});

it('new checkpointer has undefined block number', async () => {
const actual = await checkpointer.getBlockNumber();

expect(actual).to.be.undefined;
});

it('new checkpointer has empty transaction IDs', async () => {
const actual = await checkpointer.getTransactionIds();

expect(actual).to.be.empty;
});

it('can get added transaction IDs', async () => {
await checkpointer.addTransactionId('txId');
const actual = await checkpointer.getTransactionIds();

expect(actual).to.have.lengthOf(1).and.include('txId');
});

it('duplicate transaction IDs are ignored', async () => {
await checkpointer.addTransactionId('txId');
await checkpointer.addTransactionId('txId');
const actual = await checkpointer.getTransactionIds();

expect(actual).to.have.lengthOf(1).and.include('txId');
});

it('can get updated block number', async () => {
await checkpointer.setBlockNumber(Long.ONE);
const actual = await checkpointer.getBlockNumber();

expect(actual?.toNumber()).to.equal(1);
});

it('setting block number clears transaction IDs', async () => {
await checkpointer.addTransactionId('txId');

await checkpointer.setBlockNumber(Long.ONE);
const actual = await checkpointer.getTransactionIds();

expect(actual).to.be.empty;
});

it('initial state retained on reopen of checkpointer', async () => {
checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber).to.be.undefined;
expect(transactionIds).to.be.empty;
});

it('state is persisted when block number updated', async () => {
await checkpointer.setBlockNumber(Long.ONE);

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber?.toNumber()).to.equal(1);
expect(transactionIds).to.be.empty;
});

it('state is persisted when transaction IDs added', async () => {
await checkpointer.addTransactionId('txId');

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber).to.be.undefined;
expect(transactionIds).to.have.lengthOf(1).and.include('txId');
});

it('persistent state is consistent on multiple updates', async () => {
await checkpointer.setBlockNumber(Long.ZERO);
await checkpointer.addTransactionId('tx0');
await checkpointer.setBlockNumber(Long.ONE);
await checkpointer.addTransactionId('tx1');

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber?.toNumber()).to.equal(1);
expect(transactionIds).to.have.lengthOf(1).and.include('tx1');
});

it('create fails for bad persistent data', async () => {
await fs.promises.writeFile(file, Buffer.from('bad to the bone'));

const promise = DefaultCheckpointers.file(file);

await expect(promise).to.be.rejected;
});

it('create fails for non-writable path', async () => {
const promise = DefaultCheckpointers.file(path.join(dir, 'MISSING_DIR', 'MISSING_FILE'));

await expect(promise).to.be.rejected;
});
});
18 changes: 17 additions & 1 deletion fabric-network/test/testutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

import os = require('os');
import fs = require('fs');
import path = require('path');
import util = require('util');
import _rimraf = require('rimraf');
const rimraf = util.promisify(_rimraf);

/**
* Returns a new async function that taken one argument of the generic type. The returned function also has a
* 'completePromise' property of type Promise<T[]>, which resolves when the expected number of calls have been made
Expand All @@ -18,7 +25,7 @@ export function newAsyncListener<T>(expectedCallCount = 1, maxSleep = 0) {
const events: T[] = [];
const listener = async (event: T) => {
if (maxSleep > 0) {
// Some random delay to similate async work in the listener and catch timing bugs
// Some random delay to simulate async work in the listener and catch timing bugs
await sleep(getRandomInt(maxSleep));
}
events.push(event);
Expand All @@ -43,3 +50,12 @@ export function sleep(ms: number) {
export function getRandomInt(max: number) {
return Math.floor(Math.random() * Math.floor(max));
}

export async function createTempDir(): Promise<string> {
const prefix = os.tmpdir + path.sep;
return await fs.promises.mkdtemp(prefix);
}

export async function rmdir(path: string): Promise<void> {
await rimraf(path);
}
1 change: 1 addition & 0 deletions fabric-network/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export { QueryHandler } from '../lib/impl/query/queryhandler';
export { Query, QueryResults, QueryResponse } from '../lib/impl/query/query';
export { Network };
export { Checkpointer } from '../lib/checkpointer';
export { DefaultCheckpointers } from '../lib/defaultcheckpointers';

import * as DefaultEventHandlerStrategies from '../lib/impl/event/defaulteventhandlerstrategies';
export { DefaultEventHandlerStrategies };
Expand Down
18 changes: 18 additions & 0 deletions test/ts-scenario/features/events.feature
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,21 @@ Feature: Node SDK Events
When I unregister the listener named privateBlockListener
And I use the gateway named event_gateway to submit a total of 5 transactions with args [privateValuePut] for contract events instantiated on channel eventschannel
Then I receive 0 events from the listener named privateBlockListener

Scenario: Checkpoint block event listening
When I use the gateway named event_gateway to listen for full block events with a new file checkpoint listener named checkpointBlockListener on channel eventschannel
When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
Then I receive a minimum 1 events from the listener named checkpointBlockListener
When I unregister the listener named checkpointBlockListener
When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
When I use the gateway named event_gateway to listen for full block events with an existing file checkpoint listener named checkpointBlockListener on channel eventschannel
Then I receive a minimum 1 events from the listener named checkpointBlockListener

Scenario: Checkpoint contract event listening
When I use the gateway named event_gateway to listen for full contract events named create with a new file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel
And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent"
When I unregister the listener named checkpointContractListener
And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
And I use the gateway named event_gateway to listen for full contract events named create with an existing file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel
Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent"
Loading