Skip to content

Commit

Permalink
FAB-8749 NodeSDK - all EventHub to use admin
Browse files Browse the repository at this point in the history
Allow the legacy EventHub to be able to use the
admin identity if it has been set on the client.
Allow the connect to be called after a registration.

Change-Id: I582e5e92b7656f24153e154bbb40070e91f200f8
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Mar 30, 2018
1 parent 4756e77 commit 4815f5e
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 106 deletions.
55 changes: 32 additions & 23 deletions fabric-client/lib/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ var EventHub = class {
if (typeof this._clientContext.getUserContext !== 'function')
throw new Error('Invalid clientContext argument: missing required function "getUserContext"');

if (typeof this._clientContext.getUserContext() === 'undefined' || this._clientContext.getUserContext() === null)
throw new Error('The clientContext has not been properly initialized, missing userContext');
if (typeof this._clientContext._getSigningIdentity(true) === 'undefined')
throw new Error('The clientContext has not been properly initialized, missing identity');

this._connect();
}
Expand All @@ -264,6 +264,12 @@ var EventHub = class {
}
if (!this._ep) throw Error('Must set peer address before connecting.');

// close out the old stream
if(this._stream) {
this._stream.end();
this._stream = null;
}

this._connect_running = true;
this._current_stream++;
var stream_id = this._current_stream;
Expand Down Expand Up @@ -369,7 +375,14 @@ var EventHub = class {
* all listeners that provided an "onError" callback.
*/
disconnect() {
this._disconnect(new Error('EventHub has been shutdown'));
let err = new Error('EventHub has been shutdown');
if(this._connected || this._connect_running) {
this._disconnect(err);
} else {
// close and report to all the listeners
this._closeAllCallbacks(err);
logger.debug('disconnect - EventHub is not connected');
}
}

/* Internal method
Expand Down Expand Up @@ -398,7 +411,7 @@ var EventHub = class {
* and sends it to the peer's event hub.
*/
_sendRegistration(register) {
var user = this._clientContext.getUserContext();
var identity = this._clientContext._getSigningIdentity(true);
var signedEvent = new _events.SignedEvent();
var event = new _events.Event();
var reg = {events: [{event_type: 'BLOCK'}]};
Expand All @@ -410,14 +423,14 @@ var EventHub = class {
event.setUnregister(reg);
}

event.setCreator(user.getIdentity().serialize());
event.setCreator(identity.serialize());
event.setTimestamp(clientUtils.buildCurrentTimestamp());
let client_cert_hash = this._ep.getClientCertHash();
if(client_cert_hash) {
event.setTlsCertHash(client_cert_hash);
}
signedEvent.setEventBytes(event.toBuffer());
var sig = user.getSigningIdentity().sign(event.toBuffer());
var sig = identity.sign(event.toBuffer());
signedEvent.setSignature(Buffer.from(sig));
this._stream.write(signedEvent);
}
Expand Down Expand Up @@ -473,8 +486,13 @@ var EventHub = class {
let state = 0;
if(this._stream) {
state = this._stream.call.channel_.getConnectivityState();
logger.debug('_checkConnection - grpc stream state :%s',state);
} else {
// when there is no stream, then wait for the user to do a 'connect'
return;
}
if(this._connected || this._connect_running) {

if(this._connected || this._connect_running || state == 2) {
logger.debug('_checkConnection - this hub %s is connected or trying to connect with stream channel state %s', this._ep.getUrl(), state);
}
else {
Expand All @@ -487,23 +505,14 @@ var EventHub = class {
//reconnect will only happen when there is error callback
if(force_reconnect) {
try {
if(this._stream) {
var is_paused = this._stream.isPaused();
logger.debug('_checkConnection - grpc isPaused :%s',is_paused);
if(is_paused) {
this._stream.resume();
logger.debug('_checkConnection - grpc resuming ');
}
let state = this._stream.call.channel_.getConnectivityState();
logger.debug('_checkConnection - grpc stream state :%s',state);
if(state != 2) {
// try to reconnect
this._connect(true);
}
}
else {
logger.debug('_checkConnection - stream was shutdown - will reconnected');
var is_paused = this._stream.isPaused();
logger.debug('_checkConnection - grpc isPaused :%s',is_paused);
if(is_paused) {
this._stream.resume();
logger.debug('_checkConnection - grpc resuming ');
} else if(state != 2) {
// try to reconnect
this._connected = false;
this._connect(true);
}
}
Expand Down
1 change: 1 addition & 0 deletions fabric-client/lib/impl/NetworkConfig_1_0.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ var NetworkConfig_1_0 = class {
if(peer_config && peer_config[EVENT_URL]) {
let opts = {};
opts.pem = getTLSCACert(peer_config);
this._client_context.addTlsClientCertAndKey(opts);
Object.assign(opts, peer_config[GRPC_CONNECTION_OPTIONS]);
this.addTimeout(opts, EVENTREG);
event_hub = new EventHub(this._client_context);
Expand Down
7 changes: 4 additions & 3 deletions test/fixtures/network-ad.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,27 @@ organizations:
path: test/fixtures/channel/crypto-config/peerOrganizations/org2.example.com/users/[email protected]/keystore/5a983ddcbefe52a7f9b8ee5b85a590c3e3a43c4ccd70c7795bec504e7f74848d_sk
signedCert:
path: test/fixtures/channel/crypto-config/peerOrganizations/org2.example.com/users/[email protected]/signcerts/[email protected]

orderers:
orderer.example.com:
url: grpcs://localhost:7050
grpcOptions:
ssl-target-name-override: orderer.example.com
grpc-max-send-message-length: 15

tlsCACerts:
path: test/fixtures/channel/crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tlscacerts/example.com-cert.pem

peers:
peer0.org1.example.com:
url: grpcs://localhost:7051
eventUrl: grpcs://localhost:7053
grpcOptions:
ssl-target-name-override: peer0.org1.example.com
grpc.http2.keepalive_time: 15
tlsCACerts:
path: test/fixtures/channel/crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/tlscacerts/org1.example.com-cert.pem

peer0.org2.example.com:
url: grpcs://localhost:8051
eventUrl: grpcs://localhost:8053
grpcOptions:
ssl-target-name-override: peer0.org2.example.com
tlsCACerts:
Expand Down
84 changes: 75 additions & 9 deletions test/integration/only-admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,41 +166,60 @@ async function setupChannel(t, client_org1, client_org2, channel_name) {
let genesis_block = await channel_org1.getGenesisBlock(request);
t.pass('Successfully got the genesis block');

let promises = [];
let join_monitor = buildJoinEventMonitor(t, client_org1, channel_name, 'peer0.org1.example.com');
promises.push(join_monitor);

let join_tx_id = client_org1.newTransactionID(true);
request = {
targets: ['peer0.org1.example.com'],
block : genesis_block,
txId : join_tx_id
};
// join request to peer on org1 as admin of org1
let join_promise = channel_org1.joinChannel(request, 30000);
promises.push(join_promise);

let join_results = await Promise.all(promises);
logger.debug(util.format('Join Channel R E S P O N S E : %j', join_results));

// send join request to peer on org2 as admin of org2
let join_results = await channel_org1.joinChannel(request, 30000);
if(join_results && join_results[0] && join_results[0].response && join_results[0].response.status == 200) {
// lets check the results of sending to the peers which is
// last in the results array
let peer_results = join_results.pop();
if(peer_results && peer_results[0] && peer_results[0].response && peer_results[0].response.status == 200) {
t.pass('Successfully joined channnel on org1');
} else {
t.fail('Failed to join channel on org1');
throw new Error('Failed to join channel on org1');
}

promises = [];
join_monitor = buildJoinEventMonitor(t, client_org2, channel_name, 'peer0.org2.example.com');
promises.push(join_monitor);

join_tx_id = client_org2.newTransactionID(true);
request = {
targets: ['peer0.org2.example.com'],
block : genesis_block,
txId : join_tx_id
};
// join request to peer on org2 as admin of org2
join_promise = channel_org2.joinChannel(request, 30000);
promises.push(join_promise);

join_results = await Promise.all(promises);
logger.debug(util.format('Join Channel R E S P O N S E : %j', join_results));

// send join request to peer on org2 as admin of org2
join_results = await channel_org2.joinChannel(request, 30000);
if(join_results && join_results[0] && join_results[0].response && join_results[0].response.status == 200) {
// lets check the results of sending to the peers which is
// last in the results array
peer_results = join_results.pop();
if(peer_results && peer_results[0] && peer_results[0].response && peer_results[0].response.status == 200) {
t.pass('Successfully joined channnel on org2');
} else {
t.fail('Failed to join channel on org2');
throw new Error('Failed to join channel on org2');
}

await sleep(10000);
t.pass('Successfully waited for peers to join the channel');

/*
* I N S T A L L C H A I N C O D E
*/
Expand Down Expand Up @@ -292,6 +311,49 @@ async function setupChannel(t, client_org1, client_org2, channel_name) {
return channel_org1;
}


function buildJoinEventMonitor(t, client, channel_name, peer_name) {
let event_hub = client.getEventHub(peer_name);
let event_block_promise = new Promise((resolve, reject) => {
let registration_id = null;
let event_timeout = setTimeout(() => {
let message = 'REQUEST_TIMEOUT:' + event_hub._ep._endpoint.addr;
logger.error(message);
event_hub.disconnect();
reject(new Error(message));
}, 30000);
registration_id = event_hub.registerBlockEvent((block) => {
clearTimeout(event_timeout);
// A peer may have more than one channel, check that this block came
// is from the channel that is being joined.
// ... also this will be the first block channel, and the channel may
// have many more blocks
if (block.data.data.length === 1) {
var channel_header = block.data.data[0].payload.header.channel_header;
if (channel_header.channel_id === channel_name) {
let message = util.format('EventHub %s has reported a block update for channel %s',event_hub._ep._endpoint.addr,channel_name);
t.pass(message);
event_hub.unregisterBlockEvent(registration_id);
event_hub.disconnect();
t.pass(util.format('EventHub %s has been disconnected',event_hub._ep._endpoint.addr));
resolve(message);
} else {
t.pass('Keep waiting for the right block');
}
}
}, (err) => {
clearTimeout(event_timeout);
let message = 'Problem setting up the event hub :'+ err.toString();
t.fail(message);
event_hub.disconnect();
reject(new Error(message));
});
event_hub.connect();
});

return event_block_promise;
}

async function invoke(t, client, channel) {
let tx_id_string = null;
try {
Expand Down Expand Up @@ -453,6 +515,8 @@ async function queries(t, client, channel, tx_id_string) {
logger.error('catch network config test error:: %s', error.stack ? error.stack : error);
t.fail('Test failed with '+ error);
}

return true;
}

async function manually(t, client) {
Expand Down Expand Up @@ -511,6 +575,8 @@ async function manually(t, client) {
logger.error('catch network config test error:: %s', error.stack ? error.stack : error);
t.fail('Test failed with '+ error);
}

return true;
}

function sleep(ms) {
Expand Down
Loading

0 comments on commit 4815f5e

Please sign in to comment.