Skip to content

Commit

Permalink
FABN-853 NodeSDK - listen for newest block
Browse files Browse the repository at this point in the history
Allow for listeners to register for the 'newest' block
when setting the endBlock during event replay operations.

Change-Id: I1089d92d2109e973a80349fdfc8fa37d54926f21
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Aug 8, 2018
1 parent 3b187e4 commit d5e5582
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 39 deletions.
54 changes: 46 additions & 8 deletions docs/tutorials/channel-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ connects with the service, it will request to receive blocks or filtered blocks.
If the `full_block` parameter is omitted, it will default to false
and filtered blocks will be requested. Receiving blocks or filtered blocks
can not be changed once `connect()` is called.
When replaying blocks (by setting the startBlock and endBlock) `connect()` must be
called after registering the listener as the connection to the peer must be
setup to request existing blocks.
* `disconnect()` -- To have the client channel event hub shutdown the connection
to the fabric network channel-based event service and notify all current channel
event registrations of the shutdown by using the registered `errorCallBack`s.
Expand All @@ -78,6 +81,8 @@ This is an optional parameter. This is the callback function to be notified when
this channel event hub is shutdown. The shutdown may be caused by a fabric
network error, network connection problem or by a call to the `disconnect()`
method.
This callback will also be called when the channel event hub is shutdown
due to the last block being received if replaying with the endBlock set to 'newest'.

#### `options` parameter
This is an optional parameter. This parameter will contain the following optional
Expand All @@ -92,9 +97,9 @@ properties:
Replaying events may confuse other event listeners; therefore, only one listener
will be allowed on a `ChannelEventHub` when `startBlock` and/or `endBlock` are used.
When this parameter is excluded (as it will be normally) the event service
will be asked to start sending blocks from the last block on the ledger.
will be asked to start sending blocks from the last (newest) block on the ledger.

* {integer} `endBlock` -- (Optional) The ending block number for event checking.
* {integer | 'newest'} `endBlock` -- (Optional) The ending block number for event checking.
When included, the peer's channel-based event service will be asked to stop
sending blocks once this block is delivered.

Expand All @@ -103,6 +108,11 @@ properties:
the current channel block height. Replaying events may confuse other event
listeners; therefore, only one listener will be allowed on a `ChannelEventHub`
when `startBlock` and/or `endBlock` are used.
The value 'newest' will indicate that 'endBlock' will be calculated by the
peer as the newest block on the ledger.
This allows the application to replay up to the latest block on
the ledger and then the listener will stop and be notified by the
'onError' callback.

* {boolean} `unregister` -- (Optional) This setting indicates that the
registration should be removed (unregister) when the event is seen. When the
Expand Down Expand Up @@ -176,7 +186,8 @@ application to resume and replay events that may have been lost if the
application were to be offline. The application should remember the last block
it has processed to avoid replaying the entire ledger.

The following example will register a block listener to start receiving blocks.
The following example will register a block listener to start receiving new
blocks as they are added to the ledger.

```
// keep the block_reg to unregister with later if needed
Expand Down Expand Up @@ -216,17 +227,45 @@ when the end block event is seen by the listener. The application will not have
to handle this housekeeping.

```
block_reg = channel_event_hub.registerBlockEvent((block) => {
console.log('Successfully received the block event');
block_reg = channel_event_hub.registerBlockEvent((full_block) => {
console.log('Successfully received a block event');
<do something with the block>
const event_block = Long.fromValue(full_block.header.number);
if(event_block.equals(current_block)) {
console.log('Successfully got the last block number');
<application is now up to date>
}
}, (error)=> {
console.log('Failed to receive the block event ::'+error);
<do something with the error>
},
// for block listeners, the defaults for unregister and disconnect are true,
// so the they are not required to be set in the following example
// so they are not required to be set in the following example
{startBlock:23, endBlock:30, unregister: true, disconnect: true}
);
channel_event_hub.connect(true); //get full blocks
```

The following example will register with a start block number and an end block
set to 'newest'. The error callback will be called to notify the application
that the last block has been delivered and that the listener has been shutdown.

```
block_reg = channel_event_hub.registerBlockEvent((block) => {
console.log('Successfully received the block event');
<do something with the block>
}, (error)=> {
if(error.toString().indexOf('Newest block received')) {
console.log('Received latest block');
<application is now up to date>
} else {
console.log('Failed to receive the block event ::'+error);
<do something with the error>
}
},
{startBlock:23, endBlock:'newest'}
);
```

### Transaction listener
Expand Down Expand Up @@ -296,6 +335,7 @@ let event_monitor = new Promise((resolve, reject) => {
// notice that `unregister` is not specified, so it will default to true
// `disconnect` is also not specified and will default to false
);
channel_event_hub.connect();
});
let send_trans = channel.sendTransaction({proposalResponses: results[0], proposal: results[1]});
Expand Down Expand Up @@ -384,8 +424,6 @@ let event_monitor = new Promise((resolve, reject) => {
reject(error);
}
// no options specified
// startBlock will default to latest
// endBlock will default to MAX
// unregister will default to false
// disconnect will default to false
);
Expand Down
111 changes: 84 additions & 27 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const five_minutes_ms = 5 * 60 * 1000;
// notified of all transactions
const ALL = 'all';

// Special value for block numbers
const NEWEST = Long.fromValue(-1);

/**
* Transaction processing in fabric v1.1 is a long operation spanning multiple
Expand Down Expand Up @@ -138,12 +140,9 @@ const ChannelEventHub = class {
logger.debug('const ');
// this will hold the last block number received
this._last_block_seen = null;
// these will hold the block numbers to be used when this
// event hub connects to the remote peer's channel event sevice
this._starting_block_number = null;
this._ending_block_number = null;
this._ending_block_seen = false;
this._start_stop_registration = null;

this._setReplayDefaults();

// hashtable of clients registered for chaincode events
this._chaincodeRegistrants = {};
// set of clients registered for block events
Expand All @@ -155,8 +154,7 @@ const ChannelEventHub = class {
this._event_client = null;
// grpc chat streaming interface
this._stream = null;
//allow this hub to to registar new listeners
this._allowRegistration = true;

// fabric connection state of this ChannelEventHub
this._connected = false;
this._connect_running = false;
Expand Down Expand Up @@ -370,14 +368,17 @@ const ChannelEventHub = class {
}
}
else if (deliverResponse.Type === 'status') {
if (self._ending_block_seen) {
// this is normal after the last block comes in when we set
// an ending block
logger.debug('on.data - status received after last block seen: %s', deliverResponse.status);
if( deliverResponse.status === 'SUCCESS') {
if (self._ending_block_seen) {
// this is normal after the last block comes in when we set an ending block
logger.debug('on.data - status received after last block seen: %s block_num:', deliverResponse.status, self._last_block_seen);
} if (self._ending_block_newest) {
// this is normal after the last block comes in when we set to newest as an ending block
logger.debug('on.data - status received when newest block seen: %s block_num:', deliverResponse.status, self._last_block_seen);
self._disconnect(new Error(`Newest block received:${self._last_block_seen} status:${deliverResponse.status}`));
}
} else {
// only blocks should be received .... get status means we need to tell
// all registered users that something is wrong and the stream is will be close or
// has been closed
// tell all registered users that something is wrong and shutting down
logger.debug('on.data - status received - %s', deliverResponse.status);
self._disconnect(new Error(`Received status message on the event stream. status:${deliverResponse.status}`));
}
Expand Down Expand Up @@ -456,7 +457,8 @@ const ChannelEventHub = class {
this.disconnect();
}

/* Internal method
/*
* Internal method
* Disconnects the connection to the peer event source.
* Will close all event listeners and send an `Error` to
* all listeners that provided an "onError" callback.
Expand All @@ -467,9 +469,14 @@ const ChannelEventHub = class {
this._connect_running = false;
this._closeAllCallbacks(err);
this._shutdown();
this._setReplayDefaults();
logger.debug('_disconnect - end -- called due to:: %s, peer:%s', err.message, this.getPeerAddr());
}

/*
* Internal method
* Closes the grpc stream and service client
*/
_shutdown() {
if (this._stream) {
logger.debug('_shutdown - shutdown existing stream');
Expand All @@ -488,6 +495,8 @@ const ChannelEventHub = class {
* and sends it to the peer's event hub.
*/
_sendRegistration() {
// The behavior when a missing block is encountered
let behavior = _abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY;
// build start
const seekStart = new _abProto.SeekPosition();
if (this._starting_block_number) {
Expand All @@ -501,13 +510,21 @@ const ChannelEventHub = class {

// build stop
const seekStop = new _abProto.SeekPosition();
const seekSpecifiedStop = new _abProto.SeekSpecified();
if (this._ending_block_number) {
seekSpecifiedStop.setNumber(this._ending_block_number);
if(this._ending_block_newest) {
const seekNewest = new _abProto.SeekNewest();
seekStop.setNewest(seekNewest);
behavior = _abProto.SeekInfo.SeekBehavior.FAIL_IF_NOT_READY;
} else {
seekSpecifiedStop.setNumber(Long.MAX_VALUE);
const seekSpecifiedStop = new _abProto.SeekSpecified();
if (this._ending_block_number) {
seekSpecifiedStop.setNumber(this._ending_block_number);
// user should know the block does not exist
behavior = _abProto.SeekInfo.SeekBehavior.FAIL_IF_NOT_READY;
} else {
seekSpecifiedStop.setNumber(Long.MAX_VALUE);
}
seekStop.setSpecified(seekSpecifiedStop);
}
seekStop.setSpecified(seekSpecifiedStop);

// seek info with all parts
const seekInfo = new _abProto.SeekInfo();
Expand All @@ -516,7 +533,9 @@ const ChannelEventHub = class {
// BLOCK_UNTIL_READY will mean hold the stream open and keep sending as
// the blocks come in
// FAIL_IF_NOT_READY will mean if the block is not there throw an error
seekInfo.setBehavior(_abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY);
seekInfo.setBehavior(behavior);

// use the admin if available
const tx_id = this._clientContext.newTransactionID(true);
const signer = this._clientContext._getSigningIdentity(true);

Expand Down Expand Up @@ -603,10 +622,11 @@ const ChannelEventHub = class {
* Internal method
* checks the startBlock/endBlock options
* checks that only one registration when using startBlock/endBlock
* @returns true if the endBlock has been set otherwise false
* @returns enum of how the endBlock and startBlock have been set
*/
_checkReplay(options) {
logger.debug('_checkReplay - start');

let result = NO_START_STOP;
let have_start_block = false;
let have_end_block = false;
Expand All @@ -621,7 +641,14 @@ const ChannelEventHub = class {
}
if (options && typeof options.endBlock !== 'undefined') {
try {
converted_options.end_block = utils.convertToLong(options.endBlock);
let end_block = options.endBlock;
if(typeof end_block === 'string') {
if(end_block.toLowerCase() === 'newest') {
end_block = Long.MAX_VALUE;
this._ending_block_newest = true;
}
}
converted_options.end_block = utils.convertToLong(end_block);
have_end_block = true;
} catch (error) {
throw new Error('Problem with the endBlock parameter ::' + error);
Expand Down Expand Up @@ -752,8 +779,13 @@ const ChannelEventHub = class {
* Setting a startBlock may confuse other event listeners,
* therefore only one listener will be allowed on a ChannelEventHub
* when a startBlock is being used.
* @property {integer} endBlock - Optional - The ending block number
* for event checking. When included, the peer's channel event service
* @property {integer | 'newest'} endBlock - Optional - The ending block number
* for event checking. The value 'newest' to indicate that endBlock
* will be calculated by the peer as the newest block on the ledger.
* This allows the application to replay up to the latest block on
* the ledger and then the listener will stop and be notified by the
* 'onError' callback.
* When included, the peer's channel event service
* will be asked to stop sending blocks once this block is delivered.
* This is how to replay missed blocks that were added
* to the ledger. When a startBlock is not included, the endBlock
Expand Down Expand Up @@ -804,6 +836,9 @@ const ChannelEventHub = class {
* @param {function} onError - Optional callback function to be notified when this event hub
* is shutdown. The shutdown may be caused by a network error or by
* a call to the "disconnect()" method or a connection error.
* This callback will also be called when the event hub is shutdown
* due to the last block being received if replaying and requesting
* the endBlock to be 'newest'.
* @param {RegistrationOpts} options -
* @returns {Object} An object that should be treated as an opaque handle used
* to unregister (see {@link unregisterChaincodeEvent})
Expand Down Expand Up @@ -890,6 +925,9 @@ const ChannelEventHub = class {
* @param {function} onError - Optional callback function to be notified when this event hub
* is shutdown. The shutdown may be caused by a network error or by
* a call to the "disconnect()" method or a connection error.
* This callback will also be called when the event hub is shutdown
* due to the last block being received if replaying and requesting
* the endBlock to be 'newest'.
* @param {RegistrationOpts} options -
* @returns {int} This is the block registration number that must be
* sed to unregister (see unregisterBlockEvent)
Expand Down Expand Up @@ -971,6 +1009,9 @@ const ChannelEventHub = class {
* @param {function} onError - Optional callback function to be notified when this event hub
* is shutdown. The shutdown may be caused by a network error or by
* a call to the "disconnect()" method or a connection error.
* This callback will also be called when the event hub is shutdown
* due to the last block being received if replaying and requesting
* the endBlock to be 'newest'.
* @param {RegistrationOpts} options -
* @returns {string} The transaction ID that was used to register this event listener,
* will the same as the txid parameter and must be used to unregister
Expand Down Expand Up @@ -1220,7 +1261,7 @@ const ChannelEventHub = class {
_checkReplayEnd() {
if (this._ending_block_number) {
if (this._ending_block_number.lessThanOrEqual(this._last_block_seen)) {
//see if the listener wants to do anything else
this._ending_block_seen = true;
if (this._start_stop_registration) {
if (this._start_stop_registration.unregister) {
this._start_stop_registration.unregister_action();
Expand All @@ -1232,6 +1273,22 @@ const ChannelEventHub = class {
}
}
}

/*
* utility method to reset the replay state
*/
_setReplayDefaults() {
// these will hold the block numbers to be used when this
// event hub connects to the remote peer's channel event sevice
this._starting_block_number = null;
this._ending_block_number = null;
this._ending_block_seen = false;
this._ending_block_newest = false;
//allow this hub to to registar new listeners
this._allowRegistration = true;
this._start_stop_registration = null;
}

};
module.exports = ChannelEventHub;

Expand Down
Loading

0 comments on commit d5e5582

Please sign in to comment.