diff --git a/docs/tutorials/channel-events.md b/docs/tutorials/channel-events.md index f9ddb138ea..73a7bfaa7d 100644 --- a/docs/tutorials/channel-events.md +++ b/docs/tutorials/channel-events.md @@ -276,7 +276,7 @@ therefore the registration must be made before the channel event hub has setup the connection. Replaying events may confuse other event listeners; therefore, only one listener will be allowed on a `ChannelEventHub` when `startBlock` -and/or `endBlock` are used. +and/or `endBlock` are used on a listener registration. - `Number` - A number value may be specified as the block number. @@ -335,20 +335,29 @@ Using this option on an event listener does require that this channel event hub has been previously running. * {boolean} `unregister` -- (Optional) This setting indicates that the -registration should be removed (unregister) when the event is seen. When the +registration should be removed (unregister) after the event is seen. When the application is using a timeout to only wait a specified amount of time for the transaction to be seen, the timeout processing should include the manual 'unregister' of the transaction event listener to avoid the event callbacks being called unexpectedly. The default for this setting is different for the different types of event listeners. For block listeners the default is true when -an end_block was set as a option. For transaction listeners the default is true. -For chaincode listeners the default will be false as the match filter might be -intended for many transactions. +an end_block was set as a option, the listener will be active and receiving +blocks until the end block is received and then the listener will be automatically +unregistered. For transaction listeners the default is true and once the transaction +event has occurred the listener will be automatically unregistered. If the +transaction listener has used an endBlock, the default will be +to automatically unregister the listener even if the transaction has not been +seen. +For chaincode event listeners the default will be false as the match filter +might be intended for many transactions, however if the chaincode event +listener has set an endBlock it will be automatically unregistered after +the endBlock is seen. * {boolean} `disconnect` -- (Optional) This setting indicates to the `ChannelEventHub` instance to automatically disconnect itself from the peer's -channel-based event service once the event has been seen. The default is false -unless the endBlock has been set, then the default will be true. +channel-based event service once the event has been seen. The default is false. +When not set and the endBlock has been set the ChannelEventHub instance +will automatically disconnect itself. ### Get a Channel-based Event Hub Use the fabric-client {@link Channel} diff --git a/fabric-client/lib/ChannelEventHub.js b/fabric-client/lib/ChannelEventHub.js index 628f448d0d..133f2f7901 100644 --- a/fabric-client/lib/ChannelEventHub.js +++ b/fabric-client/lib/ChannelEventHub.js @@ -225,7 +225,7 @@ class ChannelEventHub { * only one event listener is allowed. */ _checkAllowRegistrations() { - if (this._start_stop_registration) { + if (this._start_stop_action) { throw new Error('This ChannelEventHub is not open to event listener registrations'); } } @@ -938,7 +938,8 @@ class ChannelEventHub { * @returns enum of how the endBlock and startBlock have been set */ _checkReplay(options, fromConnect) { - logger.debug('_checkReplay - start'); + const method = '_checkReplay'; + logger.debug('%s - start', method); let result = NO_START_STOP; let _start_block = null; @@ -951,7 +952,7 @@ class ChannelEventHub { if (_start_block || _end_block) { if (fromConnect) { - if (this._start_stop_registration) { + if (this._start_stop_action) { logger.error('This ChannelEventHub has a registered listener that has options of startBlock or endBlock'); throw new Error('Not able to connect with startBlock or endBlock when a registered listener has those options.'); } @@ -974,23 +975,23 @@ class ChannelEventHub { if (_end_block) { this._ending_block_number = _end_block; if (fromConnect) { - logger.debug('_checkReplay - connect will end receiving blocks from %s', this._ending_block_number); + logger.debug('%s - connect will end receiving blocks from %s', method, this._ending_block_number); this._start_stop_connect = true; } result = END_ONLY; - logger.debug('_checkReplay - Event listening will end at block %s', this._ending_block_number); + logger.debug('%s - Event listening will end at block %s', method, this._ending_block_number); } if (_start_block) { this._starting_block_number = _start_block; if (fromConnect) { - logger.debug('_checkReplay - connect will start receiving blocks from %s', this._starting_block_number); + logger.debug('%s - connect will start receiving blocks from %s', method, this._starting_block_number); this._start_stop_connect = true; } result++; // will move result to START_ONLY or START_AND_END - logger.debug('_checkReplay - Event listening will start at block %s', this._starting_block_number); + logger.debug('%s - Event listening will start at block %s', method, this._starting_block_number); } - logger.debug('_checkReplay - end'); + logger.debug('%s - end', method); return result; } @@ -1127,11 +1128,13 @@ class ChannelEventHub { * seen by this listener. For chaincode listeners the default will be * false as the match filter might be intended for many transactions * rather than a specific transaction or block as in the other listeners. + * If not set and the endBlock has been set, the listener will be + * automatically unregistered. * @property {boolean} disconnect - Optional - This option setting Indicates * to the ChannelEventHub instance to automatically disconnect itself * from the peer's fabric service once the event has been seen. - * The default is false unless the endBlock has been set, then it - * it will be true. + * The default is false. If not set and the endBlock has been set, the + * the ChannelEventHub instance will automatically disconnect itself. */ /** @@ -1177,26 +1180,25 @@ class ChannelEventHub { } this._checkAllowRegistrations(); - let default_disconnect = false; const startstop_mode = this._checkReplay(options); - if (startstop_mode > START_ONLY) { - default_disconnect = true; - } - const event_reg = new EventRegistration(onEvent, onError, options, false, default_disconnect); + + const event_reg = new EventRegistration(onEvent, onError, options, false, false); const chaincode_reg = new ChaincodeRegistration(ccid, eventname, event_reg); + const unregister_action = () => { + this.unregisterChaincodeEvent(chaincode_reg); + }; + this._on_end_actions(chaincode_reg, unregister_action, startstop_mode, options); + let cbtable = this._chaincodeRegistrants[ccid]; if (!cbtable) { cbtable = new Set(); this._chaincodeRegistrants[ccid] = cbtable; } cbtable.add(chaincode_reg); - if (startstop_mode > NO_START_STOP) { - this._start_stop_registration = chaincode_reg.event_reg; - chaincode_reg.event_reg.unregister_action = () => { - this.unregisterChaincodeEvent(chaincode_reg); - }; - } + + + this._checkConnection(); return chaincode_reg; @@ -1258,21 +1260,15 @@ class ChannelEventHub { } this._checkAllowRegistrations(); - let default_disconnect = false; const startstop_mode = this._checkReplay(options); - if (startstop_mode > START_ONLY) { - default_disconnect = true; - } - const block_registration_number = ++this._block_registrant_count; - const block_registration = new EventRegistration(onEvent, onError, options, true, default_disconnect); + const block_registration = new EventRegistration(onEvent, onError, options, false, false); this._blockRegistrations[block_registration_number] = block_registration; - if (startstop_mode > NO_START_STOP) { - this._start_stop_registration = block_registration; - block_registration.unregister_action = () => { - this.unregisterBlockEvent(block_registration_number); - }; - } + const unregister_action = () => { + this.unregisterBlockEvent(block_registration_number); + }; + this._on_end_actions(block_registration, unregister_action, startstop_mode, options); + this._checkConnection(); return block_registration_number; @@ -1339,6 +1335,7 @@ class ChannelEventHub { } this._checkAllowRegistrations(); + const startstop_mode = this._checkReplay(options); let default_unregister = true; let _txid = txid; @@ -1350,20 +1347,14 @@ class ChannelEventHub { if (temp) { throw new Error(`TransactionId (${txid}) has already been registered`); } - let default_disconnect = false; - const startstop_mode = this._checkReplay(options); - if (startstop_mode > START_ONLY) { - default_disconnect = true; - } - const trans_registration = new EventRegistration(onEvent, onError, options, default_unregister, default_disconnect); + const trans_registration = new EventRegistration(onEvent, onError, options, default_unregister, false); this._transactionRegistrations[_txid] = trans_registration; - if (startstop_mode > NO_START_STOP) { - this._start_stop_registration = trans_registration; - trans_registration.unregister_action = () => { - this.unregisterTxEvent(_txid); - }; - } + const unregister_action = () => { + this.unregisterTxEvent(_txid); + }; + this._on_end_actions(trans_registration, unregister_action, startstop_mode, options); + this._checkConnection(); return _txid; @@ -1404,12 +1395,22 @@ class ChannelEventHub { const block_reg = this._blockRegistrations[key]; logger.debug('_processBlockEvents - calling block listener callback'); block_reg.onEvent(block); + + // check to see if we should automatically unregister or/and disconnect this hub + if (block_reg.unregister) { + this.unregisterBlockEvent(key); + logger.debug('_processBlockEvents - automatically unregister block listener for %s', key); + } + if (block_reg.disconnect) { + logger.debug('_processBlockEvents - automatically disconnect'); + this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on block registration')); + } }); } /* * private internal method for processing tx events - * @param {Object} block protobuf object which might contain the tx from the fabric + * @param {Object} block protobuf object which might contain transactions */ _processTxEvents(block) { if (Object.keys(this._transactionRegistrations).length === 0) { @@ -1459,7 +1460,10 @@ class ChannelEventHub { _callTransactionListener(tx_id, val_code, block_num, trans_reg) { logger.debug('_callTransactionListener - about to call the transaction call back for code=%s tx=%s', val_code, tx_id); const status = convertValidationCode(val_code); + trans_reg.onEvent(tx_id, status, block_num); + + // check to see if we should automatically unregister or/and disconnect this hub if (trans_reg.unregister) { this.unregisterTxEvent(tx_id); logger.debug('_callTransactionListener - automatically unregister tx listener for %s', tx_id); @@ -1555,12 +1559,14 @@ class ChannelEventHub { delete chaincode_event.payload; } chaincode_reg.event_reg.onEvent(chaincode_event, block_num, tx_id, tx_status); + + // see if we should automatically unregister this event listener or disconnect this hub if (chaincode_reg.event_reg.unregister) { - cbtable.delete(chaincode_reg); - logger.debug('_callChaincodeListener - automatically unregister tx listener for %s', tx_id); + this.unregisterChaincodeEvent(chaincode_reg); + logger.debug('_callChaincodeListener - automatically unregister chaincode event listener for tx_id:%s', tx_id); } if (chaincode_reg.event_reg.disconnect) { - this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration')); + this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on chaincode event registration')); } } else { logger.debug('_callChaincodeListener - NOT calling chaincode listener callback'); @@ -1576,11 +1582,11 @@ class ChannelEventHub { if (this._ending_block_number) { if (this._ending_block_number.lessThanOrEqual(this._last_block_seen)) { this._ending_block_seen = true; - if (this._start_stop_registration) { - if (this._start_stop_registration.unregister) { - this._start_stop_registration.unregister_action(); + if (this._start_stop_action) { + if (this._start_stop_action.unregister) { + this._start_stop_action.unregister(); } - if (this._start_stop_registration.disconnect) { + if (this._start_stop_action.disconnect) { this._disconnect(new EventHubDisconnectError('Shutdown due to end block number has been seen')); } } @@ -1593,14 +1599,52 @@ class ChannelEventHub { */ _setReplayDefaults() { // these will hold the block numbers to be used when this - // event hub connects to the remote peer's channel event sevice + // event hub connects to the remote peer's channel event service this._starting_block_number = null; this._ending_block_number = null; this._ending_block_seen = false; this._ending_block_newest = false; - this._start_stop_registration = null; + this._start_stop_action = null; this._start_stop_connect = false; } + + + /* + * utility method to calculate if this listener should be removed + * and if the event hub should be disconnected + * if the end block has been seen + */ + _on_end_actions(event_reg, unregister_action, startstop_mode, options) { + if (startstop_mode > NO_START_STOP) { + this._start_stop_action = {}; + this._start_stop_action.event_reg = event_reg; // might be useful + } else { + logger.debug('_on_end_actions - no end block action required'); + return; // all done checking + } + + let _end_register = true; // default if end block is seen + if (options && typeof options.unregister === 'boolean') { + _end_register = options.unregister; + } + if (_end_register && startstop_mode > START_ONLY) { + logger.debug('listener will be unregistered when end block is seen'); + this._start_stop_action.unregister = unregister_action; + } else { + logger.debug('listener will not be unregistered when end block is seen'); + } + + let _end_disconnect = true; // default if end block is seen + if (options && typeof options.disconnect === 'boolean') { + _end_disconnect = options.disconnect; + } + if (_end_disconnect && startstop_mode > START_ONLY) { + logger.debug('event hub will be disconnected when end block is seen'); + this._start_stop_action.disconnect = true; + } else { + logger.debug('event hub will not be disconnected when end block is seen'); + } + } } module.exports = ChannelEventHub; @@ -1675,21 +1719,21 @@ class EventRegistration { this._onErrorFn = onError; this.unregister = default_unregister; this.disconnect = default_disconnect; - this.unregister_action = () => { - }; // do nothing by default if (options) { if (typeof options.unregister === 'undefined' || options.unregister === null) { - logger.debug('const-EventRegistration - unregister was not defined'); + logger.debug('const-EventRegistration - unregister was not defined, using default of %s', default_unregister); } else if (typeof options.unregister === 'boolean') { this.unregister = options.unregister; + logger.debug('const-EventRegistration - unregister was defined, %s', this.unregister); } else { throw new Error('Event registration has invalid value for "unregister" option'); } if (typeof options.disconnect === 'undefined' || options.disconnect === null) { - logger.debug('const-EventRegistration - disconnect was not defined'); + logger.debug('const-EventRegistration - disconnect was not defined, using default of %s', default_disconnect); } else if (typeof options.disconnect === 'boolean') { this.disconnect = options.disconnect; + logger.debug('const-EventRegistration - disconnect was defined, %s', this.disconnect); } else { throw new Error('Event registration has invalid value for "disconnect" option'); } @@ -1708,7 +1752,7 @@ class EventRegistration { try { this._onErrorFn(...args); } catch (error) { - logger.warn('Error notifacation callback failed', error); + logger.warn('Error notification callback failed', error); } } } diff --git a/fabric-client/test/ChannelEventHub.js b/fabric-client/test/ChannelEventHub.js index 1402e34c32..2a2b91cebe 100644 --- a/fabric-client/test/ChannelEventHub.js +++ b/fabric-client/test/ChannelEventHub.js @@ -113,6 +113,23 @@ describe('ChannelEventHub', () => { }); }); + describe('#isFiltered', () => { + let hub; + beforeEach(() => { + hub = new ChannelEventHub('channel', 'peer'); + }); + + it('should return true', () => { + hub._filtered_stream = true; + should.equal(hub.isFiltered(), true, 'isFiltered should return true'); + }); + + it('should return false', () => { + hub._filtered_stream = false; + should.equal(hub.isFiltered(), false, 'isFiltered should return false'); + }); + }); + describe('#_assignPeer', () => { let hub, peer, channel; @@ -272,7 +289,7 @@ describe('ChannelEventHub', () => { describe('#_checkAllowRegistrations', () => { it('should throw an error if registration is not allowed', () => { const hub = new ChannelEventHub('channel', 'peer'); - hub._start_stop_registration = true; + hub._start_stop_action = true; (() => { hub._checkAllowRegistrations(); }).should.throw(/This ChannelEventHub is not open to event listener registrations/); @@ -411,6 +428,21 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(FakeLogger.debug, '%s - options do not include startBlock'); sinon.assert.calledWith(FakeLogger.debug, '%s - options do not include endBlock'); }); + + it('should set the callback', () => { + getPeerAddrStub.returns('peer'); + hub._clientContext._userContext = {}; + hub.connect({}, 'callback'); + sinon.assert.calledWith(_connectStub, {}); + should.equal(hub.connectCallback, 'callback', 'Should have set the callback on connect'); + }); + + it('should call _connect with the force option', () => { + getPeerAddrStub.returns('peer'); + hub._clientContext._userContext = {}; + hub.connect({force: true}); + sinon.assert.calledWith(_connectStub, {force: true}); + }); }); describe('#_connect', () => { @@ -548,7 +580,7 @@ describe('ChannelEventHub', () => { }); - it('should call stream on data log about an success response and not success status', () => { + it('should call stream on data log about a success response and not success status', () => { hub._filtered_stream = false; checkAndAddConfigSettingStub.onCall(0).returns({}); checkAndAddConfigSettingStub.onCall(1).returns({}); @@ -564,7 +596,7 @@ describe('ChannelEventHub', () => { }); - it('should call stream on data log about an success response and success status if _ending_block_seen and _last_block_seen set', () => { + it('should call stream on data log about a success response and success status if _ending_block_seen and _last_block_seen set', () => { hub._ending_block_seen = 1; hub._ending_block_newest = 1; hub._last_block_seen = 1; @@ -581,7 +613,7 @@ describe('ChannelEventHub', () => { }); - it('should call stream on data log about an success response and success status if _ending_block_seen and _last_block_seen not set', () => { + it('should call stream on data log about a success response and success status if _ending_block_seen and _last_block_seen not set', () => { checkAndAddConfigSettingStub.onCall(0).returns({}); checkAndAddConfigSettingStub.onCall(1).returns({}); checkAndAddConfigSettingStub.onCall(2).returns({}); @@ -606,6 +638,22 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(FakeLogger.debug, 'on.data - incoming block was from a cancelled stream'); }); + it('should log debug and call connect call back', () => { + checkAndAddConfigSettingStub.onCall(0).returns({}); + checkAndAddConfigSettingStub.onCall(1).returns({}); + checkAndAddConfigSettingStub.onCall(2).returns({}); + getConfigSettingStub.withArgs('request-timeout', 3000).returns(1000); + isStreamReadyStub.returns(true); + onStub.yields({Type: 'block'}); + decodeBlockStub.returns('block'); + const myConnectCallback = sandbox.stub(); + hub.connectCallback = myConnectCallback; + hub._start_stop_connect = true; + hub._connect({signedEvent: true}); + sinon.assert.calledWith(FakeLogger.debug, 'on.data - first block received , this ChannelEventHub now registered'); + sinon.assert.called(myConnectCallback); + }); + it('should log debug and errors if connected and an error is detected', () => { checkAndAddConfigSettingStub.onCall(0).returns({}); checkAndAddConfigSettingStub.onCall(1).returns({}); @@ -733,15 +781,18 @@ describe('ChannelEventHub', () => { const _setReplayDefaultsStub = sandbox.stub(); const getPeerAddrStub = sandbox.stub().returns('addr'); const hub = new ChannelEventHub('channel', 'peer'); + const myConnectCallback = sandbox.stub(); hub._closeAllCallbacks = _closeAllCallbacksStub; hub._shutdown = _shutdownStub; hub._setReplayDefaults = _setReplayDefaultsStub; hub.getPeerAddr = getPeerAddrStub; + hub.connectCallback = myConnectCallback; hub.disconnect({message: 'error'}); sinon.assert.called(_closeAllCallbacksStub); sinon.assert.called(_shutdownStub); sinon.assert.called(_setReplayDefaultsStub); + sinon.assert.called(myConnectCallback); hub._connected.should.be.false; hub._connect_running.should.be.false; }); @@ -1082,6 +1133,31 @@ describe('ChannelEventHub', () => { sinon.assert.calledTwice(toBufferStub); }); + it('should run with startBlock:1 and endBlock:newest given identity and txId', () => { + hub._starting_block_number = 1; + hub._ending_block_number = 'oldest'; + hub.generateUnsignedRegistration({identity: 'identity', txId: new TransactionIDStub(), mspId: 'mspId', certificate: 'certificate'}); + sinon.assert.calledWith(SeekSpecifiedStub); + sinon.assert.calledWith(setNumberStub, 1); + sinon.assert.calledWith(setSpecifiedStub, new SeekSpecifiedStub()); + sinon.assert.called(SeekOldestStub); + sinon.assert.calledWith(setOldestStub, new SeekOldestStub()); + sinon.assert.called(SeekPositionStub); + sinon.assert.called(SeekInfoStub); + sinon.assert.calledWith(setStartStub, new SeekPositionStub()); + sinon.assert.calledWith(setStopStub, new SeekPositionStub()); + sinon.assert.calledWith(setBehaviorStub, 'FAIL_IF_NOT_READY'); + sinon.assert.called(buildChannelHeaderStub); + sinon.assert.called(getTransactionIDStub); + sinon.assert.called(getClientCertHashStub); + sinon.assert.called(getNonceStub); + sinon.assert.called(buildHeaderStub); + sinon.assert.called(buildHeaderStub); + sinon.assert.called(setHeaderStub); + sinon.assert.called(setDataStub); + sinon.assert.calledTwice(toBufferStub); + }); + it('should run with startBlock:1 and endBlock:1 given identity and txId', () => { hub._starting_block_number = 1; hub._ending_block_number = 1; @@ -1219,43 +1295,43 @@ describe('ChannelEventHub', () => { it('should return NO_START_STOP and log on entry and exit', () => { const result = hub._checkReplay({}); result.should.equal(0); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - start'); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - end'); + sinon.assert.calledWith(FakeLogger.debug, '%s - start'); + sinon.assert.calledWith(FakeLogger.debug, '%s - end'); }); it('should return START_ONLY with startBlock parameter of 1', () => { const result = hub._checkReplay({startBlock: 1}); result.should.equal(1); hub._starting_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return START_ONLY with startBlock parameter of string 1', () => { const result = hub._checkReplay({startBlock: '1'}); result.should.equal(1); hub._starting_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return START_ONLY with startBlock parameter of Long 1', () => { const result = hub._checkReplay({startBlock: Long.fromInt(1)}); result.should.equal(1); hub._starting_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return START_ONLY with startBlock parameter of newest', () => { const result = hub._checkReplay({startBlock: 'newest'}); result.should.equal(1); hub._starting_block_number.should.equal('newest'); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', 'newest'); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', 'newest'); }); it('should return START_ONLY with startBlock parameter of oldest', () => { const result = hub._checkReplay({startBlock: 'oldest'}); result.should.equal(1); hub._starting_block_number.should.equal('oldest'); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', 'oldest'); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', 'oldest'); }); it('should return START_ONLY with startBlock parameter of last_seen', () => { @@ -1263,42 +1339,42 @@ describe('ChannelEventHub', () => { const result = hub._checkReplay({startBlock: 'last_seen'}); result.should.equal(1); hub._starting_block_number.should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will start at block %s', 1); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will start at block %s', '_checkReplay', 1); }); it('should return END_ONLY with endBlock parameter of 1', () => { const result = hub._checkReplay({endBlock: 1}); result.should.equal(2); hub._ending_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return END_ONLY with endBlock parameter of string 1', () => { const result = hub._checkReplay({endBlock: '1'}); result.should.equal(2); hub._ending_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return END_ONLY with endBlock parameter of Long 1', () => { const result = hub._checkReplay({endBlock: Long.fromInt(1)}); result.should.equal(2); hub._ending_block_number.toInt().should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', Long.fromInt(1)); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', Long.fromInt(1)); }); it('should return END_ONLY with endBlock parameter of newest', () => { const result = hub._checkReplay({endBlock: 'newest'}); result.should.equal(2); hub._ending_block_number.should.equal('newest'); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', 'newest'); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', 'newest'); }); it('should return END_ONLY with endBlock parameter of oldest', () => { const result = hub._checkReplay({endBlock: 'oldest'}); result.should.equal(2); hub._ending_block_number.should.equal('oldest'); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', 'oldest'); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', 'oldest'); }); it('should return END_ONLY with endBlock parameter of last_seen', () => { @@ -1306,7 +1382,7 @@ describe('ChannelEventHub', () => { const result = hub._checkReplay({endBlock: 'last_seen'}); result.should.equal(2); hub._ending_block_number.should.equal(1); - sinon.assert.calledWith(FakeLogger.debug, '_checkReplay - Event listening will end at block %s', 1); + sinon.assert.calledWith(FakeLogger.debug, '%s - Event listening will end at block %s', '_checkReplay', 1); }); it('should throw an error if startBlock is greater than endBlock', () => { @@ -1315,16 +1391,16 @@ describe('ChannelEventHub', () => { }).should.throw('"startBlock" (2) must not be greater than "endBlock" (1)'); }); - it('should throw an error if startBlock given and have _start_stop_registration', () => { - hub._start_stop_registration = {}; + it('should throw an error if startBlock given and have _start_stop_action', () => { + hub._start_stop_action = {}; (() => { hub._checkReplay({startBlock: 1}, true); }).should.throw('Not able to connect with startBlock or endBlock when a registered listener has those options.'); sinon.assert.calledWith(FakeLogger.error, 'This ChannelEventHub has a registered listener that has options of startBlock or endBlock'); }); - it('should throw an error if endBlock given and have _start_stop_registration', () => { - hub._start_stop_registration = {}; + it('should throw an error if endBlock given and have _start_stop_action', () => { + hub._start_stop_action = {}; (() => { hub._checkReplay({endBlock: 1}, true); }).should.throw('Not able to connect with startBlock or endBlock when a registered listener has those options.'); @@ -1405,6 +1481,14 @@ describe('ChannelEventHub', () => { }).should.throw('Registrations with startBlock or endBlock are not allowed if this ChannelEventHub is connected with a startBlock or endBlock'); sinon.assert.calledWith(FakeLogger.error, 'This ChannelEventHub has been connected with a startBlock or endBlock'); }); + + it('should set the start and end blocks from connect', () => { + hub._start_stop_connect = false; + hub._checkReplay({startBlock: Long.fromInt(1), endBlock: Long.fromInt(2)}, true); + should.equal(hub._ending_block_number.toInt(), 2, 'Should have a ending block from connect'); + should.equal(hub._starting_block_number.toInt(), 1, 'Should have a starting block from connect'); + should.equal(hub._start_stop_connect, true, 'Should see that we have a start stop from connect'); + }); }); describe('#_haveRegistrations', () => { @@ -1624,8 +1708,8 @@ describe('ChannelEventHub', () => { hub._chaincodeRegistrants = {'cc': cc}; _checkReplayStub.returns(2); hub.registerChaincodeEvent('cc', 'event', 'onEvent', 'onError', 'options'); - hub._start_stop_registration.unregister_action.should.be.instanceof(Function); - hub._start_stop_registration.unregister_action(); + hub._start_stop_action.unregister.should.be.instanceof(Function); + hub._start_stop_action.unregister(); sinon.assert.called(unregisterChaincodeEventStub); }); }); @@ -1709,23 +1793,23 @@ describe('ChannelEventHub', () => { const regNumber = hub.registerBlockEvent('onEvent', 'onError', 'options'); sinon.assert.called(_checkAllowRegistrationsStub); sinon.assert.calledWith(_checkReplayStub, 'options'); - sinon.assert.calledWith(EventRegistrationStub, 'onEvent', 'onError', 'options', true, false); + sinon.assert.calledWith(EventRegistrationStub, 'onEvent', 'onError', 'options', false, false); sinon.assert.called(_checkConnectionStub); regNumber.should.equal(1); hub._blockRegistrations[1].should.deep.equal(new EventRegistrationStub()); }); - it('shold change default_disconnect to true', () => { + it('should change default_disconnect to false', () => { _checkReplayStub.returns(2); hub.registerBlockEvent('onEvent', 'onError', 'options'); - sinon.assert.calledWith(EventRegistrationStub, 'onEvent', 'onError', 'options', true, true); + sinon.assert.calledWith(EventRegistrationStub, 'onEvent', 'onError', 'options', false, false); }); - it('should set _start_stop_registration, unregister_action callback and call unregisterBlockEvent', () => { - _checkReplayStub.returns(1); - hub.registerBlockEvent('onEvent', 'onError', 'options'); - should.exist(hub._start_stop_registration); - hub._start_stop_registration.unregister_action(); + it('should set _start_stop_action, unregister_action callback and call unregisterBlockEvent', () => { + _checkReplayStub.returns(2); + hub.registerBlockEvent('onEvent', 'onError', {endBlock:2}); + should.exist(hub._start_stop_action); + hub._start_stop_action.unregister(); sinon.assert.calledWith(unregisterBlockEventStub, 1); }); }); @@ -1822,19 +1906,19 @@ describe('ChannelEventHub', () => { txid.should.equal('txid'); }); - it('should cahnge txid to lowercase change default_unregister to false and change default_disconnect to true', () => { + it('should change txid to lowercase change default_unregister to false and change default_disconnect to false', () => { _checkReplayStub.returns(2); const txid = hub.registerTxEvent('ALL', 'onEvent', 'onError', 'options'); sinon.assert.calledWith(_checkAllowRegistrationsStub); sinon.assert.calledWith(_checkReplayStub, 'options'); - sinon.assert.calledWith(EventRegistrationsStub, 'onEvent', 'onError', 'options', false, true); + sinon.assert.calledWith(EventRegistrationsStub, 'onEvent', 'onError', 'options', false, false); txid.should.equal('all'); }); it('should set the unregister_action', () => { - _checkReplayStub.returns(1); + _checkReplayStub.returns(2); hub.registerTxEvent('ALL', 'onEvent', 'onError', 'options'); - hub._start_stop_registration.unregister_action(); + hub._start_stop_action.unregister(); sinon.assert.calledWith(unregisterTxEventStub, 'all'); sinon.assert.calledWith(_checkConnectionStub); }); @@ -1871,6 +1955,7 @@ describe('ChannelEventHub', () => { describe('#_processBlockEvents', () => { let hub; + beforeEach(() => { hub = new ChannelEventHub('channel', 'peer'); }); @@ -1898,6 +1983,18 @@ describe('ChannelEventHub', () => { sinon.assert.calledTwice(onEventStub); }); + + it('should call unregister and disconnect', () => { + const onEventStub = sandbox.stub(); + hub.unregisterBlockEvent = sandbox.stub(); + hub._disconnect = sandbox.stub(); + hub._blockRegistrations = {key1: {onEvent: onEventStub, unregister: true, disconnect:true}}; + hub._processBlockEvents('block'); + sinon.assert.calledWith(FakeLogger.debug, '_processBlockEvents - calling block listener callback'); + sinon.assert.calledWith(onEventStub, 'block'); + sinon.assert.calledWith(hub.unregisterBlockEvent, 'key1'); + sinon.assert.called(hub._disconnect); + }); }); describe('#_processTxEvents', () => { @@ -2019,7 +2116,7 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(FakeLogger.debug, '_callTransactionListener - about to call the transaction call back for code=%s tx=%s', 'val_code', 'tx_id'); }); - it('shold call convertValidationCode and trans_reg.onEvent', () => { + it('should call convertValidationCode and trans_reg.onEvent', () => { convertValidationCodeStub.returns('status'); hub._callTransactionListener('tx_id', 'val_code', 1, {onEvent: onEventStub}); sinon.assert.calledWith(convertValidationCodeStub, 'val_code'); @@ -2213,12 +2310,12 @@ describe('ChannelEventHub', () => { convertValidationCodeStub.returns('status'); const chaincodeReg = {eventNameFilter: new RegExp(/event/), event_reg: {onEvent: onEventStub, unregister: true}}; const chaincodeEvent = {chaincode_id: 'cc', event_name: 'event', payload: 'payload'}; - Set.prototype.delete = deleteStub; hub._chaincodeRegistrants = {cc: new Set([chaincodeReg])}; + hub.unregisterChaincodeEvent = deleteStub; hub._callChaincodeListener(chaincodeEvent, 'block_num', 'tx_id', 'val_code', false); sinon.assert.called(deleteStub); sinon.assert.calledWith(deleteStub, chaincodeReg); - sinon.assert.calledWith(FakeLogger.debug, '_callChaincodeListener - automatically unregister tx listener for %s', 'tx_id'); + sinon.assert.calledWith(FakeLogger.debug, '_callChaincodeListener - automatically unregister chaincode event listener for tx_id:%s', 'tx_id'); delete Set.prototype.delete; }); @@ -2265,7 +2362,7 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(lteStub, null); }); - it('should exit if _start_stop_registration is null', () => { + it('should exit if _start_stop_action is null', () => { lteStub.returns(true); hub._last_block_seen = 1; hub._ending_block_number = {lessThanOrEqual: lteStub}; @@ -2276,10 +2373,10 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(lteStub, 1); }); - it('should exit if _start_stop_registration is not null and unregister is false', () => { + it('should exit if _start_stop_action is not null and unregister is false', () => { lteStub.returns(true); hub._last_block_seen = 1; - hub._start_stop_registration = {unregister: false}; + hub._start_stop_action = {unregister: false}; hub._ending_block_number = {lessThanOrEqual: lteStub}; hub._checkReplayEnd(); hub._ending_block_seen.should.be.true; @@ -2288,10 +2385,10 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(lteStub, 1); }); - it('should exit if _start_stop_registration is not null and unregister is true after calling unregister_action', () => { + it('should exit if _start_stop_action is not null and unregister is true after calling unregister_action', () => { lteStub.returns(true); hub._last_block_seen = 1; - hub._start_stop_registration = {unregister: true, unregister_action: unregister_actionStub}; + hub._start_stop_action = {unregister: unregister_actionStub}; hub._ending_block_number = {lessThanOrEqual: lteStub}; hub._checkReplayEnd(); hub._ending_block_seen.should.be.true; @@ -2300,10 +2397,10 @@ describe('ChannelEventHub', () => { sinon.assert.calledWith(lteStub, 1); }); - it('should exit if _start_stop_registration is not null and disconnect is true after calling disconnect', () => { + it('should exit if _start_stop_action is not null and disconnect is true after calling disconnect', () => { lteStub.returns(true); hub._last_block_seen = 1; - hub._start_stop_registration = {unregister: false, disconnect: true}; + hub._start_stop_action = {unregister: false, disconnect: true}; hub._ending_block_number = {lessThanOrEqual: lteStub}; hub._checkReplayEnd(); hub._ending_block_seen.should.be.true; @@ -2319,7 +2416,53 @@ describe('ChannelEventHub', () => { should.equal(hub._ending_block_number, null); hub._ending_block_seen.should.be.false; hub._ending_block_newest.should.be.false; - should.equal(hub._start_stop_registration, null); + should.equal(hub._start_stop_action, null); + }); + + describe('#_on_end_actions', () => { + + let hub; + beforeEach(() => { + hub = new ChannelEventHub('channel', 'peer'); + }); + + it('should exit without calling if startstop mode is not greater than no_start_stop', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', hub.NO_START_STOP, {}); + should.equal(hub._start_stop_action, null); + sinon.assert.calledWith(FakeLogger.debug, '_on_end_actions - no end block action required'); + }); + + it('should not set the unregister action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', hub.END_ONLY, {unregister: false}); + should.equal(hub._start_stop_action, null); + }); + + it('should not set the unregister action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', 3, {unregister: false}); + should.equal(hub._start_stop_action.unregister, undefined); + should.equal(hub._start_stop_action.event_reg, 'EVENT_REG'); + }); + + it('should set the unregister action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', 3, {unregister: true}); + should.equal(hub._start_stop_action.unregister, 'ACTION'); + }); + + it('should not set the disconnect action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', hub.END_ONLY, {unregister: false}); + should.equal(hub._start_stop_action, null); + }); + + it('should not set the disconnect action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', 3, {disconnect: false}); + should.equal(hub._start_stop_action.disconnect, undefined); + should.equal(hub._start_stop_action.event_reg, 'EVENT_REG'); + }); + + it('should set the disconnect action', () => { + hub._on_end_actions('EVENT_REG', 'ACTION', 3, {disconnect: true}); + should.equal(hub._start_stop_action.disconnect, true); + }); }); }); @@ -2415,8 +2558,8 @@ describe('EventRegistration', () => { describe('#constructor', () => { it('should log if unregister is not defined', () => { - new EventRegistration('onEvent', 'onError', {}); - sinon.assert.calledWith(FakeLogger.debug, 'const-EventRegistration - unregister was not defined'); + new EventRegistration('onEvent', 'onError', {}, false, false); + sinon.assert.calledWith(FakeLogger.debug, 'const-EventRegistration - unregister was not defined, using default of %s'); }); it('should set unregister if it is boolean', () => { @@ -2431,8 +2574,8 @@ describe('EventRegistration', () => { }); it('should log if disconnect is not defined', () => { - new EventRegistration('onEvent', 'onError', {}); - sinon.assert.calledWith(FakeLogger.debug, 'const-EventRegistration - disconnect was not defined'); + new EventRegistration('onEvent', 'onError', {}, false, false); + sinon.assert.calledWith(FakeLogger.debug, 'const-EventRegistration - disconnect was not defined, using default of %s'); }); it('should set disconnect if it is boolean', () => { @@ -2452,8 +2595,6 @@ describe('EventRegistration', () => { reg._onErrorFn.should.equal('onError'); reg.unregister.should.equal('default_unregister'); reg.disconnect.should.equal('default_disconnect'); - reg.unregister_action.should.be.instanceof(Function); - should.equal(reg.unregister_action(), undefined); }); }); }); diff --git a/test/integration/channel-event-hub.js b/test/integration/channel-event-hub.js index 0646626407..455eacafd4 100644 --- a/test/integration/channel-event-hub.js +++ b/test/integration/channel-event-hub.js @@ -209,9 +209,9 @@ test('***** Test channel events', async (t) => { t.fail('Should have received the callback error'); } catch (error) { if (got_callback_error) { - t.pass('Successfully got the expexted error from the event service callback testing::' + error.toString()); + t.pass('Successfully got the expected error from the event service callback testing::' + error.toString()); } else { - t.fail('FAILED to get the expexted error from the event service callback testing::' + error.toString()); + t.fail('FAILED to get the expected error from the event service callback testing::' + error.toString()); } } @@ -532,10 +532,10 @@ test('***** Test channel events', async (t) => { const handle = setTimeout(() => { t.fail('Timeout - Failed to replay all the block events in a reasonable amount of time'); throw new Error('Timeout - block replay has not completed'); - }, 10000); + }, 300000); // register to replay all block events - eh2.registerBlockEvent((full_block) => { + const block_reg = eh2.registerBlockEvent((full_block) => { t.pass('Successfully got a replayed block ::' + full_block.header.number); // block number is decoded into human readable form // let's put it back into a long @@ -543,6 +543,7 @@ test('***** Test channel events', async (t) => { if (event_block.equals(current_block)) { t.pass('Successfully got the last block number'); clearTimeout(handle); + eh2.unregisterBlockEvent(block_reg); // not required, default is to unregister when last block is seen resolve('all blocks replayed'); } // keep going...do not resolve this promise yet