Skip to content

Commit

Permalink
FAB-6400 Balance-transfer filtered events
Browse files Browse the repository at this point in the history
Update sample code to use the channel-based events.
The sample will also use the new connection profile
API to get a list of channel-based NodeSDK event hubs
using filtered blocks, automatic unregistration, and
automatic disconnect, all new features of channel-based
events. This will demostrate the most common use case
for events. The sample code will require the NodeSDK
to be at 1.1 alpha.

Change-Id: Id9f2b37f02d7d662b7ca1016586560ee4c595992
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Feb 27, 2018
1 parent 831e9bf commit ffd7a25
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 50 deletions.
44 changes: 20 additions & 24 deletions balance-transfer/app/instantiate-chaincode.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
logger.debug('\n\n============ Instantiate chaincode on channel ' + channelName +
' ============\n');
var error_message = null;
var eventhubs_in_use = [];

try {
// first setup the client for this org
var client = await helper.getClientForOrg(org_name, username);
Expand Down Expand Up @@ -84,27 +84,25 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
logger.info(util.format(
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s',
proposalResponses[0].response.status, proposalResponses[0].response.message,
proposalResponses[0].response.payload, proposalResponses[0].endorsement
.signature));
proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature));

// tell each peer to join and wait for the event hub of each peer to tell us
// that the channel has been created on each peer
// wait for the channel-based event hub to tell us that the
// instantiate transaction was committed on the peer
var promises = [];
let event_hubs = client.getEventHubsForOrg(org_name);
let event_hubs = channel.getChannelEventHubsForOrg();
logger.debug('found %s eventhubs for this organization %s',event_hubs.length, org_name);
event_hubs.forEach((eh) => {
let instantiateEventPromise = new Promise((resolve, reject) => {
logger.debug('instantiateEventPromise - setting up event');
let event_timeout = setTimeout(() => {
let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr;
let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr();
logger.error(message);
eh.disconnect();
reject(new Error(message));
}, 60000);
eh.registerTxEvent(deployId, (tx, code) => {
logger.info('The chaincode instantiate transaction has been committed on peer %s',eh._ep._endpoint.addr);
eh.registerTxEvent(deployId, (tx, code, block_num) => {
logger.info('The chaincode instantiate transaction has been committed on peer %s',eh.getPeerAddr());
logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num);
clearTimeout(event_timeout);
eh.unregisterTxEvent(deployId);

if (code !== 'VALID') {
let message = until.format('The chaincode instantiate transaction was invalid, code:%s',code);
Expand All @@ -117,15 +115,18 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
}
}, (err) => {
clearTimeout(event_timeout);
eh.unregisterTxEvent(deployId);
let message = 'Problem setting up the event hub :'+ err.toString();
logger.error(message);
reject(new Error(message));
});
logger.error(err);
reject(err);
},
// the default for 'unregister' is true for transaction listeners
// so no real need to set here, however for 'disconnect'
// the default is false as most event hubs are long running
// in this use case we are using it only once
{unregister: true, disconnect: true}
);
eh.connect();
});
promises.push(instantiateEventPromise);
eh.connect();
eventhubs_in_use.push(eh);
});

var orderer_request = {
Expand Down Expand Up @@ -155,7 +156,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
for(let i in results) {
let event_hub_result = results[i];
let event_hub = event_hubs[i];
logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr);
logger.debug('Event results for event hub :%s',event_hub.getPeerAddr());
if(typeof event_hub_result === 'string') {
logger.debug(event_hub_result);
} else {
Expand All @@ -172,11 +173,6 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
error_message = error.toString();
}

// need to shutdown open event streams
eventhubs_in_use.forEach((eh) => {
eh.disconnect();
});

if (!error_message) {
let message = util.format(
'Successfully instantiate chaingcode in organization %s to the channel \'%s\'',
Expand Down
43 changes: 19 additions & 24 deletions balance-transfer/app/invoke-transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var logger = helper.getLogger('invoke-chaincode');
var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, args, username, org_name) {
logger.debug(util.format('\n============ invoke transaction on channel %s ============\n', channelName));
var error_message = null;
var eventhubs_in_use = [];
var tx_id_string = null;
try {
// first setup the client for this org
Expand Down Expand Up @@ -78,26 +77,24 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
logger.info(util.format(
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s',
proposalResponses[0].response.status, proposalResponses[0].response.message,
proposalResponses[0].response.payload, proposalResponses[0].endorsement
.signature));
proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature));

// tell each peer to join and wait for the event hub of each peer to tell us
// that the channel has been created on each peer
// wait for the channel-based event hub to tell us
// that the commit was good or bad on each peer in our organization
var promises = [];
let event_hubs = client.getEventHubsForOrg(org_name);
let event_hubs = channel.getChannelEventHubsForOrg();
event_hubs.forEach((eh) => {
logger.debug('invokeEventPromise - setting up event');
let invokeEventPromise = new Promise((resolve, reject) => {
let event_timeout = setTimeout(() => {
let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr;
let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr();
logger.error(message);
eh.disconnect();
reject(new Error(message));
}, 3000);
eh.registerTxEvent(tx_id_string, (tx, code) => {
logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh._ep._endpoint.addr);
eh.registerTxEvent(tx_id_string, (tx, code, block_num) => {
logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh.getPeerAddr());
logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num);
clearTimeout(event_timeout);
eh.unregisterTxEvent(tx_id_string);

if (code !== 'VALID') {
let message = util.format('The invoke chaincode transaction was invalid, code:%s',code);
Expand All @@ -110,15 +107,18 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
}
}, (err) => {
clearTimeout(event_timeout);
eh.unregisterTxEvent(tx_id_string);
let message = 'Problem setting up the event hub :'+ err.toString();
logger.error(message);
reject(new Error(message));
});
logger.error(err);
reject(err);
},
// the default for 'unregister' is true for transaction listeners
// so no real need to set here, however for 'disconnect'
// the default is false as most event hubs are long running
// in this use case we are using it only once
{unregister: true, disconnect: true}
);
eh.connect();
});
promises.push(invokeEventPromise);
eh.connect();
eventhubs_in_use.push(eh);
});

var orderer_request = {
Expand All @@ -144,7 +144,7 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
for(let i in results) {
let event_hub_result = results[i];
let event_hub = event_hubs[i];
logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr);
logger.debug('Event results for event hub :%s',event_hub.getPeerAddr());
if(typeof event_hub_result === 'string') {
logger.debug(event_hub_result);
} else {
Expand All @@ -161,11 +161,6 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
error_message = error.toString();
}

// need to shutdown open event streams
eventhubs_in_use.forEach((eh) => {
eh.disconnect();
});

if (!error_message) {
let message = util.format(
'Successfully invoked the chaincode %s to the channel \'%s\' for transaction ID: %s',
Expand Down
4 changes: 2 additions & 2 deletions balance-transfer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"v1.0 fabric nodesdk sample"
],
"engines": {
"node": ">=6.9.5 <7.0",
"npm": ">=3.10.10 <4.0"
"node": ">=8.9.4 <9.0",
"npm": ">=5.6.0 <6.0"
},
"license": "Apache-2.0",
"dependencies": {
Expand Down

0 comments on commit ffd7a25

Please sign in to comment.