From 0e8dfa88a7d11e4e5e1e5be7a206b3620a286a07 Mon Sep 17 00:00:00 2001 From: "Mark S. Lewis" Date: Mon, 30 Mar 2020 12:23:13 +0100 Subject: [PATCH] FABN-1524: Default file checkpointer implementation (#201) * FABN-1524: Default file checkpointer implementation Signed-off-by: Mark S. Lewis * FABN-1524: Checkpoint scenario tests Signed-off-by: Mark S. Lewis --- fabric-network/index.js | 5 +- fabric-network/src/defaultcheckpointers.ts | 13 +- fabric-network/src/impl/filecheckpointer.ts | 86 +++++++++++ ...asyncnotifier.ts => asyncnotifier.spec.ts} | 0 ...blocklistener.ts => blocklistener.spec.ts} | 0 ...mmitlistener.ts => commitlistener.spec.ts} | 0 ...ctlistener.ts => contractlistener.spec.ts} | 0 ... => defaulteventhandlerstrategies.spec.ts} | 0 ...eventstrategy.ts => eventstrategy.spec.ts} | 0 .../event/{listeners.ts => listeners.spec.ts} | 0 ...lockqueue.ts => orderedblockqueue.spec.ts} | 0 ...ler.ts => transactioneventhandler.spec.ts} | 0 .../test/impl/filecheckpointer.spec.ts | 136 ++++++++++++++++++ fabric-network/test/testutils.ts | 18 ++- fabric-network/types/index.d.ts | 1 + test/ts-scenario/features/events.feature | 18 +++ test/ts-scenario/steps/event-listeners.ts | 56 +++++++- test/ts-scenario/steps/lib/listeners.ts | 85 ++++++----- 18 files changed, 368 insertions(+), 50 deletions(-) create mode 100644 fabric-network/src/impl/filecheckpointer.ts rename fabric-network/test/impl/event/{asyncnotifier.ts => asyncnotifier.spec.ts} (100%) rename fabric-network/test/impl/event/{blocklistener.ts => blocklistener.spec.ts} (100%) rename fabric-network/test/impl/event/{commitlistener.ts => commitlistener.spec.ts} (100%) rename fabric-network/test/impl/event/{contractlistener.ts => contractlistener.spec.ts} (100%) rename fabric-network/test/impl/event/{defaulteventhandlerstrategies.ts => defaulteventhandlerstrategies.spec.ts} (100%) rename fabric-network/test/impl/event/{eventstrategy.ts => eventstrategy.spec.ts} (100%) rename fabric-network/test/impl/event/{listeners.ts => listeners.spec.ts} (100%) rename fabric-network/test/impl/event/{orderedblockqueue.ts => orderedblockqueue.spec.ts} (100%) rename fabric-network/test/impl/event/{transactioneventhandler.ts => transactioneventhandler.spec.ts} (100%) create mode 100644 fabric-network/test/impl/filecheckpointer.spec.ts diff --git a/fabric-network/index.js b/fabric-network/index.js index f79c17d563..07e6e6c250 100644 --- a/fabric-network/index.js +++ b/fabric-network/index.js @@ -285,7 +285,9 @@ /** * Persists the current block and transactions within that block to enable event listening to be resumed following an - * application outage. + * application outage. Default implementations can be obtained from + * [DefaultCheckpointers]{@link module:fabric-network.DefaultCheckpointers}. Application developers are encouraged to + * build their own implementations that use a persistent store suitable to their environment. * @interface Checkpointer * @memberof module:fabric-network */ @@ -489,6 +491,7 @@ module.exports.Wallet = require('./lib/impl/wallet/wallet').Wallet; module.exports.Wallets = require('./lib/impl/wallet/wallets').Wallets; module.exports.IdentityProviderRegistry = require('./lib/impl/wallet/identityproviderregistry').IdentityProviderRegistry; module.exports.HsmX509Provider = require('./lib/impl/wallet/hsmx509identity').HsmX509Provider; +module.exports.DefaultCheckpointers = require('./lib/defaultcheckpointers').DefaultCheckpointers; module.exports.DefaultEventHandlerStrategies = require('./lib/impl/event/defaulteventhandlerstrategies'); module.exports.DefaultQueryHandlerStrategies = require('./lib/impl/query/defaultqueryhandlerstrategies'); module.exports.TimeoutError = require('./lib/errors/timeouterror').TimeoutError; diff --git a/fabric-network/src/defaultcheckpointers.ts b/fabric-network/src/defaultcheckpointers.ts index 02e0062986..d932b26c78 100644 --- a/fabric-network/src/defaultcheckpointers.ts +++ b/fabric-network/src/defaultcheckpointers.ts @@ -5,25 +5,24 @@ */ import { Checkpointer } from './checkpointer'; - -// TODO: Implement, remove @private tag, and export in both index.js and index.d.ts +import { FileCheckpointer } from './impl/filecheckpointer'; /** * Provides static factory functions used to create instances of default * {@link module:fabric-network.Checkpointer|Checkpointer} implementations. - * @private + * @memberof module:fabric-network */ export class DefaultCheckpointers { /** * Create a checkpointer that uses the specified file to store persistent state. If the file does not exist, it will * be created and the checkpointer will have an uninitialized state that will accept any events. If the file does - * exist, it must contain valid checkpointer state. + * exist, it must contain valid checkpoint state. * @param {string} path Path to a file holding persistent checkpoint state. * @returns {Promise} A checkpointer. */ static async file(path: string): Promise { - throw new Error('Not yet implemented'); - // TODO: Create file system checkpointer and call async initialization to load initial state asynchronously - // from the file. + const checkpointer = new FileCheckpointer(path); + await checkpointer.init(); + return checkpointer; } } diff --git a/fabric-network/src/impl/filecheckpointer.ts b/fabric-network/src/impl/filecheckpointer.ts new file mode 100644 index 0000000000..76cc0d4f75 --- /dev/null +++ b/fabric-network/src/impl/filecheckpointer.ts @@ -0,0 +1,86 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Checkpointer } from '../checkpointer'; +import Long = require('long'); +import fs = require('fs'); + +const encoding = 'utf8'; + +interface PersistentState { + blockNumber?: string; + transactionIds?: string[]; +} + +export class FileCheckpointer implements Checkpointer { + private readonly path: string; + private blockNumber?: Long; + private transactionIds: Set = new Set(); + + constructor(path: string) { + this.path = path; + } + + async init(): Promise { + await this.load(); + await this.save(); + } + + async addTransactionId(transactionId: string): Promise { + this.transactionIds.add(transactionId); + await this.save(); + } + + async getBlockNumber(): Promise { + return this.blockNumber; + } + + async getTransactionIds(): Promise> { + return this.transactionIds; + } + + async setBlockNumber(blockNumber: Long): Promise { + this.blockNumber = blockNumber; + this.transactionIds.clear(); + await this.save(); + } + + private async load(): Promise { + const data = await this.readFile(); + if (data) { + const json = data.toString(encoding); + const state = JSON.parse(json); + this.setState(state); + } + } + + private async readFile(): Promise { + try { + return await fs.promises.readFile(this.path); + } catch (err) { + // Ignore error on non-existent file + } + } + + private setState(state: PersistentState): void { + this.blockNumber = state.blockNumber ? Long.fromString(state.blockNumber) : undefined; + this.transactionIds = new Set(state.transactionIds); + } + + private async save(): Promise { + const state = this.getState(); + const json = JSON.stringify(state); + const data = Buffer.from(json, encoding); + await fs.promises.writeFile(this.path, data); + } + + private getState(): PersistentState { + return { + blockNumber: this.blockNumber?.toString(), + transactionIds: Array.from(this.transactionIds) + }; + } +} diff --git a/fabric-network/test/impl/event/asyncnotifier.ts b/fabric-network/test/impl/event/asyncnotifier.spec.ts similarity index 100% rename from fabric-network/test/impl/event/asyncnotifier.ts rename to fabric-network/test/impl/event/asyncnotifier.spec.ts diff --git a/fabric-network/test/impl/event/blocklistener.ts b/fabric-network/test/impl/event/blocklistener.spec.ts similarity index 100% rename from fabric-network/test/impl/event/blocklistener.ts rename to fabric-network/test/impl/event/blocklistener.spec.ts diff --git a/fabric-network/test/impl/event/commitlistener.ts b/fabric-network/test/impl/event/commitlistener.spec.ts similarity index 100% rename from fabric-network/test/impl/event/commitlistener.ts rename to fabric-network/test/impl/event/commitlistener.spec.ts diff --git a/fabric-network/test/impl/event/contractlistener.ts b/fabric-network/test/impl/event/contractlistener.spec.ts similarity index 100% rename from fabric-network/test/impl/event/contractlistener.ts rename to fabric-network/test/impl/event/contractlistener.spec.ts diff --git a/fabric-network/test/impl/event/defaulteventhandlerstrategies.ts b/fabric-network/test/impl/event/defaulteventhandlerstrategies.spec.ts similarity index 100% rename from fabric-network/test/impl/event/defaulteventhandlerstrategies.ts rename to fabric-network/test/impl/event/defaulteventhandlerstrategies.spec.ts diff --git a/fabric-network/test/impl/event/eventstrategy.ts b/fabric-network/test/impl/event/eventstrategy.spec.ts similarity index 100% rename from fabric-network/test/impl/event/eventstrategy.ts rename to fabric-network/test/impl/event/eventstrategy.spec.ts diff --git a/fabric-network/test/impl/event/listeners.ts b/fabric-network/test/impl/event/listeners.spec.ts similarity index 100% rename from fabric-network/test/impl/event/listeners.ts rename to fabric-network/test/impl/event/listeners.spec.ts diff --git a/fabric-network/test/impl/event/orderedblockqueue.ts b/fabric-network/test/impl/event/orderedblockqueue.spec.ts similarity index 100% rename from fabric-network/test/impl/event/orderedblockqueue.ts rename to fabric-network/test/impl/event/orderedblockqueue.spec.ts diff --git a/fabric-network/test/impl/event/transactioneventhandler.ts b/fabric-network/test/impl/event/transactioneventhandler.spec.ts similarity index 100% rename from fabric-network/test/impl/event/transactioneventhandler.ts rename to fabric-network/test/impl/event/transactioneventhandler.spec.ts diff --git a/fabric-network/test/impl/filecheckpointer.spec.ts b/fabric-network/test/impl/filecheckpointer.spec.ts new file mode 100644 index 0000000000..9a4f14623f --- /dev/null +++ b/fabric-network/test/impl/filecheckpointer.spec.ts @@ -0,0 +1,136 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Checkpointer } from '../../src/checkpointer'; +import { DefaultCheckpointers } from '../../src/defaultcheckpointers'; +import * as testUtils from '../testutils'; +import Long = require('long'); +import path = require('path'); +import fs = require('fs'); +import chai = require('chai'); +import chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); +const expect = chai.expect; + +// tslint:disable: no-unused-expression + +describe('FileCheckpointer', () => { + let dir: string; + let file: string; + let checkpointer: Checkpointer; + + beforeEach(async () => { + dir = await testUtils.createTempDir(); + file = path.join(dir, 'checkpoint.json'); + checkpointer = await DefaultCheckpointers.file(file); + }); + + afterEach(async () => { + await testUtils.rmdir(dir); + }); + + it('new checkpointer has undefined block number', async () => { + const actual = await checkpointer.getBlockNumber(); + + expect(actual).to.be.undefined; + }); + + it('new checkpointer has empty transaction IDs', async () => { + const actual = await checkpointer.getTransactionIds(); + + expect(actual).to.be.empty; + }); + + it('can get added transaction IDs', async () => { + await checkpointer.addTransactionId('txId'); + const actual = await checkpointer.getTransactionIds(); + + expect(actual).to.have.lengthOf(1).and.include('txId'); + }); + + it('duplicate transaction IDs are ignored', async () => { + await checkpointer.addTransactionId('txId'); + await checkpointer.addTransactionId('txId'); + const actual = await checkpointer.getTransactionIds(); + + expect(actual).to.have.lengthOf(1).and.include('txId'); + }); + + it('can get updated block number', async () => { + await checkpointer.setBlockNumber(Long.ONE); + const actual = await checkpointer.getBlockNumber(); + + expect(actual?.toNumber()).to.equal(1); + }); + + it('setting block number clears transaction IDs', async () => { + await checkpointer.addTransactionId('txId'); + + await checkpointer.setBlockNumber(Long.ONE); + const actual = await checkpointer.getTransactionIds(); + + expect(actual).to.be.empty; + }); + + it('initial state retained on reopen of checkpointer', async () => { + checkpointer = await DefaultCheckpointers.file(file); + const blockNumber = await checkpointer.getBlockNumber(); + const transactionIds = await checkpointer.getTransactionIds(); + + expect(blockNumber).to.be.undefined; + expect(transactionIds).to.be.empty; + }); + + it('state is persisted when block number updated', async () => { + await checkpointer.setBlockNumber(Long.ONE); + + checkpointer = await DefaultCheckpointers.file(file); + const blockNumber = await checkpointer.getBlockNumber(); + const transactionIds = await checkpointer.getTransactionIds(); + + expect(blockNumber?.toNumber()).to.equal(1); + expect(transactionIds).to.be.empty; + }); + + it('state is persisted when transaction IDs added', async () => { + await checkpointer.addTransactionId('txId'); + + checkpointer = await DefaultCheckpointers.file(file); + const blockNumber = await checkpointer.getBlockNumber(); + const transactionIds = await checkpointer.getTransactionIds(); + + expect(blockNumber).to.be.undefined; + expect(transactionIds).to.have.lengthOf(1).and.include('txId'); + }); + + it('persistent state is consistent on multiple updates', async () => { + await checkpointer.setBlockNumber(Long.ZERO); + await checkpointer.addTransactionId('tx0'); + await checkpointer.setBlockNumber(Long.ONE); + await checkpointer.addTransactionId('tx1'); + + checkpointer = await DefaultCheckpointers.file(file); + const blockNumber = await checkpointer.getBlockNumber(); + const transactionIds = await checkpointer.getTransactionIds(); + + expect(blockNumber?.toNumber()).to.equal(1); + expect(transactionIds).to.have.lengthOf(1).and.include('tx1'); + }); + + it('create fails for bad persistent data', async () => { + await fs.promises.writeFile(file, Buffer.from('bad to the bone')); + + const promise = DefaultCheckpointers.file(file); + + await expect(promise).to.be.rejected; + }); + + it('create fails for non-writable path', async () => { + const promise = DefaultCheckpointers.file(path.join(dir, 'MISSING_DIR', 'MISSING_FILE')); + + await expect(promise).to.be.rejected; + }); +}); diff --git a/fabric-network/test/testutils.ts b/fabric-network/test/testutils.ts index cffd34ac25..0978ce9b35 100644 --- a/fabric-network/test/testutils.ts +++ b/fabric-network/test/testutils.ts @@ -4,6 +4,13 @@ * SPDX-License-Identifier: Apache-2.0 */ +import os = require('os'); +import fs = require('fs'); +import path = require('path'); +import util = require('util'); +import _rimraf = require('rimraf'); +const rimraf = util.promisify(_rimraf); + /** * Returns a new async function that taken one argument of the generic type. The returned function also has a * 'completePromise' property of type Promise, which resolves when the expected number of calls have been made @@ -18,7 +25,7 @@ export function newAsyncListener(expectedCallCount = 1, maxSleep = 0) { const events: T[] = []; const listener = async (event: T) => { if (maxSleep > 0) { - // Some random delay to similate async work in the listener and catch timing bugs + // Some random delay to simulate async work in the listener and catch timing bugs await sleep(getRandomInt(maxSleep)); } events.push(event); @@ -43,3 +50,12 @@ export function sleep(ms: number) { export function getRandomInt(max: number) { return Math.floor(Math.random() * Math.floor(max)); } + +export async function createTempDir(): Promise { + const prefix = os.tmpdir + path.sep; + return await fs.promises.mkdtemp(prefix); +} + +export async function rmdir(path: string): Promise { + await rimraf(path); +} diff --git a/fabric-network/types/index.d.ts b/fabric-network/types/index.d.ts index 78dab66ab9..c86bf2f96c 100644 --- a/fabric-network/types/index.d.ts +++ b/fabric-network/types/index.d.ts @@ -30,6 +30,7 @@ export { QueryHandler } from '../lib/impl/query/queryhandler'; export { Query, QueryResults, QueryResponse } from '../lib/impl/query/query'; export { Network }; export { Checkpointer } from '../lib/checkpointer'; +export { DefaultCheckpointers } from '../lib/defaultcheckpointers'; import * as DefaultEventHandlerStrategies from '../lib/impl/event/defaulteventhandlerstrategies'; export { DefaultEventHandlerStrategies }; diff --git a/test/ts-scenario/features/events.feature b/test/ts-scenario/features/events.feature index 038c4c8a96..d567ffb47b 100644 --- a/test/ts-scenario/features/events.feature +++ b/test/ts-scenario/features/events.feature @@ -88,3 +88,21 @@ Feature: Node SDK Events When I unregister the listener named privateBlockListener And I use the gateway named event_gateway to submit a total of 5 transactions with args [privateValuePut] for contract events instantiated on channel eventschannel Then I receive 0 events from the listener named privateBlockListener + + Scenario: Checkpoint block event listening + When I use the gateway named event_gateway to listen for full block events with a new file checkpoint listener named checkpointBlockListener on channel eventschannel + When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel + Then I receive a minimum 1 events from the listener named checkpointBlockListener + When I unregister the listener named checkpointBlockListener + When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel + When I use the gateway named event_gateway to listen for full block events with an existing file checkpoint listener named checkpointBlockListener on channel eventschannel + Then I receive a minimum 1 events from the listener named checkpointBlockListener + + Scenario: Checkpoint contract event listening + When I use the gateway named event_gateway to listen for full contract events named create with a new file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel + And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel + Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent" + When I unregister the listener named checkpointContractListener + And I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel + And I use the gateway named event_gateway to listen for full contract events named create with an existing file checkpoint listener named checkpointContractListener for the smart contract named events on channel eventschannel + Then the listener named checkpointContractListener should have contract events with payload containing "createValueTransactionContent" diff --git a/test/ts-scenario/steps/event-listeners.ts b/test/ts-scenario/steps/event-listeners.ts index b37bfe9e51..7a1bba3fc9 100644 --- a/test/ts-scenario/steps/event-listeners.ts +++ b/test/ts-scenario/steps/event-listeners.ts @@ -8,7 +8,7 @@ import { Constants } from './constants'; import * as Listeners from './lib/listeners'; import { Given, Then, When } from 'cucumber'; -import { EventType } from 'fabric-network'; +import { EventType, ListenerOptions } from 'fabric-network'; Given(/^I am listening for (filtered|full) contract events named (.+?) with a listener named (.+?)$/, {timeout: Constants.STEP_SHORT as number }, async (type: EventType, eventName: string, listenerName: string) => { const isActive: boolean = true; @@ -27,20 +27,66 @@ Given(/^I am listening for transaction events with a listener named (.+?)$/, {ti // Contract events When(/^I use the gateway named (.+?) to listen for (filtered|full) contract events named (.+?) with a listener named (.+?) for the smart contract named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, eventName: string, listenerName: string, ccName: string, channelName: string) => { - return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, eventType); + const options: ListenerOptions = { + type: eventType + }; + return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, options); }); When(/^I use the gateway named (.+?) to replay (filtered|full) contract events named (.+?) from starting block ([0-9]+) with a listener named (.+?) for the smart contract named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, eventName: string, startBlock: number, listenerName: string, ccName: string, channelName: string) => { - return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, eventType, startBlock); + const options: ListenerOptions = { + type: eventType, + startBlock + }; + return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, options); +}); + +When(/^I use the gateway named (.+?) to listen for (filtered|full) contract events named (.+?) with a new file checkpoint listener named (.+?) for the smart contract named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, eventName: string, listenerName: string, ccName: string, channelName: string) => { + const options: ListenerOptions = { + type: eventType, + checkpointer: await Listeners.newFileCheckpointer() + }; + return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, options); +}); + +When(/^I use the gateway named (.+?) to listen for (filtered|full) contract events named (.+?) with an existing file checkpoint listener named (.+?) for the smart contract named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, eventName: string, listenerName: string, ccName: string, channelName: string) => { + const options: ListenerOptions = { + type: eventType, + checkpointer: await Listeners.getFileCheckpointer() + }; + return await Listeners.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, options); }); // Block events When(/^I use the gateway named (.+?) to listen for (filtered|full|private) block events with a listener named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, listenerName: string, channelName: string) => { - return await Listeners.createBlockListener(gatewayName, channelName, listenerName, eventType); + const options: ListenerOptions = { + type: eventType + }; + return await Listeners.createBlockListener(gatewayName, channelName, listenerName, options); }); When(/^I use the gateway named (.+?) to listen for (filtered|full|private) block events between ([0-9]+) and ([0-9]+) with a listener named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, startBlock: number, endBlock: number, listenerName: string, channelName: string) => { - return await Listeners.createBlockListener(gatewayName, channelName, listenerName, eventType, startBlock, endBlock); + const options: ListenerOptions = { + type: eventType, + startBlock + }; + return await Listeners.createBlockListener(gatewayName, channelName, listenerName, options, endBlock); +}); + +When(/^I use the gateway named (.+?) to listen for (filtered|full|private) block events with a new file checkpoint listener named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, listenerName: string, channelName: string) => { + const options: ListenerOptions = { + type: eventType, + checkpointer: await Listeners.newFileCheckpointer() + }; + return await Listeners.createBlockListener(gatewayName, channelName, listenerName, options); +}); + +When(/^I use the gateway named (.+?) to listen for (filtered|full|private) block events with an existing file checkpoint listener named (.+?) on channel (.+?)$/, {timeout: Constants.STEP_SHORT as number}, async (gatewayName: string, eventType: EventType, listenerName: string, channelName: string) => { + const options: ListenerOptions = { + type: eventType, + checkpointer: await Listeners.getFileCheckpointer() + }; + return await Listeners.createBlockListener(gatewayName, channelName, listenerName, options); }); // Unregister diff --git a/test/ts-scenario/steps/lib/listeners.ts b/test/ts-scenario/steps/lib/listeners.ts index fdecff6830..4ccede8b6b 100644 --- a/test/ts-scenario/steps/lib/listeners.ts +++ b/test/ts-scenario/steps/lib/listeners.ts @@ -4,58 +4,53 @@ 'use strict'; -import { BlockEvent, BlockListener, Contract, ContractEvent, ContractListener, EventType, Gateway, ListenerOptions, Network, TransactionEvent } from 'fabric-network'; +import { BlockEvent, BlockListener, Checkpointer, Contract, ContractEvent, ContractListener, EventType, Gateway, ListenerOptions, Network, DefaultCheckpointers } from 'fabric-network'; import { Constants } from '../constants'; import * as GatewayHelper from './gateway'; import * as BaseUtils from './utility/baseUtils'; import { StateStore } from './utility/stateStore'; +import Long = require('long'); +import fs = require('fs'); +import path = require('path'); +import os = require('os'); const stateStore: StateStore = StateStore.getInstance(); +const CHECKPOINT_FILE_KEY = 'checkpointFile'; -export async function createContractListener(gatewayName: string, channelName: string, ccName: string, eventName: string, listenerName: string, type: EventType, startBlock?: number): Promise { +export async function createContractListener(gatewayName: string, channelName: string, ccName: string, eventName: string, listenerName: string, listenerOptions: ListenerOptions): Promise { const gateways: Map = stateStore.get(Constants.GATEWAYS); const gateway: Gateway = gateways.get(gatewayName).gateway; const contract: Contract = await GatewayHelper.retrieveContractFromGateway(gateway, channelName, ccName); - const listenerObject: any = { - active: true, - eventName, - eventType: type, - listener: {}, - payloads: [], - type: Constants.CONTRACT, - }; + const payloads: ContractEvent[] = []; - const contractListener: ContractListener = async (event: ContractEvent) => { + const listener: ContractListener = async (event: ContractEvent) => { BaseUtils.logMsg(`-> Received a contract event for listener [${listenerName}] of eventName ${eventName}`); if (event.eventName === eventName) { - listenerObject.payloads.push(event); + payloads.push(event); } }; - const listenerOptions: ListenerOptions = { - startBlock, - type - }; - await contract.addContractListener(contractListener, listenerOptions); + await contract.addContractListener(listener, listenerOptions); - // Roll into a listener object to store - listenerObject.listener = contractListener; - listenerObject.remove = () => contract.removeContractListener(contractListener); + const listenerObject: any = { + active: true, + eventName, + eventType: listenerOptions.type, + listener, + payloads, + type: Constants.CONTRACT, + remove: () => contract.removeContractListener(listener) + }; putListenerObject(listenerName, listenerObject); } -export async function createBlockListener(gatewayName: string, channelName: string, listenerName: string, type: EventType, startBlock?: number, endBlock?: number): Promise { +export async function createBlockListener(gatewayName: string, channelName: string, listenerName: string, listenerOptions: ListenerOptions, endBlock?: number): Promise { const gateways: Map = stateStore.get(Constants.GATEWAYS); const gateway: Gateway = gateways.get(gatewayName).gateway; const network: Network = await gateway.getNetwork(channelName); - const listenerObject: any = { - active: true, - eventType: type, - listener: {}, - payloads: [], - type: Constants.BLOCK - }; + const payloads: BlockEvent[] = []; + const startBlock = listenerOptions.startBlock ? Long.fromValue(listenerOptions.startBlock).toNumber() : undefined; // Create the listener const listener: BlockListener = async (blockEvent: BlockEvent) => { @@ -67,7 +62,7 @@ export async function createBlockListener(gatewayName: string, channelName: stri BaseUtils.checkSizeEquality(blockEvent.blockNumber.toNumber(), endBlock + 1, false, true); } - listenerObject.payloads.push(blockEvent); + payloads.push(blockEvent); BaseUtils.logMsg('->Received a block event - added blockevent to payloads', listenerName); const transactionEvents = blockEvent.getTransactionEvents(); for (const transactionEvent of transactionEvents) { @@ -80,19 +75,37 @@ export async function createBlockListener(gatewayName: string, channelName: stri network.removeBlockListener(listener); } }; - const listenerOptions: ListenerOptions = { - startBlock, - type - }; await network.addBlockListener(listener, listenerOptions); - // Roll into a listener object to store - listenerObject.listener = listener; - listenerObject.remove = () => network.removeBlockListener(listener); + const listenerObject: any = { + active: true, + eventType: listenerOptions.type, + listener, + payloads, + type: Constants.BLOCK, + remove: () => network.removeBlockListener(listener) + }; putListenerObject(listenerName, listenerObject); BaseUtils.logMsg('->Stored a block event listener:', listenerName); } +export async function newFileCheckpointer(): Promise { + const prefix = os.tmpdir + path.sep; + const tmpDir = await fs.promises.mkdtemp(prefix); + const file = path.join(tmpDir, 'checkpoint.json'); + const checkpointer = await DefaultCheckpointers.file(file); + stateStore.set(CHECKPOINT_FILE_KEY, file); + return checkpointer; +} + +export async function getFileCheckpointer(): Promise { + const file = stateStore.get(CHECKPOINT_FILE_KEY); + if (!file) { + throw new Error('Checkpointer does not exist'); + } + return await DefaultCheckpointers.file(file); +} + function getListenerObject(listenerName: string): any { const listener = getListeners().get(listenerName); if (!listener) {