From d5e55825d6a589f1922c081b8bb2509491615542 Mon Sep 17 00:00:00 2001 From: Bret Harrison Date: Wed, 8 Aug 2018 17:51:53 -0400 Subject: [PATCH] FABN-853 NodeSDK - listen for newest block Allow for listeners to register for the 'newest' block when setting the endBlock during event replay operations. Change-Id: I1089d92d2109e973a80349fdfc8fa37d54926f21 Signed-off-by: Bret Harrison --- docs/tutorials/channel-events.md | 54 +++++++++++-- fabric-client/lib/ChannelEventHub.js | 111 +++++++++++++++++++------- test/integration/channel-event-hub.js | 50 +++++++++++- test/unit/channel-event-hub.js | 44 ++++++++++ 4 files changed, 220 insertions(+), 39 deletions(-) diff --git a/docs/tutorials/channel-events.md b/docs/tutorials/channel-events.md index 3ca685b5f9..b30bc92623 100644 --- a/docs/tutorials/channel-events.md +++ b/docs/tutorials/channel-events.md @@ -59,6 +59,9 @@ connects with the service, it will request to receive blocks or filtered blocks. If the `full_block` parameter is omitted, it will default to false and filtered blocks will be requested. Receiving blocks or filtered blocks can not be changed once `connect()` is called. +When replaying blocks (by setting the startBlock and endBlock) `connect()` must be +called after registering the listener as the connection to the peer must be +setup to request existing blocks. * `disconnect()` -- To have the client channel event hub shutdown the connection to the fabric network channel-based event service and notify all current channel event registrations of the shutdown by using the registered `errorCallBack`s. @@ -78,6 +81,8 @@ This is an optional parameter. This is the callback function to be notified when this channel event hub is shutdown. The shutdown may be caused by a fabric network error, network connection problem or by a call to the `disconnect()` method. +This callback will also be called when the channel event hub is shutdown +due to the last block being received if replaying with the endBlock set to 'newest'. #### `options` parameter This is an optional parameter. This parameter will contain the following optional @@ -92,9 +97,9 @@ properties: Replaying events may confuse other event listeners; therefore, only one listener will be allowed on a `ChannelEventHub` when `startBlock` and/or `endBlock` are used. When this parameter is excluded (as it will be normally) the event service - will be asked to start sending blocks from the last block on the ledger. + will be asked to start sending blocks from the last (newest) block on the ledger. -* {integer} `endBlock` -- (Optional) The ending block number for event checking. +* {integer | 'newest'} `endBlock` -- (Optional) The ending block number for event checking. When included, the peer's channel-based event service will be asked to stop sending blocks once this block is delivered. @@ -103,6 +108,11 @@ properties: the current channel block height. Replaying events may confuse other event listeners; therefore, only one listener will be allowed on a `ChannelEventHub` when `startBlock` and/or `endBlock` are used. + The value 'newest' will indicate that 'endBlock' will be calculated by the + peer as the newest block on the ledger. + This allows the application to replay up to the latest block on + the ledger and then the listener will stop and be notified by the + 'onError' callback. * {boolean} `unregister` -- (Optional) This setting indicates that the registration should be removed (unregister) when the event is seen. When the @@ -176,7 +186,8 @@ application to resume and replay events that may have been lost if the application were to be offline. The application should remember the last block it has processed to avoid replaying the entire ledger. -The following example will register a block listener to start receiving blocks. +The following example will register a block listener to start receiving new +blocks as they are added to the ledger. ``` // keep the block_reg to unregister with later if needed @@ -216,17 +227,45 @@ when the end block event is seen by the listener. The application will not have to handle this housekeeping. ``` -block_reg = channel_event_hub.registerBlockEvent((block) => { - console.log('Successfully received the block event'); +block_reg = channel_event_hub.registerBlockEvent((full_block) => { + console.log('Successfully received a block event'); + const event_block = Long.fromValue(full_block.header.number); + if(event_block.equals(current_block)) { + console.log('Successfully got the last block number'); + + } }, (error)=> { console.log('Failed to receive the block event ::'+error); }, // for block listeners, the defaults for unregister and disconnect are true, - // so the they are not required to be set in the following example + // so they are not required to be set in the following example {startBlock:23, endBlock:30, unregister: true, disconnect: true} ); +channel_event_hub.connect(true); //get full blocks +``` + +The following example will register with a start block number and an end block +set to 'newest'. The error callback will be called to notify the application +that the last block has been delivered and that the listener has been shutdown. + +``` +block_reg = channel_event_hub.registerBlockEvent((block) => { + console.log('Successfully received the block event'); + +}, (error)=> { + if(error.toString().indexOf('Newest block received')) { + console.log('Received latest block'); + + } else { + console.log('Failed to receive the block event ::'+error); + + } + +}, + {startBlock:23, endBlock:'newest'} +); ``` ### Transaction listener @@ -296,6 +335,7 @@ let event_monitor = new Promise((resolve, reject) => { // notice that `unregister` is not specified, so it will default to true // `disconnect` is also not specified and will default to false ); + channel_event_hub.connect(); }); let send_trans = channel.sendTransaction({proposalResponses: results[0], proposal: results[1]}); @@ -384,8 +424,6 @@ let event_monitor = new Promise((resolve, reject) => { reject(error); } // no options specified - // startBlock will default to latest - // endBlock will default to MAX // unregister will default to false // disconnect will default to false ); diff --git a/fabric-client/lib/ChannelEventHub.js b/fabric-client/lib/ChannelEventHub.js index ea54b170b9..01da04b563 100644 --- a/fabric-client/lib/ChannelEventHub.js +++ b/fabric-client/lib/ChannelEventHub.js @@ -56,6 +56,8 @@ const five_minutes_ms = 5 * 60 * 1000; // notified of all transactions const ALL = 'all'; +// Special value for block numbers +const NEWEST = Long.fromValue(-1); /** * Transaction processing in fabric v1.1 is a long operation spanning multiple @@ -138,12 +140,9 @@ const ChannelEventHub = class { logger.debug('const '); // this will hold the last block number received this._last_block_seen = null; - // these will hold the block numbers to be used when this - // event hub connects to the remote peer's channel event sevice - this._starting_block_number = null; - this._ending_block_number = null; - this._ending_block_seen = false; - this._start_stop_registration = null; + + this._setReplayDefaults(); + // hashtable of clients registered for chaincode events this._chaincodeRegistrants = {}; // set of clients registered for block events @@ -155,8 +154,7 @@ const ChannelEventHub = class { this._event_client = null; // grpc chat streaming interface this._stream = null; - //allow this hub to to registar new listeners - this._allowRegistration = true; + // fabric connection state of this ChannelEventHub this._connected = false; this._connect_running = false; @@ -370,14 +368,17 @@ const ChannelEventHub = class { } } else if (deliverResponse.Type === 'status') { - if (self._ending_block_seen) { - // this is normal after the last block comes in when we set - // an ending block - logger.debug('on.data - status received after last block seen: %s', deliverResponse.status); + if( deliverResponse.status === 'SUCCESS') { + if (self._ending_block_seen) { + // this is normal after the last block comes in when we set an ending block + logger.debug('on.data - status received after last block seen: %s block_num:', deliverResponse.status, self._last_block_seen); + } if (self._ending_block_newest) { + // this is normal after the last block comes in when we set to newest as an ending block + logger.debug('on.data - status received when newest block seen: %s block_num:', deliverResponse.status, self._last_block_seen); + self._disconnect(new Error(`Newest block received:${self._last_block_seen} status:${deliverResponse.status}`)); + } } else { - // only blocks should be received .... get status means we need to tell - // all registered users that something is wrong and the stream is will be close or - // has been closed + // tell all registered users that something is wrong and shutting down logger.debug('on.data - status received - %s', deliverResponse.status); self._disconnect(new Error(`Received status message on the event stream. status:${deliverResponse.status}`)); } @@ -456,7 +457,8 @@ const ChannelEventHub = class { this.disconnect(); } - /* Internal method + /* + * Internal method * Disconnects the connection to the peer event source. * Will close all event listeners and send an `Error` to * all listeners that provided an "onError" callback. @@ -467,9 +469,14 @@ const ChannelEventHub = class { this._connect_running = false; this._closeAllCallbacks(err); this._shutdown(); + this._setReplayDefaults(); logger.debug('_disconnect - end -- called due to:: %s, peer:%s', err.message, this.getPeerAddr()); } + /* + * Internal method + * Closes the grpc stream and service client + */ _shutdown() { if (this._stream) { logger.debug('_shutdown - shutdown existing stream'); @@ -488,6 +495,8 @@ const ChannelEventHub = class { * and sends it to the peer's event hub. */ _sendRegistration() { + // The behavior when a missing block is encountered + let behavior = _abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY; // build start const seekStart = new _abProto.SeekPosition(); if (this._starting_block_number) { @@ -501,13 +510,21 @@ const ChannelEventHub = class { // build stop const seekStop = new _abProto.SeekPosition(); - const seekSpecifiedStop = new _abProto.SeekSpecified(); - if (this._ending_block_number) { - seekSpecifiedStop.setNumber(this._ending_block_number); + if(this._ending_block_newest) { + const seekNewest = new _abProto.SeekNewest(); + seekStop.setNewest(seekNewest); + behavior = _abProto.SeekInfo.SeekBehavior.FAIL_IF_NOT_READY; } else { - seekSpecifiedStop.setNumber(Long.MAX_VALUE); + const seekSpecifiedStop = new _abProto.SeekSpecified(); + if (this._ending_block_number) { + seekSpecifiedStop.setNumber(this._ending_block_number); + // user should know the block does not exist + behavior = _abProto.SeekInfo.SeekBehavior.FAIL_IF_NOT_READY; + } else { + seekSpecifiedStop.setNumber(Long.MAX_VALUE); + } + seekStop.setSpecified(seekSpecifiedStop); } - seekStop.setSpecified(seekSpecifiedStop); // seek info with all parts const seekInfo = new _abProto.SeekInfo(); @@ -516,7 +533,9 @@ const ChannelEventHub = class { // BLOCK_UNTIL_READY will mean hold the stream open and keep sending as // the blocks come in // FAIL_IF_NOT_READY will mean if the block is not there throw an error - seekInfo.setBehavior(_abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY); + seekInfo.setBehavior(behavior); + + // use the admin if available const tx_id = this._clientContext.newTransactionID(true); const signer = this._clientContext._getSigningIdentity(true); @@ -603,10 +622,11 @@ const ChannelEventHub = class { * Internal method * checks the startBlock/endBlock options * checks that only one registration when using startBlock/endBlock - * @returns true if the endBlock has been set otherwise false + * @returns enum of how the endBlock and startBlock have been set */ _checkReplay(options) { logger.debug('_checkReplay - start'); + let result = NO_START_STOP; let have_start_block = false; let have_end_block = false; @@ -621,7 +641,14 @@ const ChannelEventHub = class { } if (options && typeof options.endBlock !== 'undefined') { try { - converted_options.end_block = utils.convertToLong(options.endBlock); + let end_block = options.endBlock; + if(typeof end_block === 'string') { + if(end_block.toLowerCase() === 'newest') { + end_block = Long.MAX_VALUE; + this._ending_block_newest = true; + } + } + converted_options.end_block = utils.convertToLong(end_block); have_end_block = true; } catch (error) { throw new Error('Problem with the endBlock parameter ::' + error); @@ -752,8 +779,13 @@ const ChannelEventHub = class { * Setting a startBlock may confuse other event listeners, * therefore only one listener will be allowed on a ChannelEventHub * when a startBlock is being used. - * @property {integer} endBlock - Optional - The ending block number - * for event checking. When included, the peer's channel event service + * @property {integer | 'newest'} endBlock - Optional - The ending block number + * for event checking. The value 'newest' to indicate that endBlock + * will be calculated by the peer as the newest block on the ledger. + * This allows the application to replay up to the latest block on + * the ledger and then the listener will stop and be notified by the + * 'onError' callback. + * When included, the peer's channel event service * will be asked to stop sending blocks once this block is delivered. * This is how to replay missed blocks that were added * to the ledger. When a startBlock is not included, the endBlock @@ -804,6 +836,9 @@ const ChannelEventHub = class { * @param {function} onError - Optional callback function to be notified when this event hub * is shutdown. The shutdown may be caused by a network error or by * a call to the "disconnect()" method or a connection error. + * This callback will also be called when the event hub is shutdown + * due to the last block being received if replaying and requesting + * the endBlock to be 'newest'. * @param {RegistrationOpts} options - * @returns {Object} An object that should be treated as an opaque handle used * to unregister (see {@link unregisterChaincodeEvent}) @@ -890,6 +925,9 @@ const ChannelEventHub = class { * @param {function} onError - Optional callback function to be notified when this event hub * is shutdown. The shutdown may be caused by a network error or by * a call to the "disconnect()" method or a connection error. + * This callback will also be called when the event hub is shutdown + * due to the last block being received if replaying and requesting + * the endBlock to be 'newest'. * @param {RegistrationOpts} options - * @returns {int} This is the block registration number that must be * sed to unregister (see unregisterBlockEvent) @@ -971,6 +1009,9 @@ const ChannelEventHub = class { * @param {function} onError - Optional callback function to be notified when this event hub * is shutdown. The shutdown may be caused by a network error or by * a call to the "disconnect()" method or a connection error. + * This callback will also be called when the event hub is shutdown + * due to the last block being received if replaying and requesting + * the endBlock to be 'newest'. * @param {RegistrationOpts} options - * @returns {string} The transaction ID that was used to register this event listener, * will the same as the txid parameter and must be used to unregister @@ -1220,7 +1261,7 @@ const ChannelEventHub = class { _checkReplayEnd() { if (this._ending_block_number) { if (this._ending_block_number.lessThanOrEqual(this._last_block_seen)) { - //see if the listener wants to do anything else + this._ending_block_seen = true; if (this._start_stop_registration) { if (this._start_stop_registration.unregister) { this._start_stop_registration.unregister_action(); @@ -1232,6 +1273,22 @@ const ChannelEventHub = class { } } } + + /* + * utility method to reset the replay state + */ + _setReplayDefaults() { + // these will hold the block numbers to be used when this + // event hub connects to the remote peer's channel event sevice + this._starting_block_number = null; + this._ending_block_number = null; + this._ending_block_seen = false; + this._ending_block_newest = false; + //allow this hub to to registar new listeners + this._allowRegistration = true; + this._start_stop_registration = null; + } + }; module.exports = ChannelEventHub; diff --git a/test/integration/channel-event-hub.js b/test/integration/channel-event-hub.js index 3a699b202c..1414dc5571 100644 --- a/test/integration/channel-event-hub.js +++ b/test/integration/channel-event-hub.js @@ -431,11 +431,11 @@ test('***** Test channel events', async (t) => { eventhubs.push(eh2); //putting on this list will have it closed on the test end let block_reg_num = null; - const block_replay = new Promise((resolve, reject) => { + let block_replay = new Promise((resolve, reject) => { const handle = setTimeout(() => { t.fail('Timeout - Failed to replay all the block events in a reasonable amount of time'); throw new Error('Timeout - block replay has not completed'); - }, 600000000); + }, 10000); // register to replay all block events block_reg_num = eh2.registerBlockEvent((full_block) => { @@ -444,7 +444,7 @@ test('***** Test channel events', async (t) => { // let's put it back into a long const event_block = Long.fromValue(full_block.header.number); if(event_block.equals(current_block)) { - t.pass('Successfully got the same last block number'); + t.pass('Successfully got the last block number'); clearTimeout(handle); resolve('all blocks replayed'); } @@ -454,7 +454,6 @@ test('***** Test channel events', async (t) => { t.fail('Failed to replay all the block events'); throw new Error('Replay Error callback was called with ::' + error); }, - // setting the unregister to false, application code will handle it {startBlock : 0, endBlock : current_block} ); eh2.connect(true); @@ -463,10 +462,53 @@ test('***** Test channel events', async (t) => { results = await block_replay; t.equals(results, 'all blocks replayed', 'Checking that all blocks were replayed'); + eh2.disconnect(); //clean up + + let seen_last_block = false; + block_replay = new Promise((resolve, reject) => { + const handle = setTimeout(() => { + t.fail('Timeout - Failed to replay all the block events in a reasonable amount of time'); + throw new Error('Timeout - block replay has not completed'); + }, 10000); + + // register to replay all block events + block_reg_num = eh2.registerBlockEvent((full_block) => { + t.pass('Successfully got a replayed block ::' + full_block.header.number); + // block number is decoded into human readable form + // let's put it back into a long + const event_block = Long.fromValue(full_block.header.number); + if(event_block.equals(current_block)) { + t.pass('Successfully got the last block number'); + seen_last_block = true; + } + // keep going...do not resolve this promise yet + }, (error) => { + clearTimeout(handle); + if(error.toString().indexOf('Newest block received')) { + // this error callback will be called to indicate that the listener is no longer listening + // in this case it is OK as the message indicates that newest block was sent + t.pass('Message received inidicating newest block received ::' + error); + resolve('newest block replayed'); + } else { + t.fail('Failed to replay all the block events'); + throw new Error('Replay Error callback was called with ::' + error); + } + + }, + {startBlock : 0, endBlock : 'newest'} + ); + eh2.connect(true); //need to connect as disconnect was called + }); + + results = await block_replay; + t.equals(results, 'newest block replayed', 'Checking that newest block replayed'); + + t.pass(' ======>>>>> CHANNEL EVENT INTEGRATION TEST END'); } catch(catch_err) { t.fail('Testing of channel events has failed with ' + catch_err); } + t.end(); }); diff --git a/test/unit/channel-event-hub.js b/test/unit/channel-event-hub.js index 6244f4ab77..1c5beb1e7c 100644 --- a/test/unit/channel-event-hub.js +++ b/test/unit/channel-event-hub.js @@ -447,6 +447,50 @@ test('\n\n** ChannelEventHub transaction callback with replay \n\n', (t) => { 'Checking for "startBlock" (%s) must not be larger than "endBlock" (%s)' ); + let got_called = false; + try { + eh.unregisterTxEvent('transid'); + eh.registerTxEvent('all', () => { + t.fail('Should not have called success callback'); + t.end(); + }, (err) =>{ + got_called = true; + t.pass('Should be called after getting last trans or a shutdown'); + t.equals(err.toString().indexOf('ChannelEventHub has been shutdown'), 7,'Check that we got the correct error message'); + }, + {startBlock: 1, endBlock: 'newest'} + ); + t.pass('Successfully registered a newest playback transaction event'); + } catch(error) { + t.fail( 'Failed - Should be able to register with newest replay'); + } + t.equal(eh._ending_block_newest, true, 'Check the newest state'); + t.equal(eh._allowRegistration, false, 'Check the replay state'); + t.equal(eh._ending_block_number, Long.MAX_VALUE, 'Check the replay end block'); + + // this should get some errors posted + eh.disconnect(); + t.equal(got_called, true, 'Check that error callback was called'); + + try { + eh.unregisterTxEvent('transid'); + eh.registerBlockEvent(() => { + t.fail('Should not have called success callback'); + t.end(); + }, () =>{ + t.fail('Should not have called error callback'); + t.end(); + }, + {startBlock: 10000000, endBlock: 'newest'} + ); + t.pass('Successfully registered a newest playback block event'); + } catch(error) { + t.fail( 'Failed - Should be able to register with newest replay'); + } + t.equal(eh._ending_block_newest, true, 'Check the newest state'); + t.equal(eh._allowRegistration, false, 'Check the replay state'); + t.equal(eh._ending_block_number, Long.MAX_VALUE, 'Check the replay end block'); + t.end(); });