Skip to content

Commit

Permalink
FABN-1003 NodeSDK add connect callback
Browse files Browse the repository at this point in the history
Allow the ChannelEventHub connect call to have a callback
for connect and error. Allows the connect to be called
before registrations and get a callback if there is a
problem along with knowning that there is connection.

Change-Id: Ie00fae2f0b5760b5b52f540bcc0dc743e8712252
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Nov 12, 2018
1 parent 4348438 commit 1bc3892
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
25 changes: 24 additions & 1 deletion fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class ChannelEventHub {
// set of clients registered for block events
this._block_registrant_count = 0;
this._blockRegistrations = {};

this.connectCallback = null;

// registered transactional events
this._transactionRegistrations = {};
// grpc event client interface
Expand Down Expand Up @@ -279,8 +282,13 @@ class ChannelEventHub {
* @param {ConnectOptions | boolean} options - Optional. If of type boolean
* then it will be assumed to how to connect to receive full (true)
* or filtered (false) blocks.
* @param {functon} connectCallback - Optional. This callback will report
* completion of the connection to the peer or will report
* any errors encountered during connection to the peer. When there
* is an error, this ChannelEventHub will be shutdown (disconnected).
* Callback function should take two parameters as (error, value).
*/
connect(options) {
connect(options, connectCallback) {
let signedEvent = null;
let full_block = null;

Expand All @@ -296,6 +304,10 @@ class ChannelEventHub {
if (signedEvent) {
signedEvent = this._validateSignedEvent(signedEvent);
}
if (connectCallback) {
this.connectCallback = connectCallback;
}

logger.debug('connect - start peerAddr:%s', this.getPeerAddr());
if (!this._clientContext._userContext && !this._clientContext._adminSigningIdentity && !signedEvent) {
throw new Error('Error connect the ChannelEventhub to peer, either the clientContext has not been properly initialized, missing userContext or admin identity or missing signedEvent');
Expand Down Expand Up @@ -396,6 +408,10 @@ class ChannelEventHub {
} else {
logger.debug('on.data - first block received , this ChannelEventHub now registered');
self._connected = true;
if (this.connectCallback) {
this.connectCallback(null, this); // return this instance, user will be able check with isconnected()
this.connectCallback = null; // clean up so not called again
}
}
try {
let block = null;
Expand Down Expand Up @@ -525,6 +541,13 @@ class ChannelEventHub {
this._closeAllCallbacks(err);
this._shutdown();
this._setReplayDefaults();

// one last thing, report to the connect callback
if (this.connectCallback) {
this.connectCallback(err, this); // report and ourselves so user will know the source
this.connectCallback = null; // clean up
}

logger.debug('_disconnect - end -- called due to:: %s, peer:%s', err.message, this.getPeerAddr());
}

Expand Down
81 changes: 80 additions & 1 deletion test/integration/channel-event-hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,86 @@ test('***** Test channel events', async (t) => {
t.equal(event_hub.isconnected(), false, 'Successfully created new channel event hub for peer, isconnected check');
eventhubs.push(event_hub); // add to list so we can shutdown at end of test

// check that we can connect with callbacks
let connecter = new Promise((resolve, reject) => {
const handle = setTimeout(() => {
reject(new Error('timeout connecting to the event service'));
}, 15000);
event_hub.connect({full_block: false}, (error, connected_hub) => {
clearTimeout(handle);
if (error) {
reject(error);
} else {
if (connected_hub.isconnected()) {
t.pass('Successfully able to connect to the event service using a connect callback');
resolve();
} else {
reject(new Error('Event Hub notified us that it was connected however the connect status was false'));
}
}
});
});

try {
await connecter;
t.pass('Successfully checked for connect using a callback');
} catch (error) {
t.fail('Failed to connect to event service ::' + error.toString());
}

/*
* Test
* Creating a ChannelEventHub by name
* --- only works if the channel has this peer
*/
const event_hub_byname = channel.newChannelEventHub('localhost:7051');
t.equal(event_hub_byname.getName(), 'localhost:7051', 'Successfully created new channel event hub for peer, isName check');
t.equal(event_hub_byname.isconnected(), false, 'Successfully created new channel event hub for peer, isconnected check');

/*
* Test
* Connect failure - check error callback on connect
*/
const bad_peer = client.newPeer('grpcs://localhost:1111', {
pem: Buffer.from(data).toString(),
'ssl-target-name-override': 'peer0.org1.example.com'
});
const event_hub_fail = channel.newChannelEventHub(bad_peer);
let got_callback_error = false;
// check that we can connect with callbacks
connecter = new Promise((resolve, reject) => {
const handle = setTimeout(() => {
reject(new Error('timeout connecting to the event service'));
}, 15000);
event_hub_fail.connect({full_block: false}, (error, connected_hub) => {
clearTimeout(handle);
if (error) {
t.pass('Successfully got the connect error on the connect error callback');
got_callback_error = true;
reject(error);
} else {
if (connected_hub.isconnected()) {
t.fail('able to connect to the event service using a connect callback');
resolve();
} else {
t.fail('Connect callback called, however this hub is not connected');
reject(new Error('Event Hub notified us that it was connected however the connect status was false'));
}
}
});
});

try {
await connecter;
t.fail('Should have received the callback error');
} catch (error) {
if (got_callback_error) {
t.pass('Successfully got the expexted error from the event service callback testing::' + error.toString());
} else {
t.fail('FAILED to get the expexted error from the event service callback testing::' + error.toString());
}
}

/*
* Test
* Transaction registration using all defaults
Expand All @@ -143,7 +223,6 @@ test('***** Test channel events', async (t) => {
// send back error
reject(error);
});
event_hub.connect();
});

let send_trans = channel.sendTransaction({proposalResponses: results[0], proposal: results[1]});
Expand Down

0 comments on commit 1bc3892

Please sign in to comment.