diff --git a/build/tasks/test.js b/build/tasks/test.js index cfad4f8648..1e5e1dc991 100644 --- a/build/tasks/test.js +++ b/build/tasks/test.js @@ -121,6 +121,8 @@ gulp.task('test', ['clean-up', 'lint', 'pre-test', 'compile', 'docker-ready', 'c '!test/unit/logger.js', // channel: mychannel, chaincode: end2endnodesdk:v0/v1 'test/integration/e2e.js', + // channel: mychannel, chaincode: e2enodecc:v0 + 'test/integration/nodechaincode/e2e.js', 'test/integration/query.js', 'test/integration/fabric-ca-affiliation-service-tests.js', 'test/integration/fabric-ca-identity-service-tests.js', diff --git a/fabric-client/lib/ChannelEventHub.js b/fabric-client/lib/ChannelEventHub.js index 07a8ec9a08..d3059fecc1 100644 --- a/fabric-client/lib/ChannelEventHub.js +++ b/fabric-client/lib/ChannelEventHub.js @@ -255,7 +255,7 @@ var ChannelEventHub = class { * provide suficient information. */ connect(full_block){ - logger.debug('connect - start'); + logger.debug('connect - start %s', this.getPeerAddr()); if (!this._clientContext._userContext && !this._clientContext._adminSigningIdentity) { throw new Error('The clientContext has not been properly initialized, missing userContext or admin identity'); } @@ -270,6 +270,7 @@ var ChannelEventHub = class { } this._connect(); + logger.debug('connect - end %s', this.getPeerAddr()); } /* @@ -327,9 +328,9 @@ var ChannelEventHub = class { clearTimeout(connecton_setup_timeout); } - logger.debug('on.data - event stream:%s _current_stream:%s',stream_id, self._current_stream); + logger.debug('on.data - event stream:%s _current_stream:%s peer:%s',stream_id, self._current_stream, self.getPeerAddr()); if(stream_id != self._current_stream) { - logger.debug('on.data - incoming event was from a canceled stream'); + logger.debug('on.data - incoming event was from a cancelled stream'); return; } @@ -368,17 +369,16 @@ var ChannelEventHub = class { } } else if(deliverResponse.Type === 'status') { - logger.debug('on.data - status received'); if(self._ending_block_seen) { // this is normal after the last block comes in when we set // an ending block - logger.debug('on.data - status received after last block seen'); + logger.debug('on.data - status received after last block seen: %s', deliverResponse.status); } else { // only blocks should be received .... get status means we need to tell // all registered users that something is wrong and the stream is will be close or // has been closed logger.debug('on.data - status received - %s',deliverResponse.status); - self._disconnect(new Error(util.format('Received status message on the event stream. status:%s',deliverResponse.status))); + self._disconnect(new Error(util.format('Received status message on the event stream. status:%s', deliverResponse.status))); } } else { @@ -388,13 +388,13 @@ var ChannelEventHub = class { }); this._stream.on('status', function (response) { - logger.debug('on status - status received: %j',response); + logger.debug('on status - status received: %j peer:%s', response, self.getPeerAddr()); }); this._stream.on('end', function() { self._connect_running = false; clearTimeout(connecton_setup_timeout); - logger.debug('on.end - event stream:%s _current_stream:%s',stream_id, self._current_stream); + logger.debug('on.end - event stream:%s _current_stream:%s peer:%s', stream_id, self._current_stream, self.getPeerAddr()); if(stream_id != self._current_stream) { logger.debug('on.end - incoming event was from a canceled stream'); return; @@ -408,7 +408,7 @@ var ChannelEventHub = class { this._stream.on('error', function(err) { self._connect_running = false; clearTimeout(connecton_setup_timeout); - logger.debug('on.error - event stream:%s _current_stream:%s',stream_id, self._current_stream); + logger.debug('on.error - event stream:%s _current_stream:%s peer:%s',stream_id, self._current_stream, self.getPeerAddr()); if(stream_id != self._current_stream) { logger.debug('on.error - incoming event was from a canceled stream'); logger.debug('on.error - %s %s',new Date(),err); @@ -445,11 +445,12 @@ var ChannelEventHub = class { * all listeners that provided an "onError" callback. */ _disconnect(err) { - logger.debug('_disconnect - start -- called due to:: %s',err.message); + logger.debug('_disconnect - start -- called due to:: %s, peer:%s',err.message, this.getPeerAddr()); this._connected = false; this._connect_running = false; this._closeAllCallbacks(err); this._shutdown(); + logger.debug('_disconnect - end -- called due to:: %s, peer:%s',err.message, this.getPeerAddr()); } _shutdown() { @@ -537,28 +538,29 @@ var ChannelEventHub = class { * Sends an error to all registered event "onError" callbacks */ _closeAllCallbacks(err) { - logger.debug('_closeAllCallbacks - start'); + let method = '_closeAllCallbacks -' + this.getPeerAddr(); + logger.debug('%s - start', method); - logger.debug('_closeAllCallbacks - blockOnErrors %s', Object.keys(this._blockRegistrations).length); + logger.debug('%s - blockOnErrors %s', method, Object.keys(this._blockRegistrations).length); for (let key in this._blockRegistrations) { let block_registration = this._blockRegistrations[key]; if(block_registration.onError) { - logger.debug('_closeAllCallbacks - calling block error callback for %s',key); + logger.debug('%s - calling block error callback for %s', method, key); block_registration.onError(err); } else { - logger.debug('_closeAllCallbacks - no block error callback to call for %s',key); + logger.debug('%s - no block error callback to call for %s', method, key); } } this._blockRegistrations = {}; - logger.debug('_closeAllCallbacks - transactionOnErrors %s', Object.keys(this._transactionRegistrations).length); + logger.debug('%s - transactionOnErrors %s', method, Object.keys(this._transactionRegistrations).length); for (let key in this._transactionRegistrations) { let trans_reg = this._transactionRegistrations[key]; if(trans_reg.onError) { - logger.debug('_closeAllCallbacks - calling transaction error callback for %s',key); + logger.debug('%s - calling transaction error callback for %s', method, key); trans_reg.onError(err); } else { - logger.debug('_closeAllCallbacks - no transaction error callback to call for %s',key); + logger.debug('%s - no transaction error callback to call for %s', method, key); } } this._transactionRegistrations = {}; @@ -567,16 +569,18 @@ var ChannelEventHub = class { let cc_closer = function(key) { let cbtable = self._chaincodeRegistrants[key]; cbtable.forEach(function(chaincode_reg) { - logger.debug('_closeAllCallbacks - closing this chaincode event ccid:%s eventNameFilter:%s',chaincode_reg.ccid, chaincode_reg.eventNameFilter); + logger.debug('%s - closing this chaincode event ccid:%s eventNameFilter:%s', method, chaincode_reg.ccid, chaincode_reg.eventNameFilter); if(chaincode_reg.event_reg.onError) { chaincode_reg.event_reg.onError(err); } }); }; - logger.debug('_closeAllCallbacks - chaincodeRegistrants %s', Object.keys(this._chaincodeRegistrants).length); + logger.debug('%s - chaincodeRegistrants %s', method, Object.keys(this._chaincodeRegistrants).length); Object.keys(this._chaincodeRegistrants).forEach(cc_closer); this._chaincodeRegistrants = {}; + + logger.debug('%s - end', method); } /* @@ -634,6 +638,7 @@ var ChannelEventHub = class { logger.debug('_checkReplay - Event listening will start at block %s', converted_options.start_block); } + logger.debug('_checkReplay - end'); return result; } @@ -668,6 +673,8 @@ var ChannelEventHub = class { } else { logger.debug('_checkConnection - connection has not been started'); } + + logger.debug('_checkConnection - end'); } /** diff --git a/test/integration/e2e/e2eUtils.js b/test/integration/e2e/e2eUtils.js index 16c799afc8..8796c79463 100644 --- a/test/integration/e2e/e2eUtils.js +++ b/test/integration/e2e/e2eUtils.js @@ -178,43 +178,30 @@ module.exports.installChaincode = installChaincode; function instantiateChaincode(userOrg, chaincode_path, version, language, upgrade, t){ init(); - var channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel); + const channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel); + + let targets = []; + let eventhubs = []; - var targets = [], - eventhubs = []; - var type = 'instantiate'; + let type = 'instantiate'; if(upgrade) type = 'upgrade'; - // override t.end function so it'll always disconnect the event hub - t.end = ((context, ehs, f) => { - return function() { - for(var key in ehs) { - var eventhub = ehs[key]; - if (eventhub && eventhub.isconnected()) { - logger.debug('Disconnecting the event hub'); - eventhub.disconnect(); - } - } - f.apply(context, arguments); - }; - })(t, eventhubs, t.end); + let client = new Client(); + let channel = client.newChannel(channel_name); - var client = new Client(); - var channel = client.newChannel(channel_name); - - var orgName = ORGS[userOrg].name; - var cryptoSuite = Client.newCryptoSuite(); + const orgName = ORGS[userOrg].name; + let cryptoSuite = Client.newCryptoSuite(); cryptoSuite.setCryptoKeyStore(Client.newCryptoKeyStore({path: testUtil.storePathForOrg(orgName)})); client.setCryptoSuite(cryptoSuite); - var caRootsPath = ORGS.orderer.tls_cacerts; + const caRootsPath = ORGS.orderer.tls_cacerts; let data = fs.readFileSync(path.join(__dirname, caRootsPath)); let caroots = Buffer.from(data).toString(); - targets = []; - var badTransientMap = { 'test1': 'transientValue' }; // have a different key than what the chaincode example_cc1.go expects in Init() - var transientMap = { 'test': 'transientValue' }; + const badTransientMap = { 'test1': 'transientValue' }; // have a different key than what the chaincode example_cc1.go expects in Init() + const transientMap = { 'test': 'transientValue' }; let tlsInfo = null; + let request = null; return e2eUtils.tlsEnroll(userOrg) .then((enrollment) => { @@ -260,24 +247,11 @@ function instantiateChaincode(userOrg, chaincode_path, version, language, upgrad targets.push(peer); channel.addPeer(peer); - } - } - // an event listener can only register with a peer in its own org - logger.debug(' create new eventhub %s', ORGS[userOrg]['peer1'].events); - let data = fs.readFileSync(path.join(__dirname, ORGS[userOrg]['peer1']['tls_cacerts'])); - let eh = client.newEventHub(); - eh.setPeerAddr( - ORGS[userOrg]['peer1'].events, - { - pem: Buffer.from(data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[userOrg]['peer1']['server-hostname'] + let eh = channel.newChannelEventHub(peer); + eventhubs.push(eh); } - ); - eh.connect(); - eventhubs.push(eh); + } // read the config block from the peer for the channel // and initialize the verify MSPs based on the participating @@ -359,97 +333,75 @@ function instantiateChaincode(userOrg, chaincode_path, version, language, upgrad }).then((results) => { - var proposalResponses = results[0]; + let proposalResponses = results[0]; - var proposal = results[1]; - var all_good = true; + let proposal = results[1]; + let all_good = true; for(var i in proposalResponses) { - let one_good = false; if (proposalResponses && proposalResponses[i].response && proposalResponses[i].response.status === 200) { - // special check only to test transient map support during chaincode upgrade - one_good = true; logger.info(type +' proposal was good'); } else { logger.error(type +' proposal was bad'); + all_good = false; } - all_good = all_good & one_good; } if (all_good) { t.pass('Successfully sent Proposal and received ProposalResponse'); logger.debug(util.format('Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s', proposalResponses[0].response.status, proposalResponses[0].response.message, proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature)); - var request = { + request = { proposalResponses: proposalResponses, proposal: proposal }; - - // set the transaction listener and set a timeout of 30sec - // if the transaction did not get committed within the timeout period, - // fail the test - var deployId = tx_id.getTransactionID(); - - var eventPromises = []; - eventhubs.forEach((eh) => { - let txPromise = new Promise((resolve, reject) => { - let handle = setTimeout(reject, 120000); - - eh.registerTxEvent(deployId.toString(), (tx, code) => { - t.pass('The chaincode ' + type + ' transaction has been committed on peer '+ eh.getPeerAddr()); - clearTimeout(handle); - eh.unregisterTxEvent(deployId); - - if (code !== 'VALID') { - t.fail('The chaincode ' + type + ' transaction was invalid, code = ' + code); - reject(); - } else { - t.pass('The chaincode ' + type + ' transaction was valid.'); - resolve(); - } - }, (err) => { - t.fail('There was a problem with the instantiate event '+err); - clearTimeout(handle); - eh.unregisterTxEvent(deployId); - }); - }); - logger.debug('register eventhub %s with tx=%s',eh.getPeerAddr(),deployId); - eventPromises.push(txPromise); - }); - - var sendPromise = channel.sendTransaction(request); - return Promise.all([sendPromise].concat(eventPromises)) - .then((results) => { - - logger.debug('Event promise all complete and testing complete'); - return results[0]; // just first results are from orderer, the rest are from the peer events - - }).catch((err) => { - - t.fail('Failed to send ' + type + ' transaction and get notifications within the timeout period.'); - throw new Error('Failed to send ' + type + ' transaction and get notifications within the timeout period.'); - - }); - } else { - t.fail('Failed to send ' + type + ' Proposal or receive valid response. Response null or status is not 200. exiting...'); - throw new Error('Failed to send ' + type + ' Proposal or receive valid response. Response null or status is not 200. exiting...'); + throw new Error('All proposals were not good'); } - }, (err) => { - t.fail('Failed to send ' + type + ' proposal due to error: ' + err.stack ? err.stack : err); - throw new Error('Failed to send ' + type + ' proposal due to error: ' + err.stack ? err.stack : err); + let deployId = tx_id.getTransactionID(); + let eventPromises = []; + eventPromises.push(channel.sendTransaction(request)); + + eventhubs.forEach((eh) => { + let txPromise = new Promise((resolve, reject) => { + let handle = setTimeout(() => { + t.fail('Timeout - Failed to receive the event for instantiate: waiting on '+ eh.getPeerAddr()); + eh.disconnect(); + reject('TIMEOUT waiting on '+ eh.getPeerAddr()); + }, 120000); + + eh.registerTxEvent(deployId.toString(), (tx, code) => { + t.pass('The chaincode ' + type + ' transaction has been committed on peer '+ eh.getPeerAddr()); + clearTimeout(handle); + if (code !== 'VALID') { + t.fail('The chaincode ' + type + ' transaction was invalid, code = ' + code); + reject(); + } else { + t.pass('The chaincode ' + type + ' transaction was valid.'); + resolve(); + } + }, (err) => { + t.fail('There was a problem with the instantiate event '+err); + clearTimeout(handle); + reject(); + }, { + disconnect: true + }); + eh.connect(); + }); + logger.debug('register eventhub %s with tx=%s',eh.getPeerAddr(),deployId); + eventPromises.push(txPromise); + }); - }).then((response) => { - //TODO should look into the event responses - if (!(response instanceof Error) && response.status === 'SUCCESS') { + return Promise.all(eventPromises); + }).then((results) => { + if (results && !(results[0] instanceof Error) && results[0].status === 'SUCCESS') { t.pass('Successfully sent ' + type + 'transaction to the orderer.'); return true; } else { - t.fail('Failed to order the ' + type + 'transaction. Error code: ' + response.status); - Promise.reject(new Error('Failed to order the ' + type + 'transaction. Error code: ' + response.status)); + t.fail('Failed to order the ' + type + 'transaction. Error code: ' + results[0].status); + Promise.reject(new Error('Failed to order the ' + type + 'transaction. Error code: ' + results[0].status)); } - }, (err) => { - - t.fail('Failed to send ' + type + ' due to error: ' + err.stack ? err.stack : err); - Promise.reject(new Error('Failed to send instantiate due to error: ' + err.stack ? err.stack : err)); + }).catch((err) => { + t.fail('Failed to instantiate ' + type + ' due to error: ' + err.stack ? err.stack : err); }); } @@ -512,25 +464,10 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){ Client.setConfigSetting('request-timeout', 60000); var channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel); - var targets = [], - eventhubs = []; + var targets = []; + let eventhubs = []; var pass_results = null; - // override t.end function so it'll always disconnect the event hub - t.end = ((context, ehs, f) => { - return function() { - for(var key in ehs) { - var eventhub = ehs[key]; - if (eventhub && eventhub.isconnected()) { - logger.debug('Disconnecting the event hub'); - eventhub.disconnect(); - } - } - - f.apply(context, arguments); - }; - })(t, eventhubs, t.end); - // this is a transaction, will just use org's identity to // submit the request. intentionally we are using a different org // than the one that instantiated the chaincode, although either org @@ -560,7 +497,6 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){ promise = Promise.resolve(useStore); } - return e2eUtils.tlsEnroll(userOrg) .then((enrollment) => { t.pass('Successfully retrieved TLS certificate'); @@ -603,26 +539,10 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){ } ); channel.addPeer(peer); + eventhubs.push(channel.newChannelEventHub(peer)); } } - // an event listener can only register with a peer in its own org - let data = fs.readFileSync(path.join(__dirname, ORGS[userOrg].peer1['tls_cacerts'])); - let eh = client.newEventHub(); - eh.setPeerAddr( - ORGS[userOrg].peer1.events, - { - pem: Buffer.from(data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[userOrg].peer1['server-hostname'], - 'grpc.keepalive_timeout_ms' : 3000, // time to respond to the ping, 3 seconds - 'grpc.keepalive_time_ms' : 360000, // time to wait for ping response, 6 minutes - } - ); - eh.connect(); - eventhubs.push(eh); - return channel.initialize(); }).then((nothing) => { @@ -718,29 +638,30 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore){ var eventPromises = []; eventhubs.forEach((eh) => { let txPromise = new Promise((resolve, reject) => { - let handle = setTimeout(reject, 120000); - - eh.registerTxEvent(deployId.toString(), - (tx, code) => { - clearTimeout(handle); - eh.unregisterTxEvent(deployId); - - if (code !== 'VALID') { - t.fail('The balance transfer transaction was invalid, code = ' + code); - reject(); - } else { - t.pass('The balance transfer transaction has been committed on peer '+ eh.getPeerAddr()); - resolve(); - } - }, - (err) => { - clearTimeout(handle); - t.pass('Successfully received notification of the event call back being cancelled for '+ deployId); + let handle = setTimeout(() => { + t.fail('Timeout - Failed to receive the event for commit: waiting on '+ eh.getPeerAddr()); + eh.disconnect(); // will not be using this event hub + reject('TIMEOUT waiting on '+ eh.getPeerAddr()); + }, 30000); + + eh.registerTxEvent(deployId.toString(), (tx, code) => { + clearTimeout(handle); + if (code !== 'VALID') { + t.fail('The balance transfer transaction was invalid, code = ' + code); + reject(); + } else { + t.pass('The balance transfer transaction has been committed on peer '+ eh.getPeerAddr()); resolve(); } - ); + }, (err) => { + clearTimeout(handle); + t.pass('Successfully received notification of the event call back being cancelled for '+ deployId); + resolve(); + },{ + disconnect: true // will not be using this event hub + }); + eh.connect(); }); - eventPromises.push(txPromise); }); diff --git a/test/integration/query.js b/test/integration/query.js index 0432cdb83e..059b4dc9fc 100644 --- a/test/integration/query.js +++ b/test/integration/query.js @@ -164,7 +164,7 @@ test(' ---->>>>> Query channel working <<<<<-----', (t) => { .payload.action.proposal_response_payload.extension.results.ns_rwset['0'] .rwset.writes['0'].key, 'test for write set key value'); - t.equals('2', processed_transaction.transactionEnvelope.payload.data.actions['0'] + t.equals('6', processed_transaction.transactionEnvelope.payload.data.actions['0'] .payload.action.proposal_response_payload.extension.results.ns_rwset['0'] .rwset.reads[1].version.block_num.toString(), 'test for read set block num');