Skip to content

Commit

Permalink
FABN-1207 NodeSDK chaincode event defaults
Browse files Browse the repository at this point in the history
Chaincode event default for unregister  would cause the
event listener to be disconnected after one event was
received when setting endBlock. Now for all event listeners
the default to unregister will be true when the endBlock is
received. Default for disconnect will be disconnect the
channelEventHub when the endBlock is seen.

Change-Id: If7ef15674411d52c15eaa7554dfd8baca945d1e3
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Apr 16, 2019
1 parent a5a57de commit 126175c
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 124 deletions.
23 changes: 16 additions & 7 deletions docs/tutorials/channel-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ therefore the registration must be made before the
channel event hub has setup the connection.
Replaying events may confuse other event listeners; therefore, only one listener
will be allowed on a `ChannelEventHub` when `startBlock`
and/or `endBlock` are used.
and/or `endBlock` are used on a listener registration.

- `Number` - A number value may be specified as the block number.

Expand Down Expand Up @@ -335,20 +335,29 @@ Using this option on an event listener does require that this
channel event hub has been previously running.

* {boolean} `unregister` -- (Optional) This setting indicates that the
registration should be removed (unregister) when the event is seen. When the
registration should be removed (unregister) after the event is seen. When the
application is using a timeout to only wait a specified amount of time for the
transaction to be seen, the timeout processing should include the manual
'unregister' of the transaction event listener to avoid the event callbacks
being called unexpectedly. The default for this setting is different for the
different types of event listeners. For block listeners the default is true when
an end_block was set as a option. For transaction listeners the default is true.
For chaincode listeners the default will be false as the match filter might be
intended for many transactions.
an end_block was set as a option, the listener will be active and receiving
blocks until the end block is received and then the listener will be automatically
unregistered. For transaction listeners the default is true and once the transaction
event has occurred the listener will be automatically unregistered. If the
transaction listener has used an endBlock, the default will be
to automatically unregister the listener even if the transaction has not been
seen.
For chaincode event listeners the default will be false as the match filter
might be intended for many transactions, however if the chaincode event
listener has set an endBlock it will be automatically unregistered after
the endBlock is seen.

* {boolean} `disconnect` -- (Optional) This setting indicates to the
`ChannelEventHub` instance to automatically disconnect itself from the peer's
channel-based event service once the event has been seen. The default is false
unless the endBlock has been set, then the default will be true.
channel-based event service once the event has been seen. The default is false.
When not set and the endBlock has been set the ChannelEventHub instance
will automatically disconnect itself.

### Get a Channel-based Event Hub
Use the fabric-client {@link Channel}
Expand Down
164 changes: 104 additions & 60 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class ChannelEventHub {
* only one event listener is allowed.
*/
_checkAllowRegistrations() {
if (this._start_stop_registration) {
if (this._start_stop_action) {
throw new Error('This ChannelEventHub is not open to event listener registrations');
}
}
Expand Down Expand Up @@ -938,7 +938,8 @@ class ChannelEventHub {
* @returns enum of how the endBlock and startBlock have been set
*/
_checkReplay(options, fromConnect) {
logger.debug('_checkReplay - start');
const method = '_checkReplay';
logger.debug('%s - start', method);

let result = NO_START_STOP;
let _start_block = null;
Expand All @@ -951,7 +952,7 @@ class ChannelEventHub {

if (_start_block || _end_block) {
if (fromConnect) {
if (this._start_stop_registration) {
if (this._start_stop_action) {
logger.error('This ChannelEventHub has a registered listener that has options of startBlock or endBlock');
throw new Error('Not able to connect with startBlock or endBlock when a registered listener has those options.');
}
Expand All @@ -974,23 +975,23 @@ class ChannelEventHub {
if (_end_block) {
this._ending_block_number = _end_block;
if (fromConnect) {
logger.debug('_checkReplay - connect will end receiving blocks from %s', this._ending_block_number);
logger.debug('%s - connect will end receiving blocks from %s', method, this._ending_block_number);
this._start_stop_connect = true;
}
result = END_ONLY;
logger.debug('_checkReplay - Event listening will end at block %s', this._ending_block_number);
logger.debug('%s - Event listening will end at block %s', method, this._ending_block_number);
}
if (_start_block) {
this._starting_block_number = _start_block;
if (fromConnect) {
logger.debug('_checkReplay - connect will start receiving blocks from %s', this._starting_block_number);
logger.debug('%s - connect will start receiving blocks from %s', method, this._starting_block_number);
this._start_stop_connect = true;
}
result++; // will move result to START_ONLY or START_AND_END
logger.debug('_checkReplay - Event listening will start at block %s', this._starting_block_number);
logger.debug('%s - Event listening will start at block %s', method, this._starting_block_number);
}

logger.debug('_checkReplay - end');
logger.debug('%s - end', method);
return result;
}

Expand Down Expand Up @@ -1127,11 +1128,13 @@ class ChannelEventHub {
* seen by this listener. For chaincode listeners the default will be
* false as the match filter might be intended for many transactions
* rather than a specific transaction or block as in the other listeners.
* If not set and the endBlock has been set, the listener will be
* automatically unregistered.
* @property {boolean} disconnect - Optional - This option setting Indicates
* to the ChannelEventHub instance to automatically disconnect itself
* from the peer's fabric service once the event has been seen.
* The default is false unless the endBlock has been set, then it
* it will be true.
* The default is false. If not set and the endBlock has been set, the
* the ChannelEventHub instance will automatically disconnect itself.
*/

/**
Expand Down Expand Up @@ -1177,26 +1180,25 @@ class ChannelEventHub {
}

this._checkAllowRegistrations();
let default_disconnect = false;
const startstop_mode = this._checkReplay(options);
if (startstop_mode > START_ONLY) {
default_disconnect = true;
}
const event_reg = new EventRegistration(onEvent, onError, options, false, default_disconnect);

const event_reg = new EventRegistration(onEvent, onError, options, false, false);

const chaincode_reg = new ChaincodeRegistration(ccid, eventname, event_reg);
const unregister_action = () => {
this.unregisterChaincodeEvent(chaincode_reg);
};
this._on_end_actions(chaincode_reg, unregister_action, startstop_mode, options);

let cbtable = this._chaincodeRegistrants[ccid];
if (!cbtable) {
cbtable = new Set();
this._chaincodeRegistrants[ccid] = cbtable;
}
cbtable.add(chaincode_reg);
if (startstop_mode > NO_START_STOP) {
this._start_stop_registration = chaincode_reg.event_reg;
chaincode_reg.event_reg.unregister_action = () => {
this.unregisterChaincodeEvent(chaincode_reg);
};
}



this._checkConnection();

return chaincode_reg;
Expand Down Expand Up @@ -1258,21 +1260,15 @@ class ChannelEventHub {
}

this._checkAllowRegistrations();
let default_disconnect = false;
const startstop_mode = this._checkReplay(options);
if (startstop_mode > START_ONLY) {
default_disconnect = true;
}

const block_registration_number = ++this._block_registrant_count;
const block_registration = new EventRegistration(onEvent, onError, options, true, default_disconnect);
const block_registration = new EventRegistration(onEvent, onError, options, false, false);
this._blockRegistrations[block_registration_number] = block_registration;
if (startstop_mode > NO_START_STOP) {
this._start_stop_registration = block_registration;
block_registration.unregister_action = () => {
this.unregisterBlockEvent(block_registration_number);
};
}
const unregister_action = () => {
this.unregisterBlockEvent(block_registration_number);
};
this._on_end_actions(block_registration, unregister_action, startstop_mode, options);

this._checkConnection();

return block_registration_number;
Expand Down Expand Up @@ -1339,6 +1335,7 @@ class ChannelEventHub {
}

this._checkAllowRegistrations();
const startstop_mode = this._checkReplay(options);

let default_unregister = true;
let _txid = txid;
Expand All @@ -1350,20 +1347,14 @@ class ChannelEventHub {
if (temp) {
throw new Error(`TransactionId (${txid}) has already been registered`);
}
let default_disconnect = false;
const startstop_mode = this._checkReplay(options);
if (startstop_mode > START_ONLY) {
default_disconnect = true;
}

const trans_registration = new EventRegistration(onEvent, onError, options, default_unregister, default_disconnect);
const trans_registration = new EventRegistration(onEvent, onError, options, default_unregister, false);
this._transactionRegistrations[_txid] = trans_registration;
if (startstop_mode > NO_START_STOP) {
this._start_stop_registration = trans_registration;
trans_registration.unregister_action = () => {
this.unregisterTxEvent(_txid);
};
}
const unregister_action = () => {
this.unregisterTxEvent(_txid);
};
this._on_end_actions(trans_registration, unregister_action, startstop_mode, options);

this._checkConnection();

return _txid;
Expand Down Expand Up @@ -1404,12 +1395,22 @@ class ChannelEventHub {
const block_reg = this._blockRegistrations[key];
logger.debug('_processBlockEvents - calling block listener callback');
block_reg.onEvent(block);

// check to see if we should automatically unregister or/and disconnect this hub
if (block_reg.unregister) {
this.unregisterBlockEvent(key);
logger.debug('_processBlockEvents - automatically unregister block listener for %s', key);
}
if (block_reg.disconnect) {
logger.debug('_processBlockEvents - automatically disconnect');
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on block registration'));
}
});
}

/*
* private internal method for processing tx events
* @param {Object} block protobuf object which might contain the tx from the fabric
* @param {Object} block protobuf object which might contain transactions
*/
_processTxEvents(block) {
if (Object.keys(this._transactionRegistrations).length === 0) {
Expand Down Expand Up @@ -1459,7 +1460,10 @@ class ChannelEventHub {
_callTransactionListener(tx_id, val_code, block_num, trans_reg) {
logger.debug('_callTransactionListener - about to call the transaction call back for code=%s tx=%s', val_code, tx_id);
const status = convertValidationCode(val_code);

trans_reg.onEvent(tx_id, status, block_num);

// check to see if we should automatically unregister or/and disconnect this hub
if (trans_reg.unregister) {
this.unregisterTxEvent(tx_id);
logger.debug('_callTransactionListener - automatically unregister tx listener for %s', tx_id);
Expand Down Expand Up @@ -1555,12 +1559,14 @@ class ChannelEventHub {
delete chaincode_event.payload;
}
chaincode_reg.event_reg.onEvent(chaincode_event, block_num, tx_id, tx_status);

// see if we should automatically unregister this event listener or disconnect this hub
if (chaincode_reg.event_reg.unregister) {
cbtable.delete(chaincode_reg);
logger.debug('_callChaincodeListener - automatically unregister tx listener for %s', tx_id);
this.unregisterChaincodeEvent(chaincode_reg);
logger.debug('_callChaincodeListener - automatically unregister chaincode event listener for tx_id:%s', tx_id);
}
if (chaincode_reg.event_reg.disconnect) {
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration'));
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on chaincode event registration'));
}
} else {
logger.debug('_callChaincodeListener - NOT calling chaincode listener callback');
Expand All @@ -1576,11 +1582,11 @@ class ChannelEventHub {
if (this._ending_block_number) {
if (this._ending_block_number.lessThanOrEqual(this._last_block_seen)) {
this._ending_block_seen = true;
if (this._start_stop_registration) {
if (this._start_stop_registration.unregister) {
this._start_stop_registration.unregister_action();
if (this._start_stop_action) {
if (this._start_stop_action.unregister) {
this._start_stop_action.unregister();
}
if (this._start_stop_registration.disconnect) {
if (this._start_stop_action.disconnect) {
this._disconnect(new EventHubDisconnectError('Shutdown due to end block number has been seen'));
}
}
Expand All @@ -1593,14 +1599,52 @@ class ChannelEventHub {
*/
_setReplayDefaults() {
// these will hold the block numbers to be used when this
// event hub connects to the remote peer's channel event sevice
// event hub connects to the remote peer's channel event service
this._starting_block_number = null;
this._ending_block_number = null;
this._ending_block_seen = false;
this._ending_block_newest = false;
this._start_stop_registration = null;
this._start_stop_action = null;
this._start_stop_connect = false;
}


/*
* utility method to calculate if this listener should be removed
* and if the event hub should be disconnected
* if the end block has been seen
*/
_on_end_actions(event_reg, unregister_action, startstop_mode, options) {
if (startstop_mode > NO_START_STOP) {
this._start_stop_action = {};
this._start_stop_action.event_reg = event_reg; // might be useful
} else {
logger.debug('_on_end_actions - no end block action required');
return; // all done checking
}

let _end_register = true; // default if end block is seen
if (options && typeof options.unregister === 'boolean') {
_end_register = options.unregister;
}
if (_end_register && startstop_mode > START_ONLY) {
logger.debug('listener will be unregistered when end block is seen');
this._start_stop_action.unregister = unregister_action;
} else {
logger.debug('listener will not be unregistered when end block is seen');
}

let _end_disconnect = true; // default if end block is seen
if (options && typeof options.disconnect === 'boolean') {
_end_disconnect = options.disconnect;
}
if (_end_disconnect && startstop_mode > START_ONLY) {
logger.debug('event hub will be disconnected when end block is seen');
this._start_stop_action.disconnect = true;
} else {
logger.debug('event hub will not be disconnected when end block is seen');
}
}
}

module.exports = ChannelEventHub;
Expand Down Expand Up @@ -1675,21 +1719,21 @@ class EventRegistration {
this._onErrorFn = onError;
this.unregister = default_unregister;
this.disconnect = default_disconnect;
this.unregister_action = () => {
}; // do nothing by default

if (options) {
if (typeof options.unregister === 'undefined' || options.unregister === null) {
logger.debug('const-EventRegistration - unregister was not defined');
logger.debug('const-EventRegistration - unregister was not defined, using default of %s', default_unregister);
} else if (typeof options.unregister === 'boolean') {
this.unregister = options.unregister;
logger.debug('const-EventRegistration - unregister was defined, %s', this.unregister);
} else {
throw new Error('Event registration has invalid value for "unregister" option');
}
if (typeof options.disconnect === 'undefined' || options.disconnect === null) {
logger.debug('const-EventRegistration - disconnect was not defined');
logger.debug('const-EventRegistration - disconnect was not defined, using default of %s', default_disconnect);
} else if (typeof options.disconnect === 'boolean') {
this.disconnect = options.disconnect;
logger.debug('const-EventRegistration - disconnect was defined, %s', this.disconnect);
} else {
throw new Error('Event registration has invalid value for "disconnect" option');
}
Expand All @@ -1708,7 +1752,7 @@ class EventRegistration {
try {
this._onErrorFn(...args);
} catch (error) {
logger.warn('Error notifacation callback failed', error);
logger.warn('Error notification callback failed', error);
}
}
}
Loading

0 comments on commit 126175c

Please sign in to comment.