Skip to content

Commit

Permalink
FABN-916 NodeSDK Update GRPC level
Browse files Browse the repository at this point in the history
Update the GRPC level to current stable 1.14.2
Required some changes to determining the state
of the connection.

Change-Id: I7670f9ebd24c3386837a621314e12d7f14fd37fe
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Sep 12, 2018
1 parent 8a56714 commit 6366fac
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
coverage
docs/gen
node_modules/*
package-lock.json
fabric-client/node_modules/*
fabric-client/.nyc_output
fabric-ca-client/node_modules/*
Expand Down
80 changes: 29 additions & 51 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,6 @@ for (const key of keys) {
_header_types[new_key] = key;
}

// GRPC connection states
// as seen in grpc/include/grpc/impl/codegen/connectivity_state.h
const CONNECTION_STATE = {
0: 'IDLE',
1: 'CONNECTING',
2: 'READY',
3: 'TRANSIENT_FAILURE',
4: 'FATAL_FAILURE',
5: 'SHUTDOWN'
};

// internal use only
const NO_START_STOP = 0;
const START_ONLY = 1;
Expand Down Expand Up @@ -333,8 +322,7 @@ const ChannelEventHub = class {
return;
}

const state = getStreamState(self);
logger.debug('on.data - grpc stream state :%s', state);
logger.debug('on.data - grpc stream is ready :%s', isStreamReady(self));
if (deliverResponse.Type === 'block' || deliverResponse.Type === 'filtered_block') {
if (self._connected === true) {
logger.debug('on.data - new block received - check event registrations');
Expand Down Expand Up @@ -402,8 +390,7 @@ const ChannelEventHub = class {
return;
}

const state = getStreamState(self);
logger.debug('on.end - grpc stream state :%s', state);
logger.debug('on.end - grpc stream is ready :%s', isStreamReady(self));
self._disconnect(new Error('Peer event hub has disconnected due to an "end" event'));
});

Expand All @@ -417,8 +404,7 @@ const ChannelEventHub = class {
return;
}

const state = getStreamState(self);
logger.debug('on.error - grpc stream state :%s', state);
logger.debug('on.error - grpc stream is ready :%s', isStreamReady(self));
if (err instanceof Error) {
self._disconnect(err);
}
Expand Down Expand Up @@ -694,19 +680,19 @@ const ChannelEventHub = class {
}

/*
* internal method to check state of the connection and if
* internal method to check if the connection is ready and if
* not in the ready state disconnect (post an error to all registered)
* and throw and error to enform the caller
*/
_checkConnection() {
logger.debug('_checkConnection - start');
if (this._connected || this._connect_running) {
const state = getStreamState(this);
logger.debug('_checkConnection - %s with stream channel state %s', this._peer.getUrl(), getStateText(state));
const ready = isStreamReady(this);
logger.debug('_checkConnection - %s with stream channel ready %s', this._peer.getUrl(), ready);

if (state !== 2 && !this._connect_running) { //Not READY, but trying
logger.error('_checkConnection - connection is not in the ready state. state:', getStateText(state));
const error = new Error('Connection is not in the READY state');
if (!ready && !this._connect_running) { //Not READY, but trying
logger.error('_checkConnection - connection is not ready');
const error = new Error('Connection is not READY');
this._disconnect(error);
throw error;
}
Expand All @@ -718,15 +704,15 @@ const ChannelEventHub = class {
}

/**
* Returns the connection state. and will attempt a restart when forced
* Returns if the stream is ready. and will attempt a restart when forced
*
* @param {boolean} force_reconnect - attempt to reconnect if the state
* @param {boolean} force_reconnect - attempt to reconnect if the stream
* is not in the 'READY' state
*/
checkConnection(force_reconnect) {
logger.debug('checkConnection - start force_reconnect:%s', force_reconnect);
const state = getStreamState(this);
logger.debug('checkConnection - %s with stream channel state %s', this._peer.getUrl(), getStateText(state));
const ready = isStreamReady(this);
logger.debug('checkConnection - %s with stream channel ready %s', this._peer.getUrl(), ready);

if (force_reconnect) {
try {
Expand All @@ -736,7 +722,7 @@ const ChannelEventHub = class {
if (is_paused) {
this._stream.resume();
logger.debug('checkConnection - grpc resuming ');
} else if (state !== 2) {
} else if (!ready) {
// try to reconnect
this._connect_running = false;
this._connect(true);
Expand All @@ -756,7 +742,7 @@ const ChannelEventHub = class {
}
}

return getStateText(state);
return isStreamReady(this);
}

/**
Expand Down Expand Up @@ -1300,31 +1286,23 @@ function convertValidationCode(code) {
}

/*
* Utility method to get the state of the GRPC stream
* Utility method to check if the stream is ready.
* The stream must be readable, writeable and reading to be 'ready'
*/
function getStreamState(self) {
let state = -1;
if (self._stream && self._stream.call && self._stream.call.channel_) {
state = self._stream.call.channel_.getConnectivityState();
function isStreamReady(self) {
const method = 'isStreamReady';
let ready = false;
if (self._stream) {
const stream = self._stream;
ready = stream.readable && stream.writable && stream.reading;
logger.debug('%s - stream.readable %s :: %s', method, stream.readable, self.getPeerAddr());
logger.debug('%s - stream.writable %s :: %s', method, stream.writable, self.getPeerAddr());
logger.debug('%s - stream.reading %s :: %s', method, stream.reading, self.getPeerAddr());
logger.debug('%s - stream.read_status %s :: %s', method, stream.read_status, self.getPeerAddr());
logger.debug('%s - stream.received_status %s :: %s', method, stream.received_status, self.getPeerAddr());
}

return state;
}

/*
* Utility method to get the string state from an integer
*/
function getStateText(state) {
let result = null;
try {
result = CONNECTION_STATE[state];
} catch (error) {
logger.error('Connection state conversion - unknown state - %s', state);
}
if (!result) {
result = 'UNKNOWN_STATE';
}
return result;
return ready;
}

/*
Expand Down
2 changes: 1 addition & 1 deletion fabric-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"elliptic": "^6.2.3",
"fs": "0.0.2",
"fs-extra": "^6.0.1",
"grpc": ">=1.3.5 <1.11.0",
"grpc": "1.14.2",
"hoek": "^4.2.1",
"ignore-walk": "^3.0.0",
"js-sha3": "^0.7.0",
Expand Down
35 changes: 17 additions & 18 deletions test/unit/channel-event-hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -871,14 +871,27 @@ test('\n\n** EventHub test reconnect on block registration \n\n', (t) => {
}
);

let state = event_hub.checkConnection();
t.equals(state, 'UNKNOWN_STATE', 'Check the state of the connection');
let ready = event_hub.checkConnection();
if(ready) {
t.fail('Connection should be not ready');
} else {
t.pass('Connection should be not ready');
}

// force the connections
// runs asynchronously, must be an error callback registered to get the
// failure will be reported to an error callback
state = event_hub.checkConnection(true);
t.equals(state, 'UNKNOWN_STATE', 'Check the state of the connection');
try {
ready = event_hub.checkConnection(true);
if(ready) {
t.fail('Connection should be not ready after a force');
} else {
t.pass('Connection should be not ready after a force');
}
} catch(error) {
t.fail('Connection ready test failed with %s', error);
}


return true;
}).then(() => {
Expand All @@ -890,17 +903,3 @@ test('\n\n** EventHub test reconnect on block registration \n\n', (t) => {
});

});

test('\n\n** Test the state conversion\n\n', (t) => {
const getStateText = RewiredChannelEventHub.__get__('getStateText');

t.equals(getStateText(0), 'IDLE', 'Checking that 0 state');
t.equals(getStateText(1), 'CONNECTING', 'Checking that 1 state');
t.equals(getStateText(2), 'READY', 'Checking that 2 state');
t.equals(getStateText(3), 'TRANSIENT_FAILURE', 'Checking that 3 state');
t.equals(getStateText(4), 'FATAL_FAILURE', 'Checking that 4 state');
t.equals(getStateText(5), 'SHUTDOWN', 'Checking that 5 state');
t.equals(getStateText(99), 'UNKNOWN_STATE', 'Checking that 99 state');

t.end();
});

0 comments on commit 6366fac

Please sign in to comment.