From 9ffc037b323ee35aba0716907f5caf608997a3df Mon Sep 17 00:00:00 2001 From: Liam Grace Date: Tue, 2 Apr 2019 09:57:52 +0100 Subject: [PATCH] [FABN-1100] Checkpoint after callback is complete Change-Id: I0251af6ca3a8559baac09c458a28173c91ed752a Signed-off-by: Liam Grace --- .../lib/impl/event/blockeventlistener.js | 10 +++++----- .../lib/impl/event/contracteventlistener.js | 8 ++++++-- .../test/impl/event/basecheckpointer.js | 8 ++++++++ .../test/impl/event/blockeventlistener.js | 18 +++++++++--------- .../test/impl/event/contracteventlistener.js | 2 +- .../test/impl/event/filesystemcheckpointer.js | 8 ++++++++ fabric-network/types/index.d.ts | 8 ++++---- 7 files changed, 41 insertions(+), 21 deletions(-) diff --git a/fabric-network/lib/impl/event/blockeventlistener.js b/fabric-network/lib/impl/event/blockeventlistener.js index d2dfb8bcc9..65a4e227f9 100644 --- a/fabric-network/lib/impl/event/blockeventlistener.js +++ b/fabric-network/lib/impl/event/blockeventlistener.js @@ -58,17 +58,17 @@ class BlockEventListener extends AbstractEventListener { * @param {*} block Either a full or filtered block * @private */ - _onEvent(block) { + async _onEvent(block) { const blockNumber = Number(block.number); try { - this.eventCallback(null, block); + await this.eventCallback(null, block); + if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) { + await this.checkpointer.save(null, blockNumber); + } } catch (err) { logger.error(util.format('Error executing callback: %s', err)); } - if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) { - this.checkpointer.save(null, blockNumber); - } if (this._registration.unregister) { this.unregister(); } diff --git a/fabric-network/lib/impl/event/contracteventlistener.js b/fabric-network/lib/impl/event/contracteventlistener.js index bbffe80edd..83dce730c1 100644 --- a/fabric-network/lib/impl/event/contracteventlistener.js +++ b/fabric-network/lib/impl/event/contracteventlistener.js @@ -74,17 +74,21 @@ class ContractEventListener extends AbstractEventListener { async _onEvent(event, blockNumber, transactionId, status) { logger.debug(`_onEvent[${this.listenerName}]:`, util.format('success for transaction %s', transactionId)); blockNumber = Number(blockNumber); + let useCheckpoint = false; if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) { const checkpoint = await this.checkpointer.load(); + useCheckpoint = true; if (checkpoint && checkpoint.transactionIds && checkpoint.transactionIds.includes(transactionId)) { logger.debug(util.format('_onEvent skipped transaction: %s', transactionId)); return; } - await this.checkpointer.save(transactionId, blockNumber); } try { - this.eventCallback(null, event, blockNumber, transactionId, status); + await this.eventCallback(null, event, blockNumber, transactionId, status); + if (useCheckpoint) { + await this.checkpointer.save(transactionId, blockNumber); + } } catch (err) { logger.debug(util.format('_onEvent error from callback: %s', err)); } diff --git a/fabric-network/test/impl/event/basecheckpointer.js b/fabric-network/test/impl/event/basecheckpointer.js index 2df881e6e2..3f91ce4a9f 100644 --- a/fabric-network/test/impl/event/basecheckpointer.js +++ b/fabric-network/test/impl/event/basecheckpointer.js @@ -38,4 +38,12 @@ describe('BaseCheckpointer', () => { expect(checkpointer.load()).to.be.rejectedWith('Method has not been implemented'); }); }); + + describe('#setChaincodeId', () => { + it('should set the chaincodeId', () => { + const checkpointer = new BaseCheckpointer(); + checkpointer.setChaincodeId('CHAINCODE_ID'); + expect(checkpointer._chaincodeId).to.equal('CHAINCODE_ID'); + }); + }); }); diff --git a/fabric-network/test/impl/event/blockeventlistener.js b/fabric-network/test/impl/event/blockeventlistener.js index 92a7a70432..18a86f5fa0 100644 --- a/fabric-network/test/impl/event/blockeventlistener.js +++ b/fabric-network/test/impl/event/blockeventlistener.js @@ -93,36 +93,36 @@ describe('BlockEventListener', () => { sandbox.stub(blockEventListener, 'eventCallback'); }); - it('should call the event callback', () => { + it('should call the event callback', async () => { const block = {number: '10'}; - blockEventListener._onEvent(block); + await blockEventListener._onEvent(block); sinon.assert.calledWith(blockEventListener.eventCallback, null, block); sinon.assert.notCalled(checkpointerStub.save); sinon.assert.notCalled(blockEventListener.unregister); }); - it('should save a checkpoint', () => { + it('should save a checkpoint', async () => { const block = {number: '10'}; blockEventListener.checkpointer = checkpointerStub; - blockEventListener._onEvent(block); + await blockEventListener._onEvent(block); sinon.assert.calledWith(checkpointerStub.save, null, 10); }); - it('should unregister if registration.unregister is set', () => { + it('should unregister if registration.unregister is set', async () => { const block = {number: '10'}; blockEventListener._registration.unregister = true; - blockEventListener._onEvent(block); + await blockEventListener._onEvent(block); sinon.assert.calledWith(blockEventListener.eventCallback, null, block); sinon.assert.called(blockEventListener.unregister); }); - it ('should not save a checkpoint if the callback fails', () => { + it ('should not save a checkpoint if the callback fails', async () => { const block = {number: '10'}; blockEventListener.eventCallback.throws(new Error()); blockEventListener.checkpointer = checkpointerStub; - blockEventListener._onEvent(block); + await blockEventListener._onEvent(block); sinon.assert.calledWith(blockEventListener.eventCallback, null, block); - sinon.assert.calledWith(checkpointerStub.save, null, 10); + sinon.assert.notCalled(checkpointerStub.save); }); }); diff --git a/fabric-network/test/impl/event/contracteventlistener.js b/fabric-network/test/impl/event/contracteventlistener.js index 03dfa0d90e..c56115772d 100644 --- a/fabric-network/test/impl/event/contracteventlistener.js +++ b/fabric-network/test/impl/event/contracteventlistener.js @@ -149,7 +149,7 @@ describe('ContractEventListener', () => { contractEventListener.options.replay = true; await contractEventListener._onEvent(event, blockNumber, transactionId, status); sinon.assert.calledWith(contractEventListener.eventCallback, null, event, Number(blockNumber), transactionId, status); - sinon.assert.calledWith(checkpointerStub.save, 'transactionId', 10); + sinon.assert.notCalled(checkpointerStub.save); }); it('should skip a transaction if it is in the checkpoint', async () => { diff --git a/fabric-network/test/impl/event/filesystemcheckpointer.js b/fabric-network/test/impl/event/filesystemcheckpointer.js index b9177f8959..6ac9bb0df6 100644 --- a/fabric-network/test/impl/event/filesystemcheckpointer.js +++ b/fabric-network/test/impl/event/filesystemcheckpointer.js @@ -139,4 +139,12 @@ describe('FileSystemCheckpointer', () => { expect(loadedCheckpoint).to.deep.equal({}); }); }); + + describe('#_getCheckpointFileName', () => { + it('should return a file path including the chaincode ID', () => { + const chaincodeId = 'CHAINCODE_ID'; + checkpointer.setChaincodeId(chaincodeId); + expect(checkpointer._getCheckpointFileName()).to.equal(`home/.hlf-checkpoint/${channelName}/${chaincodeId}/${listenerName}`); + }); + }); }); diff --git a/fabric-network/types/index.d.ts b/fabric-network/types/index.d.ts index da547f07b6..448582323b 100644 --- a/fabric-network/types/index.d.ts +++ b/fabric-network/types/index.d.ts @@ -89,15 +89,15 @@ export class Gateway { export interface Network { getChannel(): Channel; getContract(chaincodeId: string, name?: string): Contract; - addBlockListener(listenerName: string, callback: (block: Client.Block) => void, options?: object): Promise; - addCommitListener(listenerName: string, callback: (error: Error, transactionId: string, status: string, blockNumber: string) => void, options?: object): Promise; + addBlockListener(listenerName: string, callback: (block: Client.Block) => Promise, options?: object): Promise; + addCommitListener(listenerName: string, callback: (error: Error, transactionId: string, status: string, blockNumber: string) => Promise, options?: object): Promise; } export interface Contract { createTransaction(name: string): Transaction; evaluateTransaction(name: string, ...args: string[]): Promise; submitTransaction(name: string, ...args: string[]): Promise; - addContractListener(listenerName: string, eventName: string, callback: (error: Error, event: {[key: string]: any}, blockNumber: string, transactionId: string, status: string) => void, options?: object): Promise; + addContractListener(listenerName: string, eventName: string, callback: (error: Error, event: {[key: string]: any}, blockNumber: string, transactionId: string, status: string) => Promise, options?: object): Promise; } export interface TransientMap { @@ -110,7 +110,7 @@ export interface Transaction { getNetwork(): Network; setTransient(transientMap: TransientMap): this; submit(...args: string[]): Promise; - addCommitListener(callback: (error: Error, transactionId: string, status: string, blockNumber: string) => void, options: object, eventHub?: Client.ChannelEventHub): Promise; + addCommitListener(callback: (error: Error, transactionId: string, status: string, blockNumber: string) => Promise, options: object, eventHub?: Client.ChannelEventHub): Promise; } export interface FabricError extends Error {