Skip to content

Commit

Permalink
FABN-1524: Default file checkpointer implementation (#201)
Browse files Browse the repository at this point in the history
* FABN-1524: Default file checkpointer implementation

Signed-off-by: Mark S. Lewis <[email protected]>

* FABN-1524: Checkpoint scenario tests

Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday authored Mar 30, 2020
1 parent f6e8ae3 commit 0e8dfa8
Show file tree
Hide file tree
Showing 18 changed files with 368 additions and 50 deletions.
5 changes: 4 additions & 1 deletion fabric-network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@

/**
* Persists the current block and transactions within that block to enable event listening to be resumed following an
* application outage.
* application outage. Default implementations can be obtained from
* [DefaultCheckpointers]{@link module:fabric-network.DefaultCheckpointers}. Application developers are encouraged to
* build their own implementations that use a persistent store suitable to their environment.
* @interface Checkpointer
* @memberof module:fabric-network
*/
Expand Down Expand Up @@ -489,6 +491,7 @@ module.exports.Wallet = require('./lib/impl/wallet/wallet').Wallet;
module.exports.Wallets = require('./lib/impl/wallet/wallets').Wallets;
module.exports.IdentityProviderRegistry = require('./lib/impl/wallet/identityproviderregistry').IdentityProviderRegistry;
module.exports.HsmX509Provider = require('./lib/impl/wallet/hsmx509identity').HsmX509Provider;
module.exports.DefaultCheckpointers = require('./lib/defaultcheckpointers').DefaultCheckpointers;
module.exports.DefaultEventHandlerStrategies = require('./lib/impl/event/defaulteventhandlerstrategies');
module.exports.DefaultQueryHandlerStrategies = require('./lib/impl/query/defaultqueryhandlerstrategies');
module.exports.TimeoutError = require('./lib/errors/timeouterror').TimeoutError;
13 changes: 6 additions & 7 deletions fabric-network/src/defaultcheckpointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,24 @@
*/

import { Checkpointer } from './checkpointer';

// TODO: Implement, remove @private tag, and export in both index.js and index.d.ts
import { FileCheckpointer } from './impl/filecheckpointer';

/**
* Provides static factory functions used to create instances of default
* {@link module:fabric-network.Checkpointer|Checkpointer} implementations.
* @private
* @memberof module:fabric-network
*/
export class DefaultCheckpointers {
/**
* Create a checkpointer that uses the specified file to store persistent state. If the file does not exist, it will
* be created and the checkpointer will have an uninitialized state that will accept any events. If the file does
* exist, it must contain valid checkpointer state.
* exist, it must contain valid checkpoint state.
* @param {string} path Path to a file holding persistent checkpoint state.
* @returns {Promise<module:fabric-network.Checkpointer>} A checkpointer.
*/
static async file(path: string): Promise<Checkpointer> {
throw new Error('Not yet implemented');
// TODO: Create file system checkpointer and call async initialization to load initial state asynchronously
// from the file.
const checkpointer = new FileCheckpointer(path);
await checkpointer.init();
return checkpointer;
}
}
86 changes: 86 additions & 0 deletions fabric-network/src/impl/filecheckpointer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2020 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

import { Checkpointer } from '../checkpointer';
import Long = require('long');
import fs = require('fs');

const encoding = 'utf8';

interface PersistentState {
blockNumber?: string;
transactionIds?: string[];
}

export class FileCheckpointer implements Checkpointer {
private readonly path: string;
private blockNumber?: Long;
private transactionIds: Set<string> = new Set();

constructor(path: string) {
this.path = path;
}

async init(): Promise<void> {
await this.load();
await this.save();
}

async addTransactionId(transactionId: string): Promise<void> {
this.transactionIds.add(transactionId);
await this.save();
}

async getBlockNumber(): Promise<Long | undefined> {
return this.blockNumber;
}

async getTransactionIds(): Promise<Set<string>> {
return this.transactionIds;
}

async setBlockNumber(blockNumber: Long): Promise<void> {
this.blockNumber = blockNumber;
this.transactionIds.clear();
await this.save();
}

private async load(): Promise<void> {
const data = await this.readFile();
if (data) {
const json = data.toString(encoding);
const state = JSON.parse(json);
this.setState(state);
}
}

private async readFile(): Promise<Buffer | undefined> {
try {
return await fs.promises.readFile(this.path);
} catch (err) {
// Ignore error on non-existent file
}
}

private setState(state: PersistentState): void {
this.blockNumber = state.blockNumber ? Long.fromString(state.blockNumber) : undefined;
this.transactionIds = new Set(state.transactionIds);
}

private async save(): Promise<void> {
const state = this.getState();
const json = JSON.stringify(state);
const data = Buffer.from(json, encoding);
await fs.promises.writeFile(this.path, data);
}

private getState(): PersistentState {
return {
blockNumber: this.blockNumber?.toString(),
transactionIds: Array.from(this.transactionIds)
};
}
}
File renamed without changes.
136 changes: 136 additions & 0 deletions fabric-network/test/impl/filecheckpointer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright 2020 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

import { Checkpointer } from '../../src/checkpointer';
import { DefaultCheckpointers } from '../../src/defaultcheckpointers';
import * as testUtils from '../testutils';
import Long = require('long');
import path = require('path');
import fs = require('fs');
import chai = require('chai');
import chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const expect = chai.expect;

// tslint:disable: no-unused-expression

describe('FileCheckpointer', () => {
let dir: string;
let file: string;
let checkpointer: Checkpointer;

beforeEach(async () => {
dir = await testUtils.createTempDir();
file = path.join(dir, 'checkpoint.json');
checkpointer = await DefaultCheckpointers.file(file);
});

afterEach(async () => {
await testUtils.rmdir(dir);
});

it('new checkpointer has undefined block number', async () => {
const actual = await checkpointer.getBlockNumber();

expect(actual).to.be.undefined;
});

it('new checkpointer has empty transaction IDs', async () => {
const actual = await checkpointer.getTransactionIds();

expect(actual).to.be.empty;
});

it('can get added transaction IDs', async () => {
await checkpointer.addTransactionId('txId');
const actual = await checkpointer.getTransactionIds();

expect(actual).to.have.lengthOf(1).and.include('txId');
});

it('duplicate transaction IDs are ignored', async () => {
await checkpointer.addTransactionId('txId');
await checkpointer.addTransactionId('txId');
const actual = await checkpointer.getTransactionIds();

expect(actual).to.have.lengthOf(1).and.include('txId');
});

it('can get updated block number', async () => {
await checkpointer.setBlockNumber(Long.ONE);
const actual = await checkpointer.getBlockNumber();

expect(actual?.toNumber()).to.equal(1);
});

it('setting block number clears transaction IDs', async () => {
await checkpointer.addTransactionId('txId');

await checkpointer.setBlockNumber(Long.ONE);
const actual = await checkpointer.getTransactionIds();

expect(actual).to.be.empty;
});

it('initial state retained on reopen of checkpointer', async () => {
checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber).to.be.undefined;
expect(transactionIds).to.be.empty;
});

it('state is persisted when block number updated', async () => {
await checkpointer.setBlockNumber(Long.ONE);

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber?.toNumber()).to.equal(1);
expect(transactionIds).to.be.empty;
});

it('state is persisted when transaction IDs added', async () => {
await checkpointer.addTransactionId('txId');

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber).to.be.undefined;
expect(transactionIds).to.have.lengthOf(1).and.include('txId');
});

it('persistent state is consistent on multiple updates', async () => {
await checkpointer.setBlockNumber(Long.ZERO);
await checkpointer.addTransactionId('tx0');
await checkpointer.setBlockNumber(Long.ONE);
await checkpointer.addTransactionId('tx1');

checkpointer = await DefaultCheckpointers.file(file);
const blockNumber = await checkpointer.getBlockNumber();
const transactionIds = await checkpointer.getTransactionIds();

expect(blockNumber?.toNumber()).to.equal(1);
expect(transactionIds).to.have.lengthOf(1).and.include('tx1');
});

it('create fails for bad persistent data', async () => {
await fs.promises.writeFile(file, Buffer.from('bad to the bone'));

const promise = DefaultCheckpointers.file(file);

await expect(promise).to.be.rejected;
});

it('create fails for non-writable path', async () => {
const promise = DefaultCheckpointers.file(path.join(dir, 'MISSING_DIR', 'MISSING_FILE'));

await expect(promise).to.be.rejected;
});
});
18 changes: 17 additions & 1 deletion fabric-network/test/testutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

import os = require('os');
import fs = require('fs');
import path = require('path');
import util = require('util');
import _rimraf = require('rimraf');
const rimraf = util.promisify(_rimraf);

/**
* Returns a new async function that taken one argument of the generic type. The returned function also has a
* 'completePromise' property of type Promise<T[]>, which resolves when the expected number of calls have been made
Expand All @@ -18,7 +25,7 @@ export function newAsyncListener<T>(expectedCallCount = 1, maxSleep = 0) {
const events: T[] = [];
const listener = async (event: T) => {
if (maxSleep > 0) {
// Some random delay to similate async work in the listener and catch timing bugs
// Some random delay to simulate async work in the listener and catch timing bugs
await sleep(getRandomInt(maxSleep));
}
events.push(event);
Expand All @@ -43,3 +50,12 @@ export function sleep(ms: number) {
export function getRandomInt(max: number) {
return Math.floor(Math.random() * Math.floor(max));
}

export async function createTempDir(): Promise<string> {
const prefix = os.tmpdir + path.sep;
return await fs.promises.mkdtemp(prefix);
}

export async function rmdir(path: string): Promise<void> {
await rimraf(path);
}
1 change: 1 addition & 0 deletions fabric-network/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export { QueryHandler } from '../lib/impl/query/queryhandler';
export { Query, QueryResults, QueryResponse } from '../lib/impl/query/query';
export { Network };
export { Checkpointer } from '../lib/checkpointer';
export { DefaultCheckpointers } from '../lib/defaultcheckpointers';

import * as DefaultEventHandlerStrategies from '../lib/impl/event/defaulteventhandlerstrategies';
export { DefaultEventHandlerStrategies };
Expand Down
18 changes: 18 additions & 0 deletions test/ts-scenario/features/events.feature
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,21 @@ Feature: Node SDK Events
When I unregister the listener named privateBlockListener
And I use the gateway named event_gateway to submit a total of 5 transactions with args [privateValuePut] for contract events instantiated on channel eventschannel
Then I receive 0 events from the listener named privateBlockListener

Scenario: Checkpoint block event listening
When I use the gateway named event_gateway to listen for full block events with a new file checkpoint listener named checkpointBlockListener on channel eventschannel
When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
Then I receive a minimum 1 events from the listener named checkpointBlockListener
When I unregister the listener named checkpointBlockListener
When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
When I use the gateway named event_gateway to listen for full block events with an existing file checkpoint listener named checkpointBlockListener on channel eventschannel
Then I receive a minimum 1 events from the listener named checkpointBlockListener

Scenario: Checkpoint contract event listening
When I use the gateway named event_gateway to listen for full contract events named create with a new file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel
And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent"
When I unregister the listener named checkpointContractListener
And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel
And I use the gateway named event_gateway to listen for full contract events named create with an existing file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel
Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent"
Loading

0 comments on commit 0e8dfa8

Please sign in to comment.