Skip to content

Commit

Permalink
[FAB-6396] NodeSDK - eventHub not recover
Browse files Browse the repository at this point in the history
NodeSDK eventHub should be able to recover if
it has taken a error and the stream has been
shutdown. There was a case where on registration
the connection was not being restarted.

Change-Id: Id9b13bafc7b196155b8f05fa75f9d69df4064523
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Oct 3, 2017
1 parent d7953fb commit c84d3b3
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 7 deletions.
9 changes: 4 additions & 5 deletions fabric-client/lib/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ var EventHub = class {
// grpc chat streaming interface
this._stream = null;
// fabric connection state of this eventhub
this._connect_called = false;
this._connected = false;
this._connect_running = false;
// should this event hub reconnect on registrations
Expand Down Expand Up @@ -255,7 +254,7 @@ var EventHub = class {
* the connection to the peer event hub
*/
_connect(force) {
this._connect_called = true;
logger.debug('_connect - start - %s', new Date());
if(this._connect_running) {
logger.debug('_connect - connect is running');
return;
Expand Down Expand Up @@ -344,6 +343,7 @@ var EventHub = class {
logger.debug('on.error - event stream:%s _current_stream:%s',stream_id, self._current_stream);
if(stream_id != self._current_stream) {
logger.debug('on.error - incoming event was from a canceled stream');
logger.debug('on.error - %s %s',new Date(),err);
return;
}

Expand Down Expand Up @@ -387,7 +387,6 @@ var EventHub = class {
this._stream.end();
this._stream = null;
}
this._connect_called = false; //we can turn off since we closed out all callbacks
}

/*
Expand Down Expand Up @@ -477,7 +476,8 @@ var EventHub = class {
}
}

if(force_reconnect && this._connect_called) {
//reconnect will only happen when there is error callback
if(force_reconnect) {
try {
if(this._stream) {
var is_paused = this._stream.isPaused();
Expand All @@ -503,7 +503,6 @@ var EventHub = class {
logger.error('_checkConnection - error ::' + error.stack ? error.stack : error);
var err = new Error('Event hub is not connected ');
this._disconnect(err);
throw err;
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions test/integration/e2e/e2eUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ function instantiateChaincode(userOrg, chaincode_path, version, language, upgrad
t.pass('The chaincode ' + type + ' transaction was valid.');
resolve();
}
}, (err) => {
t.fail('The was a problem with the instantiate event '+err);
clearTimeout(handle);
eh.unregisterTxEvent(deployId);
});
});
logger.debug('register eventhub %s with tx=%s',eh.getPeerAddr(),deployId);
Expand Down Expand Up @@ -571,7 +575,8 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){
{
pem: Buffer.from(data).toString(),
'ssl-target-name-override': ORGS[userOrg].peer1['server-hostname'],
'grpc.http2.keepalive_time' : 15
'grpc.keepalive_timeout_ms' : 3000, // time to respond to the ping, 3 seconds
'grpc.keepalive_time_ms' : 360000, // time to wait for ping response, 6 minutes
}
);
eh.connect();
Expand Down Expand Up @@ -610,7 +615,7 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){
}
}
t.comment('*****************************************************************************');
t.comment('stop and start the peer event hub ---- N O W ----- you have ' + sleep_time + ' millis');
t.comment('stop and start the peer event hub ---- N O W ----- you have ' + sleep_time + ' millis ' + (new Date()).toString());
t.comment('*****************************************************************************');
return sleep(sleep_time);
}).then((nothing) => {
Expand Down
Loading

0 comments on commit c84d3b3

Please sign in to comment.