Skip to content

Commit

Permalink
[FABN-1100] Checkpoint after callback is complete
Browse files Browse the repository at this point in the history
Change-Id: I0251af6ca3a8559baac09c458a28173c91ed752a
Signed-off-by: Liam Grace <[email protected]>
  • Loading branch information
liam-grace committed Apr 2, 2019
1 parent 1dc025c commit 9ffc037
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 21 deletions.
10 changes: 5 additions & 5 deletions fabric-network/lib/impl/event/blockeventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ class BlockEventListener extends AbstractEventListener {
* @param {*} block Either a full or filtered block
* @private
*/
_onEvent(block) {
async _onEvent(block) {
const blockNumber = Number(block.number);

try {
this.eventCallback(null, block);
await this.eventCallback(null, block);
if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) {
await this.checkpointer.save(null, blockNumber);
}
} catch (err) {
logger.error(util.format('Error executing callback: %s', err));
}
if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) {
this.checkpointer.save(null, blockNumber);
}
if (this._registration.unregister) {
this.unregister();
}
Expand Down
8 changes: 6 additions & 2 deletions fabric-network/lib/impl/event/contracteventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,21 @@ class ContractEventListener extends AbstractEventListener {
async _onEvent(event, blockNumber, transactionId, status) {
logger.debug(`_onEvent[${this.listenerName}]:`, util.format('success for transaction %s', transactionId));
blockNumber = Number(blockNumber);
let useCheckpoint = false;
if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) {
const checkpoint = await this.checkpointer.load();
useCheckpoint = true;
if (checkpoint && checkpoint.transactionIds && checkpoint.transactionIds.includes(transactionId)) {
logger.debug(util.format('_onEvent skipped transaction: %s', transactionId));
return;
}
await this.checkpointer.save(transactionId, blockNumber);
}

try {
this.eventCallback(null, event, blockNumber, transactionId, status);
await this.eventCallback(null, event, blockNumber, transactionId, status);
if (useCheckpoint) {
await this.checkpointer.save(transactionId, blockNumber);
}
} catch (err) {
logger.debug(util.format('_onEvent error from callback: %s', err));
}
Expand Down
8 changes: 8 additions & 0 deletions fabric-network/test/impl/event/basecheckpointer.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@ describe('BaseCheckpointer', () => {
expect(checkpointer.load()).to.be.rejectedWith('Method has not been implemented');
});
});

describe('#setChaincodeId', () => {
it('should set the chaincodeId', () => {
const checkpointer = new BaseCheckpointer();
checkpointer.setChaincodeId('CHAINCODE_ID');
expect(checkpointer._chaincodeId).to.equal('CHAINCODE_ID');
});
});
});
18 changes: 9 additions & 9 deletions fabric-network/test/impl/event/blockeventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,36 +93,36 @@ describe('BlockEventListener', () => {
sandbox.stub(blockEventListener, 'eventCallback');
});

it('should call the event callback', () => {
it('should call the event callback', async () => {
const block = {number: '10'};
blockEventListener._onEvent(block);
await blockEventListener._onEvent(block);
sinon.assert.calledWith(blockEventListener.eventCallback, null, block);
sinon.assert.notCalled(checkpointerStub.save);
sinon.assert.notCalled(blockEventListener.unregister);
});

it('should save a checkpoint', () => {
it('should save a checkpoint', async () => {
const block = {number: '10'};
blockEventListener.checkpointer = checkpointerStub;
blockEventListener._onEvent(block);
await blockEventListener._onEvent(block);
sinon.assert.calledWith(checkpointerStub.save, null, 10);
});

it('should unregister if registration.unregister is set', () => {
it('should unregister if registration.unregister is set', async () => {
const block = {number: '10'};
blockEventListener._registration.unregister = true;
blockEventListener._onEvent(block);
await blockEventListener._onEvent(block);
sinon.assert.calledWith(blockEventListener.eventCallback, null, block);
sinon.assert.called(blockEventListener.unregister);
});

it ('should not save a checkpoint if the callback fails', () => {
it ('should not save a checkpoint if the callback fails', async () => {
const block = {number: '10'};
blockEventListener.eventCallback.throws(new Error());
blockEventListener.checkpointer = checkpointerStub;
blockEventListener._onEvent(block);
await blockEventListener._onEvent(block);
sinon.assert.calledWith(blockEventListener.eventCallback, null, block);
sinon.assert.calledWith(checkpointerStub.save, null, 10);
sinon.assert.notCalled(checkpointerStub.save);
});
});

Expand Down
2 changes: 1 addition & 1 deletion fabric-network/test/impl/event/contracteventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ describe('ContractEventListener', () => {
contractEventListener.options.replay = true;
await contractEventListener._onEvent(event, blockNumber, transactionId, status);
sinon.assert.calledWith(contractEventListener.eventCallback, null, event, Number(blockNumber), transactionId, status);
sinon.assert.calledWith(checkpointerStub.save, 'transactionId', 10);
sinon.assert.notCalled(checkpointerStub.save);
});

it('should skip a transaction if it is in the checkpoint', async () => {
Expand Down
8 changes: 8 additions & 0 deletions fabric-network/test/impl/event/filesystemcheckpointer.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,12 @@ describe('FileSystemCheckpointer', () => {
expect(loadedCheckpoint).to.deep.equal({});
});
});

describe('#_getCheckpointFileName', () => {
it('should return a file path including the chaincode ID', () => {
const chaincodeId = 'CHAINCODE_ID';
checkpointer.setChaincodeId(chaincodeId);
expect(checkpointer._getCheckpointFileName()).to.equal(`home/.hlf-checkpoint/${channelName}/${chaincodeId}/${listenerName}`);
});
});
});
8 changes: 4 additions & 4 deletions fabric-network/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ export class Gateway {
export interface Network {
getChannel(): Channel;
getContract(chaincodeId: string, name?: string): Contract;
addBlockListener(listenerName: string, callback: (block: Client.Block) => void, options?: object): Promise<BlockEventListener>;
addCommitListener(listenerName: string, callback: (error: Error, transactionId: string, status: string, blockNumber: string) => void, options?: object): Promise<CommitEventListener>;
addBlockListener(listenerName: string, callback: (block: Client.Block) => Promise<any>, options?: object): Promise<BlockEventListener>;
addCommitListener(listenerName: string, callback: (error: Error, transactionId: string, status: string, blockNumber: string) => Promise<any>, options?: object): Promise<CommitEventListener>;
}

export interface Contract {
createTransaction(name: string): Transaction;
evaluateTransaction(name: string, ...args: string[]): Promise<Buffer>;
submitTransaction(name: string, ...args: string[]): Promise<Buffer>;
addContractListener(listenerName: string, eventName: string, callback: (error: Error, event: {[key: string]: any}, blockNumber: string, transactionId: string, status: string) => void, options?: object): Promise<ContractEventListener>;
addContractListener(listenerName: string, eventName: string, callback: (error: Error, event: {[key: string]: any}, blockNumber: string, transactionId: string, status: string) => Promise<any>, options?: object): Promise<ContractEventListener>;
}

export interface TransientMap {
Expand All @@ -110,7 +110,7 @@ export interface Transaction {
getNetwork(): Network;
setTransient(transientMap: TransientMap): this;
submit(...args: string[]): Promise<Buffer>;
addCommitListener(callback: (error: Error, transactionId: string, status: string, blockNumber: string) => void, options: object, eventHub?: Client.ChannelEventHub): Promise<CommitEventListener>;
addCommitListener(callback: (error: Error, transactionId: string, status: string, blockNumber: string) => Promise<any>, options: object, eventHub?: Client.ChannelEventHub): Promise<CommitEventListener>;
}

export interface FabricError extends Error {
Expand Down

0 comments on commit 9ffc037

Please sign in to comment.