Skip to content

Commit

Permalink
FAB-8751 NodeSDK - e2e use channel event hub
Browse files Browse the repository at this point in the history
Update the end to end testing to use the channel event hub.

Change-Id: Ib5f2d08ab707dd74003e57b7af20bb974182cec9
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Mar 23, 2018
1 parent e91be71 commit 54488fe
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 186 deletions.
2 changes: 2 additions & 0 deletions build/tasks/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ gulp.task('test', ['clean-up', 'lint', 'pre-test', 'compile', 'docker-ready', 'c
'!test/unit/logger.js',
// channel: mychannel, chaincode: end2endnodesdk:v0/v1
'test/integration/e2e.js',
// channel: mychannel, chaincode: e2enodecc:v0
'test/integration/nodechaincode/e2e.js',
'test/integration/query.js',
'test/integration/fabric-ca-affiliation-service-tests.js',
'test/integration/fabric-ca-identity-service-tests.js',
Expand Down
45 changes: 26 additions & 19 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ var ChannelEventHub = class {
* provide suficient information.
*/
connect(full_block){
logger.debug('connect - start');
logger.debug('connect - start %s', this.getPeerAddr());
if (!this._clientContext._userContext && !this._clientContext._adminSigningIdentity) {
throw new Error('The clientContext has not been properly initialized, missing userContext or admin identity');
}
Expand All @@ -270,6 +270,7 @@ var ChannelEventHub = class {
}

this._connect();
logger.debug('connect - end %s', this.getPeerAddr());
}

/*
Expand Down Expand Up @@ -327,9 +328,9 @@ var ChannelEventHub = class {
clearTimeout(connecton_setup_timeout);
}

logger.debug('on.data - event stream:%s _current_stream:%s',stream_id, self._current_stream);
logger.debug('on.data - event stream:%s _current_stream:%s peer:%s',stream_id, self._current_stream, self.getPeerAddr());
if(stream_id != self._current_stream) {
logger.debug('on.data - incoming event was from a canceled stream');
logger.debug('on.data - incoming event was from a cancelled stream');
return;
}

Expand Down Expand Up @@ -368,17 +369,16 @@ var ChannelEventHub = class {
}
}
else if(deliverResponse.Type === 'status') {
logger.debug('on.data - status received');
if(self._ending_block_seen) {
// this is normal after the last block comes in when we set
// an ending block
logger.debug('on.data - status received after last block seen');
logger.debug('on.data - status received after last block seen: %s', deliverResponse.status);
} else {
// only blocks should be received .... get status means we need to tell
// all registered users that something is wrong and the stream is will be close or
// has been closed
logger.debug('on.data - status received - %s',deliverResponse.status);
self._disconnect(new Error(util.format('Received status message on the event stream. status:%s',deliverResponse.status)));
self._disconnect(new Error(util.format('Received status message on the event stream. status:%s', deliverResponse.status)));
}
}
else {
Expand All @@ -388,13 +388,13 @@ var ChannelEventHub = class {
});

this._stream.on('status', function (response) {
logger.debug('on status - status received: %j',response);
logger.debug('on status - status received: %j peer:%s', response, self.getPeerAddr());
});

this._stream.on('end', function() {
self._connect_running = false;
clearTimeout(connecton_setup_timeout);
logger.debug('on.end - event stream:%s _current_stream:%s',stream_id, self._current_stream);
logger.debug('on.end - event stream:%s _current_stream:%s peer:%s', stream_id, self._current_stream, self.getPeerAddr());
if(stream_id != self._current_stream) {
logger.debug('on.end - incoming event was from a canceled stream');
return;
Expand All @@ -408,7 +408,7 @@ var ChannelEventHub = class {
this._stream.on('error', function(err) {
self._connect_running = false;
clearTimeout(connecton_setup_timeout);
logger.debug('on.error - event stream:%s _current_stream:%s',stream_id, self._current_stream);
logger.debug('on.error - event stream:%s _current_stream:%s peer:%s',stream_id, self._current_stream, self.getPeerAddr());
if(stream_id != self._current_stream) {
logger.debug('on.error - incoming event was from a canceled stream');
logger.debug('on.error - %s %s',new Date(),err);
Expand Down Expand Up @@ -445,11 +445,12 @@ var ChannelEventHub = class {
* all listeners that provided an "onError" callback.
*/
_disconnect(err) {
logger.debug('_disconnect - start -- called due to:: %s',err.message);
logger.debug('_disconnect - start -- called due to:: %s, peer:%s',err.message, this.getPeerAddr());
this._connected = false;
this._connect_running = false;
this._closeAllCallbacks(err);
this._shutdown();
logger.debug('_disconnect - end -- called due to:: %s, peer:%s',err.message, this.getPeerAddr());
}

_shutdown() {
Expand Down Expand Up @@ -537,28 +538,29 @@ var ChannelEventHub = class {
* Sends an error to all registered event "onError" callbacks
*/
_closeAllCallbacks(err) {
logger.debug('_closeAllCallbacks - start');
let method = '_closeAllCallbacks -' + this.getPeerAddr();
logger.debug('%s - start', method);

logger.debug('_closeAllCallbacks - blockOnErrors %s', Object.keys(this._blockRegistrations).length);
logger.debug('%s - blockOnErrors %s', method, Object.keys(this._blockRegistrations).length);
for (let key in this._blockRegistrations) {
let block_registration = this._blockRegistrations[key];
if(block_registration.onError) {
logger.debug('_closeAllCallbacks - calling block error callback for %s',key);
logger.debug('%s - calling block error callback for %s', method, key);
block_registration.onError(err);
} else {
logger.debug('_closeAllCallbacks - no block error callback to call for %s',key);
logger.debug('%s - no block error callback to call for %s', method, key);
}
}
this._blockRegistrations = {};

logger.debug('_closeAllCallbacks - transactionOnErrors %s', Object.keys(this._transactionRegistrations).length);
logger.debug('%s - transactionOnErrors %s', method, Object.keys(this._transactionRegistrations).length);
for (let key in this._transactionRegistrations) {
let trans_reg = this._transactionRegistrations[key];
if(trans_reg.onError) {
logger.debug('_closeAllCallbacks - calling transaction error callback for %s',key);
logger.debug('%s - calling transaction error callback for %s', method, key);
trans_reg.onError(err);
} else {
logger.debug('_closeAllCallbacks - no transaction error callback to call for %s',key);
logger.debug('%s - no transaction error callback to call for %s', method, key);
}
}
this._transactionRegistrations = {};
Expand All @@ -567,16 +569,18 @@ var ChannelEventHub = class {
let cc_closer = function(key) {
let cbtable = self._chaincodeRegistrants[key];
cbtable.forEach(function(chaincode_reg) {
logger.debug('_closeAllCallbacks - closing this chaincode event ccid:%s eventNameFilter:%s',chaincode_reg.ccid, chaincode_reg.eventNameFilter);
logger.debug('%s - closing this chaincode event ccid:%s eventNameFilter:%s', method, chaincode_reg.ccid, chaincode_reg.eventNameFilter);
if(chaincode_reg.event_reg.onError) {
chaincode_reg.event_reg.onError(err);
}
});
};

logger.debug('_closeAllCallbacks - chaincodeRegistrants %s', Object.keys(this._chaincodeRegistrants).length);
logger.debug('%s - chaincodeRegistrants %s', method, Object.keys(this._chaincodeRegistrants).length);
Object.keys(this._chaincodeRegistrants).forEach(cc_closer);
this._chaincodeRegistrants = {};

logger.debug('%s - end', method);
}

/*
Expand Down Expand Up @@ -634,6 +638,7 @@ var ChannelEventHub = class {
logger.debug('_checkReplay - Event listening will start at block %s', converted_options.start_block);
}

logger.debug('_checkReplay - end');
return result;
}

Expand Down Expand Up @@ -668,6 +673,8 @@ var ChannelEventHub = class {
} else {
logger.debug('_checkConnection - connection has not been started');
}

logger.debug('_checkConnection - end');
}

/**
Expand Down
Loading

0 comments on commit 54488fe

Please sign in to comment.