diff --git a/fabric-network/src/checkpointer.ts b/fabric-network/src/checkpointer.ts index 361aa379dc..c95a17179f 100644 --- a/fabric-network/src/checkpointer.ts +++ b/fabric-network/src/checkpointer.ts @@ -8,7 +8,7 @@ import Long = require('long'); export interface Checkpointer { addTransactionId(transactionId: string): Promise; - getBlockNumber(): Long | undefined; - getTransactionIds(): Set; + getBlockNumber(): Promise; + getTransactionIds(): Promise>; setBlockNumber(blockNumber: Long): Promise; } diff --git a/fabric-network/src/defaultcheckpointers.ts b/fabric-network/src/defaultcheckpointers.ts new file mode 100644 index 0000000000..02e0062986 --- /dev/null +++ b/fabric-network/src/defaultcheckpointers.ts @@ -0,0 +1,29 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Checkpointer } from './checkpointer'; + +// TODO: Implement, remove @private tag, and export in both index.js and index.d.ts + +/** + * Provides static factory functions used to create instances of default + * {@link module:fabric-network.Checkpointer|Checkpointer} implementations. + * @private + */ +export class DefaultCheckpointers { + /** + * Create a checkpointer that uses the specified file to store persistent state. If the file does not exist, it will + * be created and the checkpointer will have an uninitialized state that will accept any events. If the file does + * exist, it must contain valid checkpointer state. + * @param {string} path Path to a file holding persistent checkpoint state. + * @returns {Promise} A checkpointer. + */ + static async file(path: string): Promise { + throw new Error('Not yet implemented'); + // TODO: Create file system checkpointer and call async initialization to load initial state asynchronously + // from the file. + } +} diff --git a/fabric-network/src/impl/event/contractlistenersession.ts b/fabric-network/src/impl/event/contractlistenersession.ts index 0319b4c8bb..c78d622507 100644 --- a/fabric-network/src/impl/event/contractlistenersession.ts +++ b/fabric-network/src/impl/event/contractlistenersession.ts @@ -8,7 +8,7 @@ import { BlockEvent, BlockListener, ContractEvent, ContractListener, ListenerOpt import * as Logger from '../../logger'; import { Network } from '../../network'; import { ListenerSession } from './listenersession'; -import * as GatewayUtils from '../gatewayutils'; +import * as Listeners from './listeners'; const logger = Logger.getLogger('ContractListenerSession'); export class ContractListenerSession implements ListenerSession { @@ -22,7 +22,7 @@ export class ContractListenerSession implements ListenerSession { this.listener = listener; this.chaincodeId = chaincodeId; this.network = network; - this.blockListener = (blockEvent: BlockEvent) => this.onBlockEvent(blockEvent); + this.blockListener = this.newBlockListener(options); this.options = options; } @@ -34,20 +34,14 @@ export class ContractListenerSession implements ListenerSession { this.network.removeBlockListener(this.blockListener); } - private async onBlockEvent(blockEvent: BlockEvent): Promise { - const transactionPromises = blockEvent.getTransactionEvents() - .filter((transactionEvent) => transactionEvent.isValid) - .map((transactionEvent) => this.onTransactionEvent(transactionEvent)); - - // Don't use Promise.all() as it returns early if any promises are rejected - await GatewayUtils.allSettled(transactionPromises); + private newBlockListener(options?: ListenerOptions): BlockListener { + const callback = this.onContractEvent.bind(this); + return Listeners.blockFromContractListener(callback, options?.checkpointer); } - private async onTransactionEvent(transactionEvent: TransactionEvent): Promise { - for (const contractEvent of transactionEvent.getContractEvents()) { - if (this.isMatch(contractEvent)) { - await this.notifyListener(contractEvent); - } + private async onContractEvent(event: ContractEvent): Promise { + if (this.isMatch(event)) { + await this.notifyListener(event); } } diff --git a/fabric-network/src/impl/event/listeners.ts b/fabric-network/src/impl/event/listeners.ts index 3bcaed3959..d47a31620c 100644 --- a/fabric-network/src/impl/event/listeners.ts +++ b/fabric-network/src/impl/event/listeners.ts @@ -5,15 +5,73 @@ */ import { Checkpointer } from '../../checkpointer'; -import { BlockListener } from '../../events'; - -export function newCheckpointBlockListener(listener: BlockListener, checkpointer: Checkpointer): BlockListener { - return async (event) => { - const checkpointBlockNumber = checkpointer.getBlockNumber(); - if (!checkpointBlockNumber || checkpointBlockNumber.equals(event.blockNumber)) { - await listener(event); - const nextBlockNumber = event.blockNumber.add(1); +import { BlockListener, TransactionEvent, ContractListener } from '../../events'; +import * as GatewayUtils from '../gatewayutils'; +import * as Logger from '../../logger'; + +const logger = Logger.getLogger('Listener'); + +type TransactionListener = (event: TransactionEvent) => Promise; + +export function checkpointBlockListener(listener: BlockListener, checkpointer: Checkpointer): BlockListener { + return async (blockEvent) => { + const checkpointBlockNumber = await checkpointer.getBlockNumber(); + if (!checkpointBlockNumber || checkpointBlockNumber.equals(blockEvent.blockNumber)) { + await listener(blockEvent); + const nextBlockNumber = blockEvent.blockNumber.add(1); await checkpointer.setBlockNumber(nextBlockNumber); } }; } + +export function blockFromContractListener(listener: ContractListener, checkpointer?: Checkpointer): BlockListener { + if (checkpointer) { + const transactionListener = transactionFromContractListener(listener); + const checkpointTxListener = checkpointTransactionListener(transactionListener, checkpointer); + const blockListener = blockFromTransactionListener(checkpointTxListener); + return checkpointBlockListener(blockListener, checkpointer); + } else { + const transactionListener = transactionFromContractListener(listener); + return blockFromTransactionListener(transactionListener); + } +} + +function transactionFromContractListener(listener: ContractListener): TransactionListener { + return async (transactionEvent) => { + for (const contractEvent of transactionEvent.getContractEvents()) { + await listener(contractEvent); + } + }; +} + +function checkpointTransactionListener(listener: TransactionListener, checkpointer: Checkpointer): TransactionListener { + return async (transactionEvent) => { + const checkpointTransactionIds = await checkpointer.getTransactionIds(); + if (!checkpointTransactionIds.has(transactionEvent.transactionId)) { + await listener(transactionEvent); + await checkpointer.addTransactionId(transactionEvent.transactionId); + } + }; +} + +function blockFromTransactionListener(listener: TransactionListener): BlockListener { + return async (blockEvent) => { + const transactionPromises = blockEvent.getTransactionEvents() + .filter((transactionEvent) => transactionEvent.isValid) + .map((transactionEvent) => listener(transactionEvent)); + + // Don't use Promise.all() as it returns early if any promises are rejected + const results = await GatewayUtils.allSettled(transactionPromises); + logAndThrowErrors(results); + }; +} + +function logAndThrowErrors(results: GatewayUtils.SettledPromiseResult[]): void { + const errors = results + .filter((result) => result.status === 'rejected') + .map((result) => (result as GatewayUtils.RejectedPromiseResult).reason); + if (errors.length > 0) { + errors.forEach((error) => logger.warn('Error notifying transaction listener', error)); + throw new Error('Error notifying listener: ' + errors[0].stack || errors[0].message); + } +} diff --git a/fabric-network/src/impl/event/listenersession.ts b/fabric-network/src/impl/event/listenersession.ts index fafb73565c..2aeb0d7368 100644 --- a/fabric-network/src/impl/event/listenersession.ts +++ b/fabric-network/src/impl/event/listenersession.ts @@ -9,9 +9,9 @@ export interface ListenerSession { close(): void; } -export async function addListener(listener: T, listenerSessions: Map, sessionSupplier: () => ListenerSession) { +export async function addListener(listener: T, listenerSessions: Map, sessionSupplier: () => Promise) { if (!listenerSessions.has(listener)) { - const session = sessionSupplier(); + const session = await sessionSupplier(); // Store listener before starting in case start fires error events that trigger remove of the listener listenerSessions.set(listener, session); await session.start(); diff --git a/fabric-network/src/network.ts b/fabric-network/src/network.ts index 5e5dcae165..c0204dca20 100644 --- a/fabric-network/src/network.ts +++ b/fabric-network/src/network.ts @@ -12,7 +12,7 @@ import { BlockEventSource } from './impl/event/blockeventsource'; import { CommitListenerSession } from './impl/event/commitlistenersession'; import { EventServiceManager } from './impl/event/eventservicemanager'; import { IsolatedBlockListenerSession } from './impl/event/isolatedblocklistenersession'; -import { newCheckpointBlockListener } from './impl/event/listeners'; +import { checkpointBlockListener } from './impl/event/listeners'; import { addListener, ListenerSession, removeListener } from './impl/event/listenersession'; import { SharedBlockListenerSession } from './impl/event/sharedblocklistenersession'; import { QueryHandlerFactory } from './impl/query/queryhandler'; @@ -22,13 +22,13 @@ import Gateway = require('./gateway'); const logger = Logger.getLogger('Network'); -function listenerOptionsWithDefaults(options: ListenerOptions): ListenerOptions { +async function listenerOptionsWithDefaults(options: ListenerOptions): Promise { const defaultOptions = { type: 'full' }; const result = Object.assign(defaultOptions, options); - const checkpointBlock = options.checkpointer?.getBlockNumber(); + const checkpointBlock = await options.checkpointer?.getBlockNumber(); if (checkpointBlock) { result.startBlock = checkpointBlock; } @@ -223,7 +223,7 @@ export class NetworkImpl implements Network { } async addCommitListener(listener: CommitListener, peers: Endorser[], transactionId: string) { - const sessionSupplier = () => new CommitListenerSession(listener, this.eventServiceManager, peers, transactionId); + const sessionSupplier = async () => new CommitListenerSession(listener, this.eventServiceManager, peers, transactionId); return await addListener(listener, this.commitListeners, sessionSupplier); } @@ -232,7 +232,7 @@ export class NetworkImpl implements Network { } async addBlockListener(listener: BlockListener, options = {} as ListenerOptions) { - const sessionSupplier = () => this.newBlockListenerSession(listener, options); + const sessionSupplier = async () => await this.newBlockListenerSession(listener, options); return await addListener(listener, this.blockListeners, sessionSupplier); } @@ -240,11 +240,11 @@ export class NetworkImpl implements Network { removeListener(listener, this.blockListeners); } - private newBlockListenerSession(listener: BlockListener, options: ListenerOptions) { - options = listenerOptionsWithDefaults(options); + private async newBlockListenerSession(listener: BlockListener, options: ListenerOptions): Promise { + options = await listenerOptionsWithDefaults(options); if (options.checkpointer) { - listener = newCheckpointBlockListener(listener, options.checkpointer); + listener = checkpointBlockListener(listener, options.checkpointer); } if (options.startBlock) { diff --git a/fabric-network/test/impl/event/blocklistener.ts b/fabric-network/test/impl/event/blocklistener.ts index eda54125cb..cac98d496e 100644 --- a/fabric-network/test/impl/event/blocklistener.ts +++ b/fabric-network/test/impl/event/blocklistener.ts @@ -18,7 +18,8 @@ import { StubEventService } from './stubeventservice'; import Long = require('long'); import Gateway = require('../../../src/gateway'); -import { InMemoryCheckpointer } from './inmemorycheckpointer'; +import { StubCheckpointer } from './stubcheckpointer'; +import { Checkpointer } from '../../../src/checkpointer'; interface StubBlockListener extends BlockListener { completePromise: Promise; @@ -104,458 +105,408 @@ describe('block listener', () => { }); } - it('add listener returns the listener', async () => { - const result = await network.addBlockListener(listener, listenerOptions); - expect(result).to.equal(listener); - }); - - it('listener receives events', async () => { - const event = newFilteredBlockEventInfo(1); - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); - - const actual = await listener.completePromise; - expect(actual[0].blockNumber).to.equal(event.blockNumber); - }); - - it('removed listener does not receive events', async () => { - const removedListener = sinon.spy(testUtils.newAsyncListener()); - - await network.addBlockListener(listener, listenerOptions); - await network.addBlockListener(removedListener, listenerOptions); - network.removeBlockListener(removedListener); - eventService.sendEvent(newFilteredBlockEventInfo(1)); - - await listener.completePromise; - sinon.assert.notCalled(removedListener); - }); - - it('add listener multiple times has no effect', async () => { - const spy = sinon.spy(listener); - - await network.addBlockListener(spy, listenerOptions); - await network.addBlockListener(spy, listenerOptions); - eventService.sendEvent(newFilteredBlockEventInfo(1)); - - await listener.completePromise; - sinon.assert.calledOnce(spy); - }); - - it('remove listener multiple times has no effect', async () => { - const removedListener = sinon.spy(testUtils.newAsyncListener()); - - await network.addBlockListener(listener, listenerOptions); - await network.addBlockListener(removedListener, listenerOptions); - network.removeBlockListener(removedListener); - network.removeBlockListener(removedListener); - eventService.sendEvent(newFilteredBlockEventInfo(1)); + describe('common behavior', () => { + it('add listener returns the listener', async () => { + const result = await network.addBlockListener(listener, listenerOptions); + expect(result).to.equal(listener); + }); - await listener.completePromise; - sinon.assert.notCalled(removedListener); - }); + it('listener receives events', async () => { + const event = newFilteredBlockEventInfo(1); + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event); - it('listener can remove itself when receiving event', async () => { - listener = testUtils.newAsyncListener(2); - const fake = sinon.fake(async (event: BlockEvent) => { - network.removeBlockListener(fake); + const actual = await listener.completePromise; + expect(actual[0].blockNumber).to.equal(event.blockNumber); }); - await network.addBlockListener(listener, listenerOptions); - await network.addBlockListener(fake, listenerOptions); - eventService.sendEvent(newFilteredBlockEventInfo(1)); - // fake listener should have removed itself - eventService.sendEvent(newFilteredBlockEventInfo(2)); + it('removed listener does not receive events', async () => { + const removedListener = sinon.spy(testUtils.newAsyncListener()); - await listener.completePromise; - sinon.assert.calledOnce(fake); - }); + await network.addBlockListener(listener, listenerOptions); + await network.addBlockListener(removedListener, listenerOptions); + network.removeBlockListener(removedListener); + eventService.sendEvent(newFilteredBlockEventInfo(1)); - it('listener does not auto-unregister when receiving events', async () => { - listener = testUtils.newAsyncListener(2); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event2); + await listener.completePromise; + sinon.assert.notCalled(removedListener); + }); - const actual = await listener.completePromise; - network.removeBlockListener(listener); + it('add listener multiple times has no effect', async () => { + const spy = sinon.spy(listener); - const blockNumbers = actual.map((e) => e.blockNumber); - expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber]); - }); + await network.addBlockListener(spy, listenerOptions); + await network.addBlockListener(spy, listenerOptions); + eventService.sendEvent(newFilteredBlockEventInfo(1)); - it('error thrown by listener does not stop subsequent events being delivered', async () => { - listener = testUtils.newAsyncListener(2); - const errorListener = sinon.fake(async (event) => { - await listener(event); - throw new Error('LISTENER_ERROR'); + await listener.completePromise; + sinon.assert.calledOnce(spy); }); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - - await network.addBlockListener(errorListener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event2); - - const actual = await listener.completePromise; - const blockNumbers = actual.map((e) => e.blockNumber); - expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber]); - }); - it('error thrown by listener does not stop other listeners being notified', async () => { - const listener2 = testUtils.newAsyncListener(); - const errorListener = sinon.fake.rejects(new Error('LISTENER_ERROR')); - const event = newFilteredBlockEventInfo(1); + it('remove listener multiple times has no effect', async () => { + const removedListener = sinon.spy(testUtils.newAsyncListener()); - await network.addBlockListener(listener, listenerOptions); - await network.addBlockListener(errorListener, listenerOptions); - await network.addBlockListener(listener2, listenerOptions); - eventService.sendEvent(event); + await network.addBlockListener(listener, listenerOptions); + await network.addBlockListener(removedListener, listenerOptions); + network.removeBlockListener(removedListener); + network.removeBlockListener(removedListener); + eventService.sendEvent(newFilteredBlockEventInfo(1)); - const [actual1] = await listener.completePromise; - const [actual2] = await listener2.completePromise; - expect(actual1.blockNumber).to.deep.equal(event.blockNumber); - expect(actual2.blockNumber).to.deep.equal(event.blockNumber); - }); + await listener.completePromise; + sinon.assert.notCalled(removedListener); + }); - it('listener receives blocks in order', async () => { - listener = testUtils.newAsyncListener(3); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - const event3 = newFilteredBlockEventInfo(3); + it('listener can remove itself when receiving event', async () => { + listener = testUtils.newAsyncListener(2); + const fake = sinon.fake(async (event: BlockEvent) => { + network.removeBlockListener(fake); + }); - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event3); - eventService.sendEvent(event2); + await network.addBlockListener(listener, listenerOptions); + await network.addBlockListener(fake, listenerOptions); + eventService.sendEvent(newFilteredBlockEventInfo(1)); + // fake listener should have removed itself + eventService.sendEvent(newFilteredBlockEventInfo(2)); - const actual = await listener.completePromise; - const blockNumbers = actual.map((e) => e.blockNumber); - expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber, event3.blockNumber]); - }); + await listener.completePromise; + sinon.assert.calledOnce(fake); + }); - it('listener does not receive old blocks', async () => { - listener = testUtils.newAsyncListener(2); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - const event3 = newFilteredBlockEventInfo(3); + it('listener does not auto-unregister when receiving events', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event2); - eventService.sendEvent(event1); // Ignored as older than first block received - eventService.sendEvent(event3); + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event2); - const actual = await listener.completePromise; - const blockNumbers = actual.map((e) => e.blockNumber); - expect(blockNumbers).to.deep.equal([event2.blockNumber, event3.blockNumber]); - }); + const actual = await listener.completePromise; + network.removeBlockListener(listener); - it('errors trigger reconnect of event service with no start block if no events received', async () => { - await network.addBlockListener(listener, listenerOptions); - const startListener = testUtils.newAsyncListener(); - const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener()); + const blockNumbers = actual.map((e) => e.blockNumber); + expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber]); + }); - eventService.sendError(new Error('DISCONNECT')); + it('error thrown by listener does not stop subsequent events being delivered', async () => { + listener = testUtils.newAsyncListener(2); + const errorListener = sinon.fake(async (event) => { + await listener(event); + throw new Error('LISTENER_ERROR'); + }); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); + + await network.addBlockListener(errorListener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + + const actual = await listener.completePromise; + const blockNumbers = actual.map((e) => e.blockNumber); + expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber]); + }); - await startListener.completePromise; - sinon.assert.calledWith(stub, eventService); - sinon.assert.neverCalledWith(stub, sinon.match.any, sinon.match.has('startBlock', sinon.match.number)); - }); + it('error thrown by listener does not stop other listeners being notified', async () => { + const listener2 = testUtils.newAsyncListener(); + const errorListener = sinon.fake.rejects(new Error('LISTENER_ERROR')); + const event = newFilteredBlockEventInfo(1); - it('errors trigger reconnect of event service with next block as start block if events received', async () => { - await network.addBlockListener(listener, listenerOptions); - const startListener = testUtils.newAsyncListener(); - const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener()); + await network.addBlockListener(listener, listenerOptions); + await network.addBlockListener(errorListener, listenerOptions); + await network.addBlockListener(listener2, listenerOptions); + eventService.sendEvent(event); - eventService.sendEvent(newFilteredBlockEventInfo(1)); - eventService.sendError(new Error('DISCONNECT')); + const [actual1] = await listener.completePromise; + const [actual2] = await listener2.completePromise; + expect(actual1.blockNumber).to.deep.equal(event.blockNumber); + expect(actual2.blockNumber).to.deep.equal(event.blockNumber); + }); - await startListener.completePromise; - sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); - }); + it('listener receives blocks in order', async () => { + listener = testUtils.newAsyncListener(3); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); + const event3 = newFilteredBlockEventInfo(3); - it('replay listener sends start block to event service', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event3); + eventService.sendEvent(event2); - listenerOptions.startBlock = 2; - await network.addBlockListener(listener, listenerOptions); + const actual = await listener.completePromise; + const blockNumbers = actual.map((e) => e.blockNumber); + expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber, event3.blockNumber]); + }); - sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); - }); + it('listener does not receive old blocks', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); + const event3 = newFilteredBlockEventInfo(3); - it('replay listener does not receive events earlier than start block', async () => { - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event2); + eventService.sendEvent(event1); // Ignored as older than first block received + eventService.sendEvent(event3); - listenerOptions.startBlock = 2; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event2); + const actual = await listener.completePromise; + const blockNumbers = actual.map((e) => e.blockNumber); + expect(blockNumbers).to.deep.equal([event2.blockNumber, event3.blockNumber]); + }); - const [actual] = await listener.completePromise; - expect(actual.blockNumber).to.equal(event2.blockNumber); - }); + it('errors trigger reconnect of event service with no start block if no events received', async () => { + await network.addBlockListener(listener, listenerOptions); + const startListener = testUtils.newAsyncListener(); + const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener()); - it('replay listener does not miss start block if later block arrive first', async () => { - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); + eventService.sendError(new Error('DISCONNECT')); - listenerOptions.startBlock = 1; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event2); - eventService.sendEvent(event1); + await startListener.completePromise; + sinon.assert.calledWith(stub, eventService); + sinon.assert.neverCalledWith(stub, sinon.match.any, sinon.match.has('startBlock', sinon.match.number)); + }); - const actual = await listener.completePromise; - expect(actual[0].blockNumber).to.equal(event1.blockNumber); - }); + it('errors trigger reconnect of event service with next block as start block if events received', async () => { + await network.addBlockListener(listener, listenerOptions); + const startListener = testUtils.newAsyncListener(); + const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener()); - it('remove of realtime filtered listener does not close shared event service', async () => { - const stub = sinon.stub(eventService, 'close'); + eventService.sendEvent(newFilteredBlockEventInfo(1)); + eventService.sendError(new Error('DISCONNECT')); - await network.addBlockListener(listener, listenerOptions); - network.removeBlockListener(listener); + await startListener.completePromise; + sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); + }); - sinon.assert.notCalled(stub); + it('listener changing event data does not affect other listeners', async () => { + const fake1 = sinon.fake(async (e) => { + await listener(e); + e.blockNumber = Long.ONE; + }); + const listener2 = testUtils.newAsyncListener(); + const fake2 = sinon.fake(async (e) => { + await listener2(e); + e.blockNumber = Long.fromNumber(2); + }); + const event = newFilteredBlockEventInfo(0); + + await network.addBlockListener(fake1, listenerOptions); + await network.addBlockListener(fake2, listenerOptions); + eventService.sendEvent(event); + + const [actual1] = await listener.completePromise; + const [actual2] = await listener2.completePromise; + expect(actual1.blockNumber).to.deep.equal(event.blockNumber); + expect(actual2.blockNumber).to.deep.equal(event.blockNumber); + }); }); - it('remove of replay listener closes isolated event service', async () => { - const stub = sinon.stub(eventService, 'close'); + describe('realtime', () => { + it('remove of realtime filtered listener does not close shared event service', async () => { + const stub = sinon.stub(eventService, 'close'); - listenerOptions.startBlock = 1; - await network.addBlockListener(listener, listenerOptions); - network.removeBlockListener(listener); + await network.addBlockListener(listener, listenerOptions); + network.removeBlockListener(listener); - sinon.assert.called(stub); - }); - - it('listener changing event data does not affect other listeners', async () => { - const fake1 = sinon.fake(async (e) => { - await listener(e); - e.blockNumber = Long.ONE; - }); - const listener2 = testUtils.newAsyncListener(); - const fake2 = sinon.fake(async (e) => { - await listener2(e); - e.blockNumber = Long.fromNumber(2); + sinon.assert.notCalled(stub); }); - const event = newFilteredBlockEventInfo(0); - await network.addBlockListener(fake1, listenerOptions); - await network.addBlockListener(fake2, listenerOptions); - eventService.sendEvent(event); + it('remove of realtime full listener does not close shared event service', async () => { + const stub = sinon.stub(eventService, 'close'); - const [actual1] = await listener.completePromise; - const [actual2] = await listener2.completePromise; - expect(actual1.blockNumber).to.deep.equal(event.blockNumber); - expect(actual2.blockNumber).to.deep.equal(event.blockNumber); - }); - - it('remove of realtime full listener does not close shared event service', async () => { - const stub = sinon.stub(eventService, 'close'); - - listenerOptions = { - type: 'full' - }; - await network.addBlockListener(listener, listenerOptions); - network.removeBlockListener(listener); + listenerOptions = { + type: 'full' + }; + await network.addBlockListener(listener, listenerOptions); + network.removeBlockListener(listener); - sinon.assert.notCalled(stub); + sinon.assert.notCalled(stub); + }); }); - it('listener defaults to full blocks', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); + describe('replay', () => { + it('replay listener sends start block to event service', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); - await network.addBlockListener(listener); + listenerOptions.startBlock = 2; + await network.addBlockListener(listener, listenerOptions); - sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'full')); - }); + sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); + }); - it('listener defaults to no start block', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); + it('replay listener does not receive events earlier than start block', async () => { + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); - await network.addBlockListener(listener); + listenerOptions.startBlock = 2; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event2); - sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('startBlock', undefined)); - }); + const [actual] = await listener.completePromise; + expect(actual.blockNumber).to.equal(event2.blockNumber); + }); - it('listener can specify filtered blocks', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); + it('replay listener does not miss start block if later block arrive first', async () => { + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); - listenerOptions = { - type: 'filtered' - }; - await network.addBlockListener(listener, listenerOptions); + listenerOptions.startBlock = 1; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event2); + eventService.sendEvent(event1); - sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'filtered')); - }); + const actual = await listener.completePromise; + expect(actual[0].blockNumber).to.equal(event1.blockNumber); + }); - it('listener can specify private blocks', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); + it('remove of replay listener closes isolated event service', async () => { + const stub = sinon.stub(eventService, 'close'); - listenerOptions = { - type: 'private' - }; - await network.addBlockListener(listener, listenerOptions); + listenerOptions.startBlock = 1; + await network.addBlockListener(listener, listenerOptions); + network.removeBlockListener(listener); - sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'private')); + sinon.assert.called(stub); + }); }); - it('listener receives full block events', async () => { - const event = newFullBlockEventInfo(1); - listenerOptions = { - type: 'full' - }; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); - - const [actual] = await listener.completePromise; - expect(actual.blockNumber).to.equal(event.blockNumber); - }); + describe('default options', () => { + it('listener defaults to full blocks', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); - it('listener receives private block events', async () => { - const event = newPrivateBlockEventInfo(1); - listenerOptions = { - type: 'private' - }; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); + await network.addBlockListener(listener); - const [actual] = await listener.completePromise; - expect(actual.blockNumber).to.equal(event.blockNumber); - }); + sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'full')); + }); - it('new checkpoint listener receives events', async () => { - const event = newFilteredBlockEventInfo(1); - listenerOptions.checkpointer = new InMemoryCheckpointer(); + it('listener defaults to no start block', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); + await network.addBlockListener(listener); - const [actual] = await listener.completePromise; - expect(actual.blockNumber).to.equal(event.blockNumber); + sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('startBlock', undefined)); + }); }); - it('checkpoint listener sends block number to event service', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.fromNumber(2)); + describe('event types', () => { + it('listener can specify filtered blocks', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(listener, listenerOptions); + listenerOptions = { + type: 'filtered' + }; + await network.addBlockListener(listener, listenerOptions); - sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); - }); - - it('checkpoint block number takes precedence over startBlock option', async () => { - const stub = sinon.stub(eventServiceManager, 'startEventService'); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.fromNumber(2)); + sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'filtered')); + }); - listenerOptions.checkpointer = checkpointer; - listenerOptions.startBlock = 1; - await network.addBlockListener(listener, listenerOptions); + it('listener can specify private blocks', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); - sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); - }); + listenerOptions = { + type: 'private' + }; + await network.addBlockListener(listener, listenerOptions); - it('checkpoint listener receives events from checkpoint block number', async () => { - listener = testUtils.newAsyncListener(2); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - const event3 = newFilteredBlockEventInfo(3); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.fromNumber(2)); - - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event3); - eventService.sendEvent(event2); - - const [actual1, actual2] = await listener.completePromise; - expect(actual1.blockNumber).to.equal(event2.blockNumber); - expect(actual2.blockNumber).to.equal(event3.blockNumber); - }); + sinon.assert.calledOnceWithExactly(stub, sinon.match.any, sinon.match.has('blockType', 'private')); + }); - it('new checkpoint listener receives events from startBlock', async () => { - listener = testUtils.newAsyncListener(2); - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - const event3 = newFilteredBlockEventInfo(3); - - listenerOptions.checkpointer = new InMemoryCheckpointer(); - listenerOptions.startBlock = 2; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event1); - eventService.sendEvent(event3); - eventService.sendEvent(event2); - - const [actual1, actual2] = await listener.completePromise; - expect(actual1.blockNumber).to.equal(event2.blockNumber); - expect(actual2.blockNumber).to.equal(event3.blockNumber); - }); + it('listener receives full block events', async () => { + const event = newFullBlockEventInfo(1); + listenerOptions = { + type: 'full' + }; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event); - it('new checkpoint block number is set after processing event', async () => { - const event = newFilteredBlockEventInfo(1); - const checkpointer = new InMemoryCheckpointer(); + const [actual] = await listener.completePromise; + expect(actual.blockNumber).to.equal(event.blockNumber); + }); - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); + it('listener receives private block events', async () => { + const event = newPrivateBlockEventInfo(1); + listenerOptions = { + type: 'private' + }; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event); - await listener.completePromise; - expect(checkpointer.getBlockNumber().toNumber()).to.equal(2); + const [actual] = await listener.completePromise; + expect(actual.blockNumber).to.equal(event.blockNumber); + }); }); - it('checkpoint block number is incremented after processing event', async () => { - const event = newFilteredBlockEventInfo(1); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.ONE); + describe('checkpoint', () => { + it('new checkpoint listener receives events', async () => { + const event = newFilteredBlockEventInfo(1); + listenerOptions.checkpointer = new StubCheckpointer(); - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(listener, listenerOptions); - eventService.sendEvent(event); + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event); - await listener.completePromise; - expect(checkpointer.getBlockNumber().toNumber()).to.equal(2); - }); + const [actual] = await listener.completePromise; + expect(actual.blockNumber).to.equal(event.blockNumber); + }); - it('checkpoint block number is not incremented on listener failure', async () => { - const failListener: BlockListener = async (blockEvent) => { - await listener(blockEvent); - throw new Error('LISTENER_FAIL'); - }; - const event = newFilteredBlockEventInfo(1); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.ONE); + it('checkpoint listener sends block number to event service', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); + const checkpointer = new StubCheckpointer(); + await checkpointer.setBlockNumber(Long.fromNumber(2)); - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(failListener, listenerOptions); - eventService.sendEvent(event); + listenerOptions.checkpointer = checkpointer; + await network.addBlockListener(listener, listenerOptions); - await listener.completePromise; - expect(checkpointer.getBlockNumber().toNumber()).to.equal(1); - }); + sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); + }); - it('checkpoint listener ignores events with unexpected block numbers', async () => { - const event1 = newFilteredBlockEventInfo(1); - const event2 = newFilteredBlockEventInfo(2); - const checkpointer = new InMemoryCheckpointer(); - await checkpointer.setBlockNumber(Long.ONE); + it('checkpoint block number takes precedence over startBlock option', async () => { + const stub = sinon.stub(eventServiceManager, 'startEventService'); + const checkpointer = new StubCheckpointer(); + await checkpointer.setBlockNumber(Long.fromNumber(2)); - listenerOptions.checkpointer = checkpointer; - await network.addBlockListener(listener, listenerOptions); + listenerOptions.checkpointer = checkpointer; + listenerOptions.startBlock = 1; + await network.addBlockListener(listener, listenerOptions); - // Manipulating checkpointer after registering listener is extremely bad practice but simulates the state after - // listener failure when checkpoint block number does not match delivered block number - await checkpointer.setBlockNumber(Long.fromNumber(2)); + sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); + }); - eventService.sendEvent(event1); - eventService.sendEvent(event2); + it('checkpoint listener receives events from checkpoint block number', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); + const event3 = newFilteredBlockEventInfo(3); + const checkpointer = new StubCheckpointer(); + await checkpointer.setBlockNumber(Long.fromNumber(2)); + + listenerOptions.checkpointer = checkpointer; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event3); + eventService.sendEvent(event2); + + const [actual1, actual2] = await listener.completePromise; + expect(actual1.blockNumber).to.equal(event2.blockNumber); + expect(actual2.blockNumber).to.equal(event3.blockNumber); + }); - const [actual] = await listener.completePromise; - expect(actual.blockNumber).to.equal(event2.blockNumber); + it('new checkpoint listener receives events from startBlock', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newFilteredBlockEventInfo(1); + const event2 = newFilteredBlockEventInfo(2); + const event3 = newFilteredBlockEventInfo(3); + + listenerOptions.checkpointer = new StubCheckpointer(); + listenerOptions.startBlock = 2; + await network.addBlockListener(listener, listenerOptions); + eventService.sendEvent(event1); + eventService.sendEvent(event3); + eventService.sendEvent(event2); + + const [actual1, actual2] = await listener.completePromise; + expect(actual1.blockNumber).to.equal(event2.blockNumber); + expect(actual2.blockNumber).to.equal(event3.blockNumber); + }); }); }); diff --git a/fabric-network/test/impl/event/contractlistener.ts b/fabric-network/test/impl/event/contractlistener.ts index 0ad1e9d9ce..36ee777236 100644 --- a/fabric-network/test/impl/event/contractlistener.ts +++ b/fabric-network/test/impl/event/contractlistener.ts @@ -18,6 +18,7 @@ import { StubEventService } from './stubeventservice'; import Contract = require('../../../src/contract'); import ContractImpl = require('../../../src/contract'); import Gateway = require('../../../src/gateway'); +import { StubCheckpointer } from './stubcheckpointer'; interface StubContractListener extends ContractListener { completePromise: Promise; @@ -96,16 +97,17 @@ describe('contract event listener', () => { return block; } - function addTransaction(event: EventInfo, transaction: any, statusCode: number = protos.protos.TxValidationCode.VALID): void { - event.block.data.data.push(newEnvelope(transaction)); + function addTransaction(event: EventInfo, transaction: any, statusCode: number = protos.protos.TxValidationCode.VALID, transactionId?: string): void { + event.block.data.data.push(newEnvelope(transaction, transactionId)); const transactionStatusCodes = event.block.metadata.metadata[protos.common.BlockMetadataIndex.TRANSACTIONS_FILTER]; transactionStatusCodes.push(statusCode); } - function newEnvelope(transaction: any): any { + function newEnvelope(transaction: any, transactionId?: string): any { const channelHeader = new protos.common.ChannelHeader(); channelHeader.type = protos.common.HeaderType.ENDORSER_TRANSACTION; + channelHeader.tx_id = transactionId; const payload = new protos.common.Payload(); payload.header = new protos.common.Header(); @@ -431,7 +433,7 @@ describe('contract event listener', () => { expect(contractEvents[0].payload?.toString()).to.equal(eventPayload); }); - it('can navigate event heirarchy for filtered events', async () => { + it('can navigate event hierarchy for filtered events', async () => { const event = newFilteredEvent(1); addFilteredTransaction(event, newFilteredTransaction()); @@ -445,7 +447,7 @@ describe('contract event listener', () => { assertCanNavigateEvents(contractEvent); }); - it('can navigate event heirarchy for full events', async () => { + it('can navigate event hierarchy for full events', async () => { const event = newEvent(1); addTransaction(event, newTransaction()); @@ -459,7 +461,7 @@ describe('contract event listener', () => { assertCanNavigateEvents(contractEvent); }); - it('can navigate event heirarchy for private events', async () => { + it('can navigate event hierarchy for private events', async () => { const event = newPrivateEvent(1); addTransaction(event, newTransaction()); @@ -473,4 +475,87 @@ describe('contract event listener', () => { assertCanNavigateEvents(contractEvent); expect(contractEvent.getTransactionEvent().privateData).to.equal(event.privateData[0]); }); + + describe('checkpoint', () => { + it('new checkpoint listener receives events', async () => { + const checkpointer = new StubCheckpointer(); + const event = newEvent(1); + addTransaction(event, newTransaction()); + + const options: ListenerOptions = { + checkpointer + }; + await contract.addContractListener(spyListener, options); + eventService.sendEvent(event); + await listener.completePromise; + + sinon.assert.calledOnceWithExactly(spyListener, sinon.match({ chaincodeId, eventName })); + }); + + it('checkpoint listener receives events from checkpoint block number', async () => { + const checkpointer = new StubCheckpointer(); + await checkpointer.setBlockNumber(Long.fromNumber(2)); + + const transaction = newTransaction(); + const event1 = newEvent(1); + addTransaction(event1, transaction); + const event2 = newEvent(2); + addTransaction(event2, transaction); + + const options: ListenerOptions = { + checkpointer + }; + await contract.addContractListener(listener, options); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + const [args] = await listener.completePromise; + + const blockNumber = args.getTransactionEvent().getBlockEvent().blockNumber.toNumber(); + expect(blockNumber).to.equal(2); + }); + + it('checkpointer records block numbers', async () => { + listener = testUtils.newAsyncListener(2); + const checkpointer = new StubCheckpointer(); + + const transaction = newTransaction(); + const event1 = newEvent(1); + addTransaction(event1, transaction); + const event2 = newEvent(2); + addTransaction(event2, transaction); + + const options: ListenerOptions = { + checkpointer + }; + await contract.addContractListener(listener, options); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + await listener.completePromise; + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.be.oneOf([1, 2]); + }); + + it('checkpointer records transaction IDs', async () => { + listener = testUtils.newAsyncListener(2); + const checkpointer = new StubCheckpointer(); + const spy = sinon.spy(checkpointer, 'addTransactionId'); + + const transaction = newTransaction(); + const event1 = newEvent(1); + addTransaction(event1, transaction, undefined, 'TX1'); + const event2 = newEvent(2); + addTransaction(event2, transaction, undefined, 'TX2'); + + const options: ListenerOptions = { + checkpointer + }; + await contract.addContractListener(listener, options); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + await listener.completePromise; + + sinon.assert.calledWith(spy, 'TX1'); + }); + }); }); diff --git a/fabric-network/test/impl/event/listeners.ts b/fabric-network/test/impl/event/listeners.ts new file mode 100644 index 0000000000..da748e4965 --- /dev/null +++ b/fabric-network/test/impl/event/listeners.ts @@ -0,0 +1,166 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import * as Listeners from '../../../src/impl/event/listeners'; +import { StubCheckpointer } from './stubcheckpointer'; +import { BlockEvent, BlockListener, TransactionEvent, ContractEvent } from '../../../src/events'; +import Long = require('long'); +import chai = require('chai'); +import sinon = require('sinon'); + +const expect = chai.expect; + +describe('listeners', () => { + const contractEvent: ContractEvent = Object.freeze({ + chaincodeId: 'CHAINCODE_ID', + eventName: 'EVENT_NAME', + getTransactionEvent: () => undefined + }); + + const transactionId = 'TRANSACTION_ID'; + const transactionEvent: TransactionEvent = Object.freeze({ + transactionId, + status: 'VALID', + isValid: true, + transactionData: undefined, + getBlockEvent: () => undefined, + getContractEvents: () => [contractEvent] + }); + + const currentBlockNumber = Long.ONE; + const nextBlockNumber = currentBlockNumber.add(1); + const blockEvent: BlockEvent = Object.freeze({ + blockNumber: currentBlockNumber, + blockData: undefined, + getTransactionEvents: () => [transactionEvent] + }); + + const noOpListener = () => undefined; + + let checkpointer; + + beforeEach(() => { + checkpointer = new StubCheckpointer(); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('#checkpointBlockListener', () => { + it('new checkpoint block number set after processing event', async () => { + const checkpointListener = Listeners.checkpointBlockListener(noOpListener, checkpointer); + + await checkpointListener(blockEvent); + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(nextBlockNumber.toNumber()); + }); + + it('checkpoint block number incremented after processing event', async () => { + await checkpointer.setBlockNumber(currentBlockNumber); + const checkpointListener = Listeners.checkpointBlockListener(noOpListener, checkpointer); + + await checkpointListener(blockEvent); + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(nextBlockNumber.toNumber()); + }); + + it('checkpoint block number not incremented on listener failure', async () => { + const listener: BlockListener = async () => { + throw new Error('LISTENER_FAIL'); + }; + await checkpointer.setBlockNumber(currentBlockNumber); + const checkpointListener = Listeners.checkpointBlockListener(listener, checkpointer); + + try { + await checkpointListener(blockEvent); + } catch (err) { + // Ignore errors + } + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(currentBlockNumber.toNumber()); + }); + + it('ignores events with unexpected block numbers', async () => { + const listener = sinon.fake(); + await checkpointer.setBlockNumber(Long.ZERO); + const checkpointListener = Listeners.checkpointBlockListener(listener, checkpointer); + + await checkpointListener(blockEvent); + + sinon.assert.notCalled(listener); + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(0); + }); + }); + + describe('#blockFromContractListener', () => { + it('received transaction IDs registered with checkpointer', async () => { + const spy = sinon.spy(checkpointer, 'addTransactionId'); + const checkpointListener = Listeners.blockFromContractListener(noOpListener, checkpointer); + + await checkpointListener(blockEvent); + + sinon.assert.calledOnceWithExactly(spy, transactionId); + }); + + it('checkpoint block number incremented after processing event', async () => { + await checkpointer.setBlockNumber(currentBlockNumber); + const checkpointListener = Listeners.blockFromContractListener(noOpListener, checkpointer); + + await checkpointListener(blockEvent); + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(nextBlockNumber.toNumber()); + }); + + it('ignores previously seen transaction IDs', async () => { + const listener = sinon.fake(); + const checkpointListener = Listeners.blockFromContractListener(listener, checkpointer); + await checkpointer.addTransactionId(transactionId); + + await checkpointListener(blockEvent); + + sinon.assert.notCalled(listener); + }); + + it('transaction ID not registered with checkpointer on listener failure', async () => { + const spy = sinon.spy(checkpointer, 'addTransactionId'); + const listener = () => { + throw new Error('LISTENER_FAIL'); + }; + const checkpointListener = Listeners.blockFromContractListener(listener, checkpointer); + + try { + await checkpointListener(blockEvent); + } catch (err) { + // Ignore error + } + + sinon.assert.notCalled(spy); + }); + + it('checkpoint block number not incremented on listener failure', async () => { + const listener = () => { + throw new Error('LISTENER_FAIL'); + }; + await checkpointer.setBlockNumber(currentBlockNumber); + const checkpointListener = Listeners.blockFromContractListener(listener, checkpointer); + + try { + await checkpointListener(blockEvent); + } catch (err) { + // Ignore error + } + + const blockNumber = await checkpointer.getBlockNumber(); + expect(blockNumber.toNumber()).to.equal(currentBlockNumber.toNumber()); + }); + }); +}); diff --git a/fabric-network/test/impl/event/inmemorycheckpointer.ts b/fabric-network/test/impl/event/stubcheckpointer.ts similarity index 78% rename from fabric-network/test/impl/event/inmemorycheckpointer.ts rename to fabric-network/test/impl/event/stubcheckpointer.ts index 7e6c61044b..c93e09f7c7 100644 --- a/fabric-network/test/impl/event/inmemorycheckpointer.ts +++ b/fabric-network/test/impl/event/stubcheckpointer.ts @@ -7,7 +7,7 @@ import { Checkpointer } from '../../../src/checkpointer'; import Long = require('long'); -export class InMemoryCheckpointer implements Checkpointer { +export class StubCheckpointer implements Checkpointer { private blockNumber: Long; private readonly transactionIds: Set = new Set(); @@ -15,11 +15,11 @@ export class InMemoryCheckpointer implements Checkpointer { this.transactionIds.add(transactionId); } - getBlockNumber(): Long { + async getBlockNumber(): Promise { return this.blockNumber; } - getTransactionIds(): Set { + async getTransactionIds(): Promise> { return this.transactionIds; }