From 4815f5e1318d46d1fa6e33d94a164f8bbb3aaa59 Mon Sep 17 00:00:00 2001 From: Bret Harrison Date: Fri, 30 Mar 2018 12:53:32 -0400 Subject: [PATCH] FAB-8749 NodeSDK - all EventHub to use admin Allow the legacy EventHub to be able to use the admin identity if it has been set on the client. Allow the connect to be called after a registration. Change-Id: I582e5e92b7656f24153e154bbb40070e91f200f8 Signed-off-by: Bret Harrison --- fabric-client/lib/EventHub.js | 55 ++++++---- fabric-client/lib/impl/NetworkConfig_1_0.js | 1 + test/fixtures/network-ad.yaml | 7 +- test/integration/only-admin.js | 84 +++++++++++++-- test/unit/event-hub.js | 113 ++++++++------------ 5 files changed, 154 insertions(+), 106 deletions(-) diff --git a/fabric-client/lib/EventHub.js b/fabric-client/lib/EventHub.js index 0a176029f5..ea344b3111 100644 --- a/fabric-client/lib/EventHub.js +++ b/fabric-client/lib/EventHub.js @@ -240,8 +240,8 @@ var EventHub = class { if (typeof this._clientContext.getUserContext !== 'function') throw new Error('Invalid clientContext argument: missing required function "getUserContext"'); - if (typeof this._clientContext.getUserContext() === 'undefined' || this._clientContext.getUserContext() === null) - throw new Error('The clientContext has not been properly initialized, missing userContext'); + if (typeof this._clientContext._getSigningIdentity(true) === 'undefined') + throw new Error('The clientContext has not been properly initialized, missing identity'); this._connect(); } @@ -264,6 +264,12 @@ var EventHub = class { } if (!this._ep) throw Error('Must set peer address before connecting.'); + // close out the old stream + if(this._stream) { + this._stream.end(); + this._stream = null; + } + this._connect_running = true; this._current_stream++; var stream_id = this._current_stream; @@ -369,7 +375,14 @@ var EventHub = class { * all listeners that provided an "onError" callback. */ disconnect() { - this._disconnect(new Error('EventHub has been shutdown')); + let err = new Error('EventHub has been shutdown'); + if(this._connected || this._connect_running) { + this._disconnect(err); + } else { + // close and report to all the listeners + this._closeAllCallbacks(err); + logger.debug('disconnect - EventHub is not connected'); + } } /* Internal method @@ -398,7 +411,7 @@ var EventHub = class { * and sends it to the peer's event hub. */ _sendRegistration(register) { - var user = this._clientContext.getUserContext(); + var identity = this._clientContext._getSigningIdentity(true); var signedEvent = new _events.SignedEvent(); var event = new _events.Event(); var reg = {events: [{event_type: 'BLOCK'}]}; @@ -410,14 +423,14 @@ var EventHub = class { event.setUnregister(reg); } - event.setCreator(user.getIdentity().serialize()); + event.setCreator(identity.serialize()); event.setTimestamp(clientUtils.buildCurrentTimestamp()); let client_cert_hash = this._ep.getClientCertHash(); if(client_cert_hash) { event.setTlsCertHash(client_cert_hash); } signedEvent.setEventBytes(event.toBuffer()); - var sig = user.getSigningIdentity().sign(event.toBuffer()); + var sig = identity.sign(event.toBuffer()); signedEvent.setSignature(Buffer.from(sig)); this._stream.write(signedEvent); } @@ -473,8 +486,13 @@ var EventHub = class { let state = 0; if(this._stream) { state = this._stream.call.channel_.getConnectivityState(); + logger.debug('_checkConnection - grpc stream state :%s',state); + } else { + // when there is no stream, then wait for the user to do a 'connect' + return; } - if(this._connected || this._connect_running) { + + if(this._connected || this._connect_running || state == 2) { logger.debug('_checkConnection - this hub %s is connected or trying to connect with stream channel state %s', this._ep.getUrl(), state); } else { @@ -487,23 +505,14 @@ var EventHub = class { //reconnect will only happen when there is error callback if(force_reconnect) { try { - if(this._stream) { - var is_paused = this._stream.isPaused(); - logger.debug('_checkConnection - grpc isPaused :%s',is_paused); - if(is_paused) { - this._stream.resume(); - logger.debug('_checkConnection - grpc resuming '); - } - let state = this._stream.call.channel_.getConnectivityState(); - logger.debug('_checkConnection - grpc stream state :%s',state); - if(state != 2) { - // try to reconnect - this._connect(true); - } - } - else { - logger.debug('_checkConnection - stream was shutdown - will reconnected'); + var is_paused = this._stream.isPaused(); + logger.debug('_checkConnection - grpc isPaused :%s',is_paused); + if(is_paused) { + this._stream.resume(); + logger.debug('_checkConnection - grpc resuming '); + } else if(state != 2) { // try to reconnect + this._connected = false; this._connect(true); } } diff --git a/fabric-client/lib/impl/NetworkConfig_1_0.js b/fabric-client/lib/impl/NetworkConfig_1_0.js index a538e3b7cb..3c12c77f89 100644 --- a/fabric-client/lib/impl/NetworkConfig_1_0.js +++ b/fabric-client/lib/impl/NetworkConfig_1_0.js @@ -232,6 +232,7 @@ var NetworkConfig_1_0 = class { if(peer_config && peer_config[EVENT_URL]) { let opts = {}; opts.pem = getTLSCACert(peer_config); + this._client_context.addTlsClientCertAndKey(opts); Object.assign(opts, peer_config[GRPC_CONNECTION_OPTIONS]); this.addTimeout(opts, EVENTREG); event_hub = new EventHub(this._client_context); diff --git a/test/fixtures/network-ad.yaml b/test/fixtures/network-ad.yaml index 8d01a62015..f98fa5e583 100644 --- a/test/fixtures/network-ad.yaml +++ b/test/fixtures/network-ad.yaml @@ -40,26 +40,27 @@ organizations: path: test/fixtures/channel/crypto-config/peerOrganizations/org2.example.com/users/Admin@org2.example.com/keystore/5a983ddcbefe52a7f9b8ee5b85a590c3e3a43c4ccd70c7795bec504e7f74848d_sk signedCert: path: test/fixtures/channel/crypto-config/peerOrganizations/org2.example.com/users/Admin@org2.example.com/signcerts/Admin@org2.example.com-cert.pem + orderers: orderer.example.com: url: grpcs://localhost:7050 grpcOptions: ssl-target-name-override: orderer.example.com - grpc-max-send-message-length: 15 - tlsCACerts: path: test/fixtures/channel/crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tlscacerts/example.com-cert.pem + peers: peer0.org1.example.com: url: grpcs://localhost:7051 + eventUrl: grpcs://localhost:7053 grpcOptions: ssl-target-name-override: peer0.org1.example.com - grpc.http2.keepalive_time: 15 tlsCACerts: path: test/fixtures/channel/crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/tlscacerts/org1.example.com-cert.pem peer0.org2.example.com: url: grpcs://localhost:8051 + eventUrl: grpcs://localhost:8053 grpcOptions: ssl-target-name-override: peer0.org2.example.com tlsCACerts: diff --git a/test/integration/only-admin.js b/test/integration/only-admin.js index bfb70a1663..80dbd80fb9 100644 --- a/test/integration/only-admin.js +++ b/test/integration/only-admin.js @@ -166,41 +166,60 @@ async function setupChannel(t, client_org1, client_org2, channel_name) { let genesis_block = await channel_org1.getGenesisBlock(request); t.pass('Successfully got the genesis block'); + let promises = []; + let join_monitor = buildJoinEventMonitor(t, client_org1, channel_name, 'peer0.org1.example.com'); + promises.push(join_monitor); + let join_tx_id = client_org1.newTransactionID(true); request = { targets: ['peer0.org1.example.com'], block : genesis_block, txId : join_tx_id }; + // join request to peer on org1 as admin of org1 + let join_promise = channel_org1.joinChannel(request, 30000); + promises.push(join_promise); + + let join_results = await Promise.all(promises); + logger.debug(util.format('Join Channel R E S P O N S E : %j', join_results)); - // send join request to peer on org2 as admin of org2 - let join_results = await channel_org1.joinChannel(request, 30000); - if(join_results && join_results[0] && join_results[0].response && join_results[0].response.status == 200) { + // lets check the results of sending to the peers which is + // last in the results array + let peer_results = join_results.pop(); + if(peer_results && peer_results[0] && peer_results[0].response && peer_results[0].response.status == 200) { t.pass('Successfully joined channnel on org1'); } else { t.fail('Failed to join channel on org1'); throw new Error('Failed to join channel on org1'); } + promises = []; + join_monitor = buildJoinEventMonitor(t, client_org2, channel_name, 'peer0.org2.example.com'); + promises.push(join_monitor); + join_tx_id = client_org2.newTransactionID(true); request = { targets: ['peer0.org2.example.com'], block : genesis_block, txId : join_tx_id }; + // join request to peer on org2 as admin of org2 + join_promise = channel_org2.joinChannel(request, 30000); + promises.push(join_promise); + + join_results = await Promise.all(promises); + logger.debug(util.format('Join Channel R E S P O N S E : %j', join_results)); - // send join request to peer on org2 as admin of org2 - join_results = await channel_org2.joinChannel(request, 30000); - if(join_results && join_results[0] && join_results[0].response && join_results[0].response.status == 200) { + // lets check the results of sending to the peers which is + // last in the results array + peer_results = join_results.pop(); + if(peer_results && peer_results[0] && peer_results[0].response && peer_results[0].response.status == 200) { t.pass('Successfully joined channnel on org2'); } else { t.fail('Failed to join channel on org2'); throw new Error('Failed to join channel on org2'); } - await sleep(10000); - t.pass('Successfully waited for peers to join the channel'); - /* * I N S T A L L C H A I N C O D E */ @@ -292,6 +311,49 @@ async function setupChannel(t, client_org1, client_org2, channel_name) { return channel_org1; } + +function buildJoinEventMonitor(t, client, channel_name, peer_name) { + let event_hub = client.getEventHub(peer_name); + let event_block_promise = new Promise((resolve, reject) => { + let registration_id = null; + let event_timeout = setTimeout(() => { + let message = 'REQUEST_TIMEOUT:' + event_hub._ep._endpoint.addr; + logger.error(message); + event_hub.disconnect(); + reject(new Error(message)); + }, 30000); + registration_id = event_hub.registerBlockEvent((block) => { + clearTimeout(event_timeout); + // A peer may have more than one channel, check that this block came + // is from the channel that is being joined. + // ... also this will be the first block channel, and the channel may + // have many more blocks + if (block.data.data.length === 1) { + var channel_header = block.data.data[0].payload.header.channel_header; + if (channel_header.channel_id === channel_name) { + let message = util.format('EventHub %s has reported a block update for channel %s',event_hub._ep._endpoint.addr,channel_name); + t.pass(message); + event_hub.unregisterBlockEvent(registration_id); + event_hub.disconnect(); + t.pass(util.format('EventHub %s has been disconnected',event_hub._ep._endpoint.addr)); + resolve(message); + } else { + t.pass('Keep waiting for the right block'); + } + } + }, (err) => { + clearTimeout(event_timeout); + let message = 'Problem setting up the event hub :'+ err.toString(); + t.fail(message); + event_hub.disconnect(); + reject(new Error(message)); + }); + event_hub.connect(); + }); + + return event_block_promise; +} + async function invoke(t, client, channel) { let tx_id_string = null; try { @@ -453,6 +515,8 @@ async function queries(t, client, channel, tx_id_string) { logger.error('catch network config test error:: %s', error.stack ? error.stack : error); t.fail('Test failed with '+ error); } + + return true; } async function manually(t, client) { @@ -511,6 +575,8 @@ async function manually(t, client) { logger.error('catch network config test error:: %s', error.stack ? error.stack : error); t.fail('Test failed with '+ error); } + + return true; } function sleep(ms) { diff --git a/test/unit/event-hub.js b/test/unit/event-hub.js index a083abb103..f94f56d3d9 100644 --- a/test/unit/event-hub.js +++ b/test/unit/event-hub.js @@ -53,23 +53,14 @@ test('\n\n** EventHub tests\n\n', (t) => { t.throws( () => { - eh = new EventHub({ getUserContext: function() {} }); + eh = new EventHub({ getUserContext: function() {}, _getSigningIdentity: function() {} }); eh.connect(); }, - /The clientContext has not been properly initialized, missing userContext/, + /The clientContext has not been properly initialized, missing identity/, 'Must pass in a clientContext that has the user context already initialized' ); - t.throws( - () => { - eh = new EventHub({ getUserContext: function() { return null; } }); - eh.connect(); - }, - /The clientContext has not been properly initialized, missing userContext/, - 'Must pass in a clientContext that has the user context already initialized' - ); - - eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh = new EventHub({ getUserContext: function() {}, _getSigningIdentity: function() { return 'dummyUser'; } }); t.throws( () => { eh.connect(); @@ -174,27 +165,6 @@ test('\n\n** EventHub tests\n\n', (t) => { /Missing "listener_handle" parameter/, 'Check the Missing "listener_handle" parameter' ); - t.throws( - () => { - eh.registerBlockEvent({}); - }, - /The event hub has not been connected to the event source/, - 'Check the event hub must be connected before the block event listener can be registered' - ); - t.throws( - () => { - eh.registerChaincodeEvent('ccid', 'eventname', {}); - }, - /The event hub has not been connected to the event source/, - 'Check the event hub must be connected before the chaincode event listener can be registered' - ); - t.throws( - () => { - eh.registerTxEvent('txid', {}); - }, - /The event hub has not been connected to the event source/, - 'Check the event hub must be connected before the tranaction event listener can be registered' - ); t.end(); }); @@ -566,24 +536,24 @@ test('\n\n** EventHub test actions when connect failures on transaction registra t.pass('Sleep complete'); // eventhub is now actually not connected - t.throws( + t.doesNotThrow( () => { - event_hub.registerTxEvent('123', (tx_id, code) => { + event_hub.registerTxEvent('123', + (tx_id, code) => { t.fail('Failed callback should not have been called - tx test 3'); + }, + (error) =>{ + if(error.toString().indexOf('Shutdown')) { + t.pass('Successfully got the error call back tx test 3 ::'+error); + } else { + t.failed('Failed to get shutdown error tx test 3 :: '+error); + } }); }, - /The event hub has not been connected to the event source/, - 'Check for The event hub has not been connected to the event source - tx test 3' + null, + 'Check for The event hub has been shutdown - tx test 3' ); - - // test 4 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9999'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); + event_hub.disconnect(); }).then((nothing) => { t.pass('Sleep complete'); // eventhub is now actually not connected @@ -681,24 +651,24 @@ test('\n\n** EventHub test actions when connect failures on block registration \ t.pass('Sleep complete'); // eventhub is now actually not connected - t.throws( + t.doesNotThrow( () => { - event_hub.registerBlockEvent((tx_id, code) => { + event_hub.registerBlockEvent( + (tx_id, code) => { t.fail('Failed callback should not have been called - block test 3'); + }, + (error) =>{ + if(error.toString().indexOf('Shutdown')) { + t.pass('Successfully got the error call back block test 3 ::'+error); + } else { + t.failed('Failed to get Shutdown error block test 3 :: '+error); + } }); }, - /The event hub has not been connected to the event source/, - 'Check for The event hub has not been connected to the event source - block test 3' + null, + 'Check for The event hub disconnect - block test 3' ); - - // block test 4 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9997'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); + event_hub.disconnect(); }).then((nothing) => { t.pass('Sleep complete'); // eventhub is now actually not connected @@ -796,24 +766,25 @@ test('\n\n** EventHub test actions when connect failures on chaincode registrati t.pass('Sleep complete'); // eventhub is now actually not connected - t.throws( + t.doesNotThrow( () => { - event_hub.registerChaincodeEvent('123', 'event', (tx_id, code) => { + event_hub.registerChaincodeEvent('123', 'event', + (tx_id, code) => { t.fail('Failed callback should not have been called - chaincode test 3'); + }, + (error) =>{ + if(error.toString().indexOf('Shutdown')) { + t.pass('Successfully got the error call back chaincode test 3 ::'+error); + } else { + t.failed('Failed to get Shutdown error chaincode test 3:: '+error); + } }); }, - /The event hub has not been connected to the event source/, - 'Check for The event hub has not been connected to the event source - chaincode test 3' + null, + 'Check for The event hub disconnect- chaincode test 4' ); + event_hub.disconnect(); - // chaincode test 4 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9998'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); }).then((nothing) => { t.pass('Sleep complete'); // eventhub is now actually not connected