Skip to content

Commit

Permalink
NodeSDK update event and query to latest protopuf
Browse files Browse the repository at this point in the history
Update query test and end-to-end to handle latest
event status code change. Update end-to-end to monitor
all peers for events.

Change-Id: I6e0b39b7117fa4b8326a276bb0fafad75257ead7
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Feb 28, 2017
1 parent d3fcbe2 commit 999db30
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 57 deletions.
7 changes: 4 additions & 3 deletions fabric-client/lib/Chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,10 @@ var Chain = class {
if(response.response) {
logger.debug('queryTransaction - response status :: %d', response.response.status);
var processTrans = _transProto.ProcessedTransaction.decode(response.response.payload);
logger.debug('queryTransaction - ProcessedTransaction.valid :: %s', processTrans.valid);
var payload = _commonProto.Payload.decode(processTrans.transactionEnvelope.payload);
logger.debug('queryTransaction - transaction ID :: %s:', payload.header.channel_header.tx_id);
logger.debug('queryTransaction - ProcessedTransaction.validationCode :: %s', processTrans.validationCode);
// var payload = _commonProto.Payload.decode(processTrans.transactionEnvelope.payload);
// var channel_header = _commonProto.ChannelHeader.decode(payload.header.channel_header);
// logger.debug('queryTransaction - transaction ID :: %s:', channel_header.tx_id);
return Promise.resolve(processTrans);
}
// no idea what we have, lets fail it and send it back
Expand Down
31 changes: 21 additions & 10 deletions fabric-client/lib/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ var _responseProto = grpc.load(__dirname + '/protos/peer/proposal_response.proto
var _ccProposalProto = grpc.load(__dirname + '/protos/peer/proposal.proto').protos;
var _ccEventProto = grpc.load(__dirname + '/protos/peer/chaincodeevent.proto').protos;

var _validation_codes = {};
var keys = Object.keys(_transProto.TxValidationCode);
for(var i = 0;i<keys.length;i++) {
var new_key = _transProto.TxValidationCode[keys[i]];
_validation_codes[new_key] = keys[i];
}

/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode
* event registration callbacks.
Expand Down Expand Up @@ -289,9 +296,10 @@ var EventHub = class {
txCallback(block) {
logger.debug('txCallback block=%s', block.header.number);
var eh = this;
var invalidTxs = block.metadata.metadata[_common.BlockMetadataIndex.TRANSACTIONS_FILTER];
var txStatusCodes = block.metadata.metadata[_common.BlockMetadataIndex.TRANSACTIONS_FILTER];

for (var index=0; index < block.data.data.length; index++) {
logger.debug('txCallback - trans index=%s',index);
try {
var env = _common.Envelope.decode(block.data.data[index]);
var payload = _common.Payload.decode(env.payload);
Expand All @@ -301,17 +309,20 @@ var EventHub = class {
break;
}

var metaval = utils.getBufferBit(invalidTxs, index);
if ( metaval.error == true ) {
logger.error('Metadata invalid buffer too small for tx index');
break;
}
logger.debug('txid=' + channel_header.tx_id);
var val_code = convertValidationCode(txStatusCodes[index]);
logger.debug('txCallback - txid=%s val_code=%s',val_code, channel_header.tx_id);
var cb = eh.txRegistrants.get(channel_header.tx_id);
if (cb)
cb(payload.header.channel_header.tx_id, metaval.invalid);
if (cb){
logger.debug('txCallback - about to call the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id);
cb(channel_header.tx_id, val_code);
}

}
};
};

module.exports = EventHub;
function convertValidationCode(code) {
return _validation_codes[code];
}

module.exports = EventHub;
23 changes: 22 additions & 1 deletion fabric-client/lib/protos/peer/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ message ProcessedTransaction {
common.Envelope transactionEnvelope = 1;

// An indication of whether the transaction was validated or invalidated by committing peer
bool valid = 2;
int32 validationCode = 2;
}

// The transaction to be sent to the ordering service. A transaction contains
Expand Down Expand Up @@ -119,3 +119,24 @@ message ChaincodeEndorsedAction {
// proposalResponsePayload
repeated Endorsement endorsements = 2;
}

enum TxValidationCode {
VALID = 0;
NIL_ENVELOPE = 1;
BAD_PAYLOAD = 2;
BAD_COMMON_HEADER = 3;
BAD_CREATOR_SIGNATURE = 4;
INVALID_ENDORSER_TRANSACTION = 5;
INVALID_CONFIG_TRANSACTION = 6;
UNSUPPORTED_TX_PAYLOAD = 7;
BAD_PROPOSAL_TXID = 8;
DUPLICATE_TXID = 9;
ENDORSEMENT_POLICY_FAILURE = 10;
MVCC_READ_CONFLICT = 11;
PHANTOM_READ_CONFLICT = 12;
UNKNOWN_TX_TYPE = 13;
TARGET_CHAIN_NOT_FOUND = 14;
MARSHAL_TX_ERROR = 15;
NIL_TXACTION = 16;
INVALID_OTHER_REASON = 255;
}
1 change: 1 addition & 0 deletions test/fixtures/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,6 @@ services:
- vp0
ports:
- 7056:7051
- 7058:7053
depends_on:
- orderer
117 changes: 82 additions & 35 deletions test/integration/end-to-end.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,30 @@ test('End-to-end flow of chaincode install, instantiate, transaction invocation,
client.setStateStore(store);
var promise = testUtil.getSubmitter(client, t);

// setup event hub to get notified when transactions are committed
var eh = new EventHub();
eh.setPeerAddr('grpc://localhost:7053');
eh.connect();
// setup event hub for peer0 to get notified when transactions are committed
var eh1 = new EventHub();
eh1.setPeerAddr('grpc://localhost:7053');
eh1.connect();

// setup event hub or peer1 to get notified when transactions are committed
var eh2 = new EventHub();
eh2.setPeerAddr('grpc://localhost:7058');
eh2.connect();

// override t.end function so it'll always disconnect the event hub
t.end = ((context, eventhub, f) => {
t.end = ((context, eventhubs, f) => {
return function() {
if (eventhub && eventhub.isconnected()) {
logger.info('Disconnecting the event hub');
eventhub.disconnect();
for(var key in eventhubs) {
var eventhub = eventhubs[key];
if (eventhub && eventhub.isconnected()) {
logger.info('Disconnecting the event hub');
eventhub.disconnect();
}
}

f.apply(context, arguments);
};
})(t, eh, t.end);
})(t, [eh1, eh2], t.end);

if (!useSteps || steps.indexOf('step1') >= 0) {
logger.info('Executing step1');
Expand Down Expand Up @@ -207,34 +215,53 @@ test('End-to-end flow of chaincode install, instantiate, transaction invocation,
// if the transaction did not get committed within the timeout period,
// fail the test
var deployId = tx_id.toString();
var txPromise = new Promise((resolve, reject) => {
var handle = setTimeout(reject, 30000);
var txPromise1 = new Promise((resolve, reject) => {
let handle = setTimeout(reject, 30000);

eh.registerTxEvent(deployId, (tx, invalid) => {
t.pass('The chaincode deploy transaction has been successfully committed');
eh1.registerTxEvent(deployId.toString(), (tx, code) => {
t.pass('The chaincode deploy transaction has been committed on this '+ peer0);
clearTimeout(handle);
eh.unregisterTxEvent(deployId);
eh1.unregisterTxEvent(deployId);

if (invalid) {
if (code !== 'VALID') {
t.pass('The chaincode deploy transaction was valid');
reject();
} else {
if (!useSteps) {
resolve();
} else if (steps.length === 1 && steps[0] === 'step2') {
t.end();
resolve();
}
t.pass('The chaincode deploy transaction was not valid code='+code);
resolve();
}
});
});
var txPromise2 = new Promise((resolve, reject) => {
let handle = setTimeout(reject, 30000);

eh2.registerTxEvent(deployId.toString(), (tx, code) => {
t.pass('The chaincode deploy transaction has been committed on this '+ peer1);
clearTimeout(handle);
eh2.unregisterTxEvent(deployId);

if (code !== 'VALID') {
reject();
} else {
resolve();
}
});
});

var sendPromise = chain.sendTransaction(request);
return Promise.all([sendPromise, txPromise]).then((results) => {
return Promise.all([sendPromise, txPromise1, txPromise2]).then((results) => {
if (!useSteps) {
logger.debug(' event promise all complete');
} else if (steps.length === 1 && steps[0] === 'step3') {
logger.debug(' event promise all complete and testing complete');
t.end();
}
return results[0]; // the first returned value is from the 'sendPromise' which is from the 'sendTransaction()' call
}).catch((err) => {
t.fail('Failed to send instantiate transaction and get notifications within the timeout period. ');
t.end();
});

} else {
t.fail('Failed to send instantiate Proposal or receive valid response. Response null or status is not 200. exiting...');
t.end();
Expand Down Expand Up @@ -265,6 +292,8 @@ test('End-to-end flow of chaincode install, instantiate, transaction invocation,
the_user = admin;
}
nonce = Buffer.from('12');//hard coded this so that we have a known transaction id that may be queried later
// nonce = utils.getNonce();

tx_id = chain.buildTransactionID(nonce, the_user);

// send proposal to endorser
Expand Down Expand Up @@ -306,30 +335,48 @@ test('End-to-end flow of chaincode install, instantiate, transaction invocation,
};

var txId = tx_id.toString();
var txPromise = new Promise((resolve, reject) => {
var handle = setTimeout(reject, 30000);
var txPromise1 = new Promise((resolve, reject) => {
let handle = setTimeout(reject, 30000);

eh.registerTxEvent(txId.toString(), (tx, invalid) => {
t.pass('The chaincode deploy transaction has been successfully committed');
eh1.registerTxEvent(txId.toString(), (tx, code) => {
t.pass('The chaincode invoke move transaction has been successfully committed on this '+ peer0);
clearTimeout(handle);
eh.unregisterTxEvent(txId);
eh1.unregisterTxEvent(txId);

if (invalid) {
if (code !== 'VALID') {
reject();
} else {
if (!useSteps) {
resolve();
} else if (steps.length === 1 && steps[0] === 'step3') {
t.end();
resolve();
}
resolve();
}
});
});
var txPromise2 = new Promise((resolve, reject) => {
let handle = setTimeout(reject, 30000);

eh2.registerTxEvent(txId.toString(), (tx, code) => {
t.pass('The chaincode invoke move transaction has been successfully committed on this '+ peer1);
clearTimeout(handle);
eh2.unregisterTxEvent(txId);

if (code !== 'VALID') {
t.pass('The chaincode invoke transaction was valid');
reject();
} else {
t.pass('The chaincode invoke transaction was not valid code='+code);
resolve();
}
});
});

var sendPromise = chain.sendTransaction(request);

return Promise.all([sendPromise, txPromise]).then((results) => {
return Promise.all([sendPromise, txPromise1, txPromise2]).then((results) => {
if (!useSteps) {
logger.debug(' event promise all complete');
} else if (steps.length === 1 && steps[0] === 'step3') {
logger.debug(' event promise all complete and testing complete');
t.end();
}
return results[0];
}).catch((err) => {
t.fail('Failed to send invoke transaction and get notifications within the timeout period. ');
Expand Down
36 changes: 28 additions & 8 deletions test/integration/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _test = require('tape-promise');
var test = _test(tape);

var log4js = require('log4js');
var logger = log4js.getLogger('E2E');
var logger = log4js.getLogger('QUERY TEST');
logger.setLevel('DEBUG');

var path = require('path');
Expand Down Expand Up @@ -97,8 +97,9 @@ test(' ---->>>>> Query chain working <<<<<-----', function(t) {
t.end();
}
).then(
function(response) {
t.equal(response.header.number.toString(),'0','checking query results are correct that we got zero block back');
function(block) {
logger.info(' Chain getBlock() returned block number=%s',block.header.number);
t.equal(block.header.number.toString(),'0','checking query results are correct that we got zero block back');
chain.setPrimaryPeer(peer0);
nonce = Buffer.from('12');//hard coded to be the same as end-to-end.js, this transaction will only exist if
// end-to-end runs first
Expand All @@ -111,8 +112,23 @@ test(' ---->>>>> Query chain working <<<<<-----', function(t) {
t.end();
}
).then(
function(transaction) {
t.pass('got back ProcessedTransaction that is was a valid transaction='+transaction.valid); // + JSON.stringify(response_payloads));
function(processed_transaction) {
// set to be able to decode grpc objects
var grpc = require('grpc');
var commonProto = grpc.load(__dirname + '/../../fabric-client/lib/protos/common/common.proto').common;
var transProto = grpc.load(__dirname + '/../../fabric-client/lib/protos/peer/transaction.proto').protos;
logger.info(' Chain queryTransaction() returned processed tranaction is valid='+processed_transaction.validationCode);
t.equals(transProto.TxValidationCode.VALID,processed_transaction.validationCode,'got back ProcessedTransaction that is a valid transaction');

try {
var payload = commonProto.Payload.decode(processed_transaction.transactionEnvelope.payload);
var channel_header = commonProto.ChannelHeader.decode(payload.header.channel_header);
logger.debug('queryTransaction - transaction ID :: %s:', channel_header.tx_id);
}
catch(err) {
logger.error(err);
}

chain.setPrimaryPeer(peer1);
// send query
return chain.queryInfo();
Expand All @@ -122,9 +138,12 @@ test(' ---->>>>> Query chain working <<<<<-----', function(t) {
t.end();
}
).then(
function(response) {
t.pass('got back blockchain info '); // + JSON.stringify(response_payloads[i]));
var block_hash = response.previousBlockHash;
function(blockchainInfo) {
t.pass('got back blockchain info ');
logger.info(' Chain queryInfo() returned block height='+blockchainInfo.height);
logger.info(' Chain queryInfo() returned block previousBlockHash='+blockchainInfo.previousBlockHash);
logger.info(' Chain queryInfo() returned block currentBlockHash='+blockchainInfo.currentBlockHash);
var block_hash = blockchainInfo.previousBlockHash;
chain.setPrimaryPeer(peer0);
// send query
return chain.queryBlockByHash(block_hash);
Expand All @@ -135,6 +154,7 @@ test(' ---->>>>> Query chain working <<<<<-----', function(t) {
}
).then(
function(block) {
logger.info(' Chain getBlockByHash() returned block number=%s',block.header.number);
t.pass('got back block number '+ block.header.number);
t.end();
},
Expand Down

0 comments on commit 999db30

Please sign in to comment.