From 4506da9c06f71b5b57f8e0009dcf495eaf895875 Mon Sep 17 00:00:00 2001 From: Bret Harrison Date: Mon, 8 May 2017 14:08:29 -0400 Subject: [PATCH] [FAB-2843] NodeSDK - Handle network issues part1 The first phase of handling network issues will be for the event hub to report to the application that the event stream has been shutdown. The contract with the applications now includes an error callback similar to the Promise resolve and reject. Block decoding was enhanced so that the decoding of the event block could be shared by all three event types we provide, block, transaction, and chaincode. Enhanced the jsDoc of Block.js. Change-Id: I72082693e38bf4635b5f139f43205aabee15909d Signed-off-by: Bret Harrison --- build/tasks/doc.js | 2 +- .../lib/{Block.js => BlockDecoder.js} | 643 ++++++++++++++++-- fabric-client/lib/Chain.js | 8 +- fabric-client/lib/EventHub.js | 527 +++++++++----- test/integration/e2e/e2eUtils.js | 51 +- test/integration/e2e/join-channel.js | 12 +- test/integration/query.js | 1 + test/unit/block.js | 12 +- test/unit/event-hub.js | 297 +++++++- 9 files changed, 1289 insertions(+), 264 deletions(-) rename fabric-client/lib/{Block.js => BlockDecoder.js} (53%) diff --git a/build/tasks/doc.js b/build/tasks/doc.js index d7226b030d..278c3d9a25 100644 --- a/build/tasks/doc.js +++ b/build/tasks/doc.js @@ -30,7 +30,7 @@ gulp.task('doc', function () { 'fabric-client/lib/impl/bccsp_pkcs11.js', 'fabric-client/lib/impl/ecdsa/*', 'fabric-client/lib/impl/aes/*', - 'fabric-client/lib/Block.js', + 'fabric-client/lib/BlockDecoder.js', 'fabric-client/lib/msp/msp.js', 'fabric-client/lib/Chain.js', 'fabric-client/lib/Orderer.js', diff --git a/fabric-client/lib/Block.js b/fabric-client/lib/BlockDecoder.js similarity index 53% rename from fabric-client/lib/Block.js rename to fabric-client/lib/BlockDecoder.js index 39781bd3ba..fe65f50c3e 100644 --- a/fabric-client/lib/Block.js +++ b/fabric-client/lib/BlockDecoder.js @@ -20,9 +20,10 @@ var grpc = require('grpc'); var util = require('util'); var path = require('path'); var utils = require('./utils.js'); -var logger = utils.getLogger('Block.js'); +var logger = utils.getLogger('BlockDecoder.js'); var _ccProto = grpc.load(__dirname + '/protos/peer/chaincode.proto').protos; +var _ccEventProto = grpc.load(__dirname + '/protos/peer/chaincode_event.proto').protos; var _transProto = grpc.load(__dirname + '/protos/peer/transaction.proto').protos; var _proposalProto = grpc.load(__dirname + '/protos/peer/proposal.proto').protos; var _responseProto = grpc.load(__dirname + '/protos/peer/proposal_response.proto').protos; @@ -43,19 +44,378 @@ var _rwsetProto = grpc.load(path.join(__dirname, '/protos/ledger/rwset/rwset.pro var _kv_rwsetProto = grpc.load(path.join(__dirname, '/protos/ledger/rwset/kvrwset/kv_rwset.proto')).kvrwset; - /** * Utility class to convert a grpc protobuf encoded byte array into a pure JSON object representing * a hyperledger fabric `Block`. * @class */ -var Block = class { +var BlockDecoder = class { + /** + * The JSON representation of a GRPC message "Block". + *
A Block will contain the configuration of the channel or + * transactions on the channel. The description area below shows the + * properties of the "Block" object. When the property name does not have a + * data type next to it (e.g. -- {int}) then assume it is an object and + * the indented names below it are it's properties. Complex properties + * shown within the description area are listed under the "Properties" + * section (e.g. {Header}). Use the links under the "Type" column + * to see the description of that object type. + *

When assigning the results of a block + * query or block event to a variable called block for example, + * then to get the block number use + *
var block_num = block.header.number; + *
or to see the first transaction id in the block use + *
var tx_id = block.data.data[0].payload.header.channel_header.tx_id; + *

A "Block" will have the following JSON layout. +
+header
+   number -- {int}
+   previous_hash -- {byte}
+   data_hash -- {byte}
+data
+   data -- [array]
+      signature -- {byte}
+      payload
+         header -- {Header}
+         data -- {Config | Transaction}
+metadata
+   metadata -- [array] #each array item has it's own layout
+      [0] #SIGNATURES
+         signatures -- [array]
+            signature_header
+               creator
+                  Mspid -- {string}
+                  IdBytes -- {byte}
+               nonce -- {byte}
+            signature -- {byte}
+      [1] #LAST_CONFIG
+         value
+            index -- {number}
+         signatures -- [array]
+            signature_header
+               creator
+                  Mspid -- {string}
+                  IdBytes -- {byte}
+               nonce -- {byte}
+            signature -- {byte}
+      [2] #TRANSACTIONS_FILTER
+          [int] #see TxValidationCode in proto/peer/transaction.proto
+ 
+ * @typedef {Object} Block + * @property {Header} header The header of the block + * @property {Config | Transaction} data The data bytes will be based on the block.header.channel_header.type + * @see protos/common/common.proto + */ + + /** + * The JSON representation of a GRPC message "Header". + *

A "Header" will have the following JSON layout. +
+   channel_header
+      type -- {string}
+      version -- {int}
+      timestamp -- {time}
+      channel_id -- {string}
+      tx_id -- {string}
+      epoch -- {int}
+   signature_header
+      creator
+         Mspid -- {string}
+            IdBytes -- {byte}
+            nonce -- {string}
+
+ * @typedef {Object} Header + * @see protos/common/common.proto + */ + + /** + * The JSON representation of a GRPC message "Config". + *
The config object will contain the current channel configuration. + *

A "Config" will have the following JSON layout. +
+config
+   sequence -- {int}
+   channel_group -- {ConfigGroup}
+last_update
+   signature -- {byte}
+   payload
+      header -- {Header}
+      data -- {ConfigUpdate}
+
+ * @typedef {Object} Config + * @property {Header} header + * @property {ConfigGroup} channel_group + * @property {ConfigUpdate} data + * @see protos/common/common.proto + */ + + /** + * The JSON representation of a GRPC message "Transaction" + *

A "Transaction" will have the following JSON layout. +
+actions
+   chaincode_proposal_payload
+      input -- {byte}
+   action
+      proposal_response_payload
+         proposal_hash -- {byte}
+         extension
+            results
+               data_model -- {int}
+               ns_rwset -- [array]
+                  namespace -- {string}
+                  rwset
+                     reads -- [array]
+                        key -- {string}
+                        version
+                           block_num -- {number}
+                           tx_num -- {number}
+                     range_queries_info -- [array]
+                     writes -- [array]
+                        key -- {string}
+                        is_delete -- {boolean}
+                        value -- {string}
+            events
+               chaincode_id --  {string}
+               tx_id -- {string}
+               event_name -- {string}
+               payload -- {byte}
+            response
+               status -- {int}
+               message -- {string}
+               payload -- {byte}
+      endorsements -- [array]
+         endorser
+            Mspid -- {string]
+            IdBytes -- {string}
+         signature -- {byte}
+
+ * @typedef {Object} Transaction + * @see protos/peer/transaction.proto + */ + + /** + * The JSON representation of a GRPC message "ConfigUpdate". + *

A "ConfigUpdate" will have the following JSON layout. +
+   channel_id -- {string}
+   read_set -- {ConfigGroup}
+   write_set -- {ConfigGroup}
+   signatures -- [array]
+      signature_header
+         creator
+            Mspid -- {string}
+            Idbytes -- {byte}
+         signature -- {byte}
+
+ * @typedef {Object} ConfigUpdate + * @property {ConfigGroup} read_set A top level GRPC message "ConfigGroup" that + * represents the current version numbers of the all configuration items + * being updated + * @property {ConfigGroup} write_set A top level GRPC message "ConfigGroup that + * represents the all configuration items being updated. Must have a + * version number one greater than the version number of the same item + * in the read_set along with the new value. + * @see protos/common/configtx.proto + */ + + /** + * The JSON representation of a GRPC message "ConfigGroup". + *
The "ConfigGroup" described here is one that represents the + * whole configuration of the channel. A "ConfigGroup" object + * is used to describe the current configuration and it is also + * used to describe the updates to a configuration. + * Only those fields being updated will be present when used to describe + * a configuration update. + *

A channel level "ConfigGroup" will have the following JSON layout. +
+      version -- {int}
+      mod_policy -- {string}
+      groups
+         Orderer
+            version -- {int}
+            groups -- [array]
+               <orderer> -- {Organization}
+            values
+               ConsensusType
+                  version -- {int}
+                  mod_policy -- {string}
+                  value
+                     type -- {string}
+               BatchSize
+                  version -- {int}
+                  mod_policy -- {string}
+                  value
+                     maxMessageCount -- {int}
+                     absoluteMaxBytes -- {int}
+                     preferredMaxBytes -- {int}
+               BatchTimeout
+                  version -- {int}
+                  mod_policy -- {string}
+                  value
+                     timeout -- {duration}
+               ChannelRestrictions
+                  version -- {int}
+                  mod_policy -- {string}
+                  value
+                     max_count -- {int}
+            policies
+               Admins
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+               Writers
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+               Readers
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+               BlockValidation
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {SignaturePolicy}
+         Application
+            version -- {int}
+            groups
+               <peer> -- {Organization}
+            values
+            policies
+               Admins -- {ImplicitMetaPolicy}
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+               Writers -- {ImplicitMetaPolicy}
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+               Readers -- {ImplicitMetaPolicy}
+                  version -- {int}
+                  mod_policy -- {string}
+                  policy -- {ImplicitMetaPolicy}
+      values
+         OrdererAddresses
+            version -- {int}
+            mod_policy -- {string}
+            value
+               addresses -- [array]
+                   {string - host:port}
+         HashingAlgorithm
+            version -- {int}
+            mod_policy -- {string}
+            value
+               name -- {string}
+         BlockDataHashingStructure
+            version -- {int}
+            mod_policy -- {string}
+            value
+               width -- {int}
+         Consortium
+            version -- {int}
+            mod_policy -- {string}
+            value
+               name -- {string}
+
+ * @typedef {Object} ConfigGroup + * @property {Organization} <orderer> These are the defined "Orderer"s on the network + * @property {Organization} <peer> These are the defined "Peer"s on the network + * @property {ImplicitMetaPolicy} policy These policies point to other policies + * @property {SignaturePolicy} policy + * @see protos/common/configtx.proto + */ + + /** + * The JSON representation of a GRPC message "ConfigGroup". + *
The "ConfigGroup" described here is one that a single + * Organization on the Channel. + *

A organizational "ConfigGroup" will have the following JSON layout. +
+version -- {int}
+mod_policy -- {string}
+groups
+values
+   MSP
+      version -- {int}
+      mod_policy -- {string}
+      value
+         type -- {int}
+         config
+            name -- {string}
+            root_certs -- [array]
+                {string}
+            intermediate_certs -- [array]
+                {string}
+            admins -- [array]
+                {string}
+            revocation_list -- [array]
+                {string}
+            signing_identity -- {byte}
+            organizational_unit_identifiers -- [array]
+                {string}
+policies
+   Admins
+      version -- {int}
+      mod_policy -- {string}
+      policy -- {SignaturePolicy}
+   Writers
+      version -- {int}
+      mod_policy -- {string}
+      policy -- {SignaturePolicy}
+   Readers
+      version -- {int}
+      mod_policy -- {string}
+      policy -- {SignaturePolicy}
+
+ * @typedef {Object} Organization + * @property {SignaturePolicy} policy These are the polices that have been pointed to by the implicit policies + * @see protos/common/configtx.proto + */ + + /** + * The JSON representation of a GRPC message "Policy" that are + * of type "ImplicitMetaPolicy". + *

A "ImplicitMetaPolicy" will have the following JSON layout. +
+type -- IMPLICIT_META
+policy
+   policy
+      sub_policy -- {string}
+      rule -- ANY | ALL | MAJORITY
+
+ * @typedef {Object} ImplicitMetaPolicy + * @see protos/common/policies + */ + + /** + * The JSON representation of a GRPC message "Policy" that are + * of type "SignaturePolicy". + *

A "SignaturePolicy" will have the following JSON layout. +
+type -- SIGNATURE
+policy
+   policy
+      Type -- n_out_of
+      n_out_of
+         N -- {int}
+         policies -- [array]
+            Type -- signed_by
+            signed_by -- {int}
+      identities -- [array]
+         principal_classification -- {int}
+         msp_identifier -- {string}
+         Role -- MEMBER | ADMIN
+
+ * @typedef {Object} SignaturePolicy + * @see protos/common/policies + */ + /** * Constructs a JSON object containing all decoded values from the * grpc encoded `Block` bytes * * @param {byte[]} block_bytes - The encode bytes of a hyperledger fabric message Block - * @returns {Object} The JSON representation of the Protobuf common.Block + * @returns {Block} The JSON representation of the Protobuf common.Block * @see /protos/common/common.proto */ static decode(block_bytes) { @@ -79,12 +439,43 @@ var Block = class { /** * Constructs a JSON object containing all decoded values from the - * grpc encoded `Transaction` bytes + * grpc encoded `Block` object * - * @param {byte[]} block_bytes - The encode bytes of a hyperledger fabric message Block - * @returns {Object} The JSON representation of the Protobuf common.Block + * @param {Object} block - a Protobuf common.Block object + * @returns {Block} The JSON representation of the Protobuf common.Block * @see /protos/common/common.proto */ + static decodeBlock(proto_block) { + if(!proto_block) { + throw new Error('Block input data is not a protobuf Block'); + } + var block = {}; + try { + block.header = { + number : proto_block.header.number, + previous_hash : proto_block.header.previous_hash.toString('hex'), + data_hash : proto_block.header.data_hash.toString('hex') + }; + block.data = decodeBlockData(proto_block.data, true); + block.metadata = decodeBlockMetaData(proto_block.metadata); + } + catch(error) { + logger.error('decode - ::' + error.stack ? error.stack : error); + throw error; + } + + return block; + }; + + /** + * Constructs a JSON object containing all decoded values from the + * grpc encoded `Transaction` bytes + * + * @param {byte[]} processed_transaction_bytes - The encode bytes of a hyperledger + * fabric message ProcessedTransaction + * @returns {Object} The JSON representation of the Protobuf transaction.ProcessedTransaction + * @see /protos/peer/transaction.proto + */ static decodeTransaction(processed_transaction_bytes) { if(!(processed_transaction_bytes instanceof Buffer)) { throw new Error('Proccesed transaction data is not a byte buffer'); @@ -107,11 +498,17 @@ function decodeBlockHeader(proto_block_header) { return block_header; }; -function decodeBlockData(proto_block_data) { +function decodeBlockData(proto_block_data, not_proto) { var data = {}; data.data = []; for(var i in proto_block_data.data) { - var proto_envelope = _commonProto.Envelope.decode(proto_block_data.data[i].toBuffer()); + var proto_envelope = null; + if(not_proto) { + proto_envelope = _commonProto.Envelope.decode(proto_block_data.data[i]); + } + else { + proto_envelope = _commonProto.Envelope.decode(proto_block_data.data[i].toBuffer()); + } var envelope = decodeBlockDataEnvelope(proto_envelope); data.data.push(envelope); } @@ -122,14 +519,63 @@ function decodeBlockData(proto_block_data) { function decodeBlockMetaData(proto_block_metadata) { var metadata = {}; metadata.metadata = []; - for(var i in proto_block_metadata.metadata) { - let proto_block_metadata_metadata = proto_block_metadata.metadata[i]; - metadata.metadata.push(proto_block_metadata_metadata.toBuffer()); + if(proto_block_metadata && proto_block_metadata.metadata) { + var signatures = decodeMetadataSignatures(proto_block_metadata.metadata[0]); + metadata.metadata.push(signatures); + + var last_config = decodeLastConfigSequenceNumber(proto_block_metadata.metadata[1]); + metadata.metadata.push(last_config); + + var transaction_filter = decodeTransactionFilter(proto_block_metadata.metadata[2]); + metadata.metadata.push(transaction_filter); } return metadata; }; +function decodeTransactionFilter(metadata_bytes) { + var transaction_filter = []; + for(let i=0; i Config item ::%s', proto_config_value.key); - config_value.version = proto_config_value.value.getVersion(); + config_value.version = decodeVersion(proto_config_value.value.getVersion()); config_value.mod_policy = proto_config_value.value.getModPolicy(); config_value.value = {}; switch(proto_config_value.key) { @@ -353,24 +799,26 @@ function decodeConfigPolicies(config_policy_map) { var Policy_PolicyType = [ 'UNKNOWN','SIGNATURE','MSP','IMPLICIT_META']; function decodeConfigPolicy(proto_config_policy) { var config_policy = {}; - config_policy.version = proto_config_policy.value.getVersion(); + config_policy.version = decodeVersion(proto_config_policy.value.getVersion()); config_policy.mod_policy = proto_config_policy.value.getModPolicy(); config_policy.policy = {}; - config_policy.policy.type = Policy_PolicyType[proto_config_policy.value.policy.type]; - logger.debug('decodeConfigPolicy ======> Policy item ::%s', proto_config_policy.key); - switch(proto_config_policy.value.policy.type) { - case _policiesProto.Policy.PolicyType.SIGNATURE: - config_policy.policy.policy = decodeSignaturePolicyEnvelope(proto_config_policy.value.policy.policy); - break; - case _policiesProto.Policy.PolicyType.MSP: - var proto_msp = _policiesProto.Policy.decode(proto_config_policy.value.policy.policy); - logger.warn('decodeConfigPolicy - found a PolicyType of MSP. This policy type has not been implemented yet.'); - break; - case _policiesProto.Policy.PolicyType.IMPLICIT_META: - config_policy.policy.policy = decodeImplicitMetaPolicy(proto_config_policy.value.policy.policy); - break; - default: - throw new Error('Unknown Policy type'); + if(proto_config_policy.value.policy) { + config_policy.policy.type = Policy_PolicyType[proto_config_policy.value.policy.type]; + logger.debug('decodeConfigPolicy ======> Policy item ::%s', proto_config_policy.key); + switch(proto_config_policy.value.policy.type) { + case _policiesProto.Policy.PolicyType.SIGNATURE: + config_policy.policy.policy = decodeSignaturePolicyEnvelope(proto_config_policy.value.policy.policy); + break; + case _policiesProto.Policy.PolicyType.MSP: + var proto_msp = _policiesProto.Policy.decode(proto_config_policy.value.policy.policy); + logger.warn('decodeConfigPolicy - found a PolicyType of MSP. This policy type has not been implemented yet.'); + break; + case _policiesProto.Policy.PolicyType.IMPLICIT_META: + config_policy.policy.policy = decodeImplicitMetaPolicy(proto_config_policy.value.policy.policy); + break; + default: + throw new Error('Unknown Policy type'); + } } return config_policy; @@ -388,7 +836,7 @@ function decodeImplicitMetaPolicy(implicit_meta_policy_bytes) { function decodeSignaturePolicyEnvelope(signature_policy_envelope_bytes) { var signature_policy_envelope = {}; var porto_signature_policy_envelope = _policiesProto.SignaturePolicyEnvelope.decode(signature_policy_envelope_bytes); - signature_policy_envelope.version = porto_signature_policy_envelope.getVersion(); + signature_policy_envelope.version = decodeVersion(porto_signature_policy_envelope.getVersion()); signature_policy_envelope.policy = decodeSignaturePolicy(porto_signature_policy_envelope.getPolicy()); var identities = []; var proto_identities = porto_signature_policy_envelope.getIdentities(); @@ -443,7 +891,7 @@ function decodeMSPPrincipal(proto_msp_principal) { proto_principal = _mspPrProto.OrganizationUnit.decode(proto_msp_principal.getPrincipal()); msp_principal.msp_identifier = proto_principal.getMspIdendifier();//string msp_principal.organizational_unit_identifier = proto_principal.getOrganizationalUnitIdentifier(); //string - msp_principal.certifiers_identifier = proto_principal.getCertifiersIdentifier().toBuffer().toString('utf8'); //bytes + msp_principal.certifiers_identifier = proto_principal.getCertifiersIdentifier().toBuffer().toString('hex'); //bytes break; case _mspPrProto.MSPPrincipal.Classification.IDENTITY: msp_principal = decodeIdentity(proto_msp_principal.getPrincipal()); @@ -462,7 +910,7 @@ function decodeConfigSignature(proto_configSignature) { }; function decodeSignatureHeader(signature_header_bytes) { - logger.debug('decodeSignatureHeader - %s',signature_header_bytes); + //logger.debug('decodeSignatureHeader - %s',signature_header_bytes); var signature_header = {}; var proto_signature_header = _commonProto.SignatureHeader.decode(signature_header_bytes); signature_header.creator = decodeIdentity(proto_signature_header.getCreator().toBuffer()); @@ -472,7 +920,7 @@ function decodeSignatureHeader(signature_header_bytes) { }; function decodeIdentity(id_bytes) { - logger.debug('decodeIdentity - %s',id_bytes); + //logger.debug('decodeIdentity - %s',id_bytes); var identity = {}; try { var proto_identity = _identityProto.SerializedIdentity.decode(id_bytes); @@ -489,49 +937,93 @@ function decodeIdentity(id_bytes) { function decodeFabricMSPConfig(msp_config_bytes) { var msp_config = {}; var proto_msp_config = _mspConfigProto.FabricMSPConfig.decode(msp_config_bytes); - // get the application org names - var orgs = []; - let org_units = proto_msp_config.getOrganizationalUnitIdentifiers(); - if(org_units) for(let i = 0; i < org_units.length; i++) { - let org_unit = org_units[i]; - let org_id = org_unit.organizational_unit_identifier; - orgs.push(org_id); - } + msp_config.name = proto_msp_config.getName(); - msp_config.root_certs = actualBuffers(proto_msp_config.getRootCerts()); - msp_config.intermediate_certs = actualBuffers(proto_msp_config.getIntermediateCerts()); - msp_config.admins = actualBuffers(proto_msp_config.getAdmins()); - msp_config.revocation_list = actualBuffers(proto_msp_config.getRevocationList()); - msp_config.signing_identity = proto_msp_config.getSigningIdentity(); - msp_config.organizational_unit_identifiers = orgs; + msp_config.root_certs = toPEMcerts(proto_msp_config.getRootCerts()); + msp_config.intermediate_certs = toPEMcerts(proto_msp_config.getIntermediateCerts()); + msp_config.admins = toPEMcerts(proto_msp_config.getAdmins()); + msp_config.revocation_list = toPEMcerts(proto_msp_config.getRevocationList()); + msp_config.signing_identity = decodeSigningIdentityInfo(proto_msp_config.getSigningIdentity()); + msp_config.organizational_unit_identifiers = decodeFabricOUIdentifier(proto_msp_config.getOrganizationalUnitIdentifiers()); return msp_config; }; -function actualBuffers(buffer_array_in) { +function decodeFabricOUIdentifier(proto_organizational_unit_identitfiers) { + var organizational_unit_identitfiers = []; + if(proto_organizational_unit_identitfiers) { + for(let i = 0; i < proto_organizational_unit_identitfiers.length; i++) { + var proto_organizational_unit_identitfier = proto_organizational_unit_identitfiers[i]; + var organizational_unit_identitfier = {}; + organizational_unit_identitfier.certificate = + proto_organizational_unit_identitfier.getCertificate().toBuffer().toString(); + organizational_unit_identitfier.organizational_unit_identifier = + proto_organizational_unit_identitfier.getOrganizationalUnitIdentifier(); + organizational_unit_identitfiers.push(organizational_unit_identitfier); + } + } + + return organizational_unit_identitfiers; +} + +function toPEMcerts(buffer_array_in) { var buffer_array_out = []; for(var i in buffer_array_in) { buffer_array_out.push(buffer_array_in[i].toBuffer().toString()); } + return buffer_array_out; }; +function decodeSigningIdentityInfo(signing_identity_info_bytes) { + var signing_identity_info = {}; + if(signing_identity_info_bytes) { + var proto_signing_identity_info = _mspConfigProto.SigningIdentityInfo.decode(signing_identity_info_bytes); + signing_identity_info.public_signer = proto_signing_identity_info.getPublicSigner().toBuffer().toString(); + signing_identity_info.private_signer = decodeKeyInfo(proto_signing_identity_info.getPrivateSigner()); + } + + return signing_identity_info_bytes; +} + +function decodeKeyInfo(key_info_bytes) { + var key_info = {}; + if(key_info_bytes) { + var proto_key_info = _mspConfigProto.KeyInfo.decode(key_info_bytes); + key_info.key_identifier = proto_key_info.getKeyIdentitier(); + key_info.key_material = 'private'; //should not show this + } + + return key_info; +} + function decodeHeader(proto_header) { var header = {}; header.channel_header = decodeChannelHeader(proto_header.getChannelHeader().toBuffer()); header.signature_header = decodeSignatureHeader(proto_header.getSignatureHeader().toBuffer()); + return header; }; +var HeaderType = { + 0 : 'MESSAGE', // Used for messages which are signed but opaque + 1 : 'CONFIG', // Used for messages which express the channel config + 2 : 'CONFIG_UPDATE', // Used for transactions which update the channel config + 3 : 'ENDORSER_TRANSACTION', // Used by the SDK to submit endorser based transactions + 4 : 'ORDERER_TRANSACTION', // Used internally by the orderer for management + 5 : 'DELIVER_SEEK_INFO', // Used as the type for Envelope messages submitted to instruct the Deliver API to seek + 6 : 'CHAINCODE_PACKAGE' // Used for packaging chaincode artifacts for install +}; + function decodeChannelHeader(header_bytes){ var channel_header = {}; var proto_channel_header = _commonProto.ChannelHeader.decode(header_bytes); - channel_header.type = proto_channel_header.getType(); - channel_header.version = proto_channel_header.getType(); + channel_header.type = HeaderType[proto_channel_header.getType()]; + channel_header.version = decodeVersion(proto_channel_header.getType()); channel_header.timestamp = timeStampToDate(proto_channel_header.getTimestamp()).toString(); channel_header.channel_id = proto_channel_header.getChannelId(); channel_header.tx_id = proto_channel_header.getTxId(); - channel_header.epoch = proto_channel_header.getEpoch(); + channel_header.epoch = proto_channel_header.getEpoch().toInt(); //TODO need to decode this channel_header.extension = proto_channel_header.getExtension().toBuffer().toString('hex');; @@ -548,12 +1040,20 @@ function timeStampToDate(time_stamp) { function decodeChaincodeActionPayload(payload_bytes) { var payload = {}; var proto_chaincode_action_payload = _transProto.ChaincodeActionPayload.decode(payload_bytes); - payload.chaincode_proposal_payload = proto_chaincode_action_payload.getChaincodeProposalPayload();//TODO more decode needed + payload.chaincode_proposal_payload = decodeChaincodeProposalPayload(proto_chaincode_action_payload.getChaincodeProposalPayload()); payload.action = decodeChaincodeEndorsedAction(proto_chaincode_action_payload.getAction()); return payload; }; +function decodeChaincodeProposalPayload(chaincode_proposal_payload_bytes) { + var chaincode_proposal_payload = {}; + var proto_chaincode_proposal_payload = _proposalProto.ChaincodeProposalPayload.decode(chaincode_proposal_payload_bytes); + chaincode_proposal_payload.input = proto_chaincode_proposal_payload.getInput().toBuffer().toString(); + //TransientMap is not allowed to be included on ledger + + return chaincode_proposal_payload; +} function decodeChaincodeEndorsedAction(proto_chaincode_endorsed_action) { var action = {}; action.proposal_response_payload = decodeProposalResponsePayload(proto_chaincode_endorsed_action.getProposalResponsePayload()); @@ -569,7 +1069,7 @@ function decodeChaincodeEndorsedAction(proto_chaincode_endorsed_action) { function decodeEndorsement(proto_endorsement) { var endorsement = {}; endorsement.endorser = decodeIdentity(proto_endorsement.getEndorser()); - endorsement.signature = proto_endorsement.getSignature(); + endorsement.signature = proto_endorsement.getSignature().toBuffer().toString('hex'); return endorsement; }; @@ -577,7 +1077,7 @@ function decodeEndorsement(proto_endorsement) { function decodeProposalResponsePayload(proposal_response_payload_bytes) { var proposal_response_payload = {}; var proto_proposal_response_payload = _responseProto.ProposalResponsePayload.decode(proposal_response_payload_bytes); - proposal_response_payload.proposal_hash = proto_proposal_response_payload.getProposalHash(); + proposal_response_payload.proposal_hash = proto_proposal_response_payload.getProposalHash().toBuffer().toString('hex'); proposal_response_payload.extension = decodeChaincodeAction(proto_proposal_response_payload.getExtension()); return proposal_response_payload; @@ -587,12 +1087,22 @@ function decodeChaincodeAction(action_bytes) { var chaincode_action = {}; var proto_chaincode_action = _proposalProto.ChaincodeAction.decode(action_bytes); chaincode_action.results = decodeReadWriteSets(proto_chaincode_action.getResults()); - chaincode_action.events = proto_chaincode_action.getEvents(); //TODO should we decode these + chaincode_action.events = decodeChaincodeEvents(proto_chaincode_action.getEvents()); chaincode_action.response = decodeResponse(proto_chaincode_action.getResponse()); return chaincode_action; }; +function decodeChaincodeEvents(event_bytes) { + var events = {}; + var proto_events = _ccEventProto.ChaincodeEvent.decode(event_bytes); + events.chaincode_id = proto_events.getChaincodeId(); + events.tx_id = proto_events.getTxId(); + events.event_name = proto_events.getEventName(); + events.payload = proto_events.getPayload().toBuffer().toString('hex'); + return events; +} + function decodeReadWriteSets(rw_sets_bytes) { var proto_tx_read_write_set = _rwsetProto.TxReadWriteSet.decode(rw_sets_bytes); var tx_read_write_set = {}; @@ -696,7 +1206,7 @@ function decodeKVWrite(proto_kv_write) { let kv_write = {}; kv_write.key = proto_kv_write.getKey(); kv_write.is_delete = proto_kv_write.getIsDelete(); - kv_write.value = proto_kv_write.getValue(); + kv_write.value = proto_kv_write.getValue().toBuffer().toString(); return kv_write; } @@ -706,9 +1216,18 @@ function decodeResponse(proto_response) { var response = {}; response.status = proto_response.getStatus(); response.message = proto_response.getMessage(); - response.payload = proto_response.getPayload(); + response.payload = proto_response.getPayload().toBuffer().toString(); return response; }; -module.exports = Block; +// version numbers should not get that big +// so lets just return an Integer (32bits) +function decodeVersion(version_long) { + var version_string = version_long.toString(); + var version_int = Number.parseInt(version_string); + + return version_int; +} + +module.exports = BlockDecoder; diff --git a/fabric-client/lib/Chain.js b/fabric-client/lib/Chain.js index 2ab991bea4..7076e09aa0 100755 --- a/fabric-client/lib/Chain.js +++ b/fabric-client/lib/Chain.js @@ -26,7 +26,7 @@ var path = require('path'); var ChannelConfig = require('./ChannelConfig.js'); var Peer = require('./Peer.js'); var Orderer = require('./Orderer.js'); -var Block = require('./Block.js'); +var BlockDecoder = require('./BlockDecoder.js'); var settle = require('promise-settle'); var grpc = require('grpc'); var logger = utils.getLogger('Chain.js'); @@ -968,7 +968,7 @@ var Chain = class { } if(response.response) { logger.debug('queryBlockByHash - response status %d:', response.response.status); - var block = Block.decode(response.response.payload); + var block = BlockDecoder.decode(response.response.payload); logger.debug('queryBlockByHash - looking at block :: %s',block.header.number); return Promise.resolve(block); } @@ -1028,7 +1028,7 @@ var Chain = class { } if(response.response) { logger.debug('queryBlock - response status %d:', response.response.status); - var block = Block.decode(response.response.payload); + var block = BlockDecoder.decode(response.response.payload); logger.debug('queryBlockByHash - looking at block :: %s',block.header.number); return Promise.resolve(block); } @@ -1088,7 +1088,7 @@ var Chain = class { } if(response.response) { logger.debug('queryTransaction - response status :: %d', response.response.status); - var processTrans = Block.decodeTransaction(response.response.payload); + var processTrans = BlockDecoder.decodeTransaction(response.response.payload); return Promise.resolve(processTrans); } // no idea what we have, lets fail it and send it back diff --git a/fabric-client/lib/EventHub.js b/fabric-client/lib/EventHub.js index b8f61a5b73..0c824ab93e 100644 --- a/fabric-client/lib/EventHub.js +++ b/fabric-client/lib/EventHub.js @@ -18,6 +18,7 @@ var utils = require('./utils.js'); var Remote = require('./Remote.js'); +var BlockDecoder = require('./BlockDecoder.js'); var grpc = require('grpc'); var HashTable = require('hashtable'); var logger = utils.getLogger('EventHub.js'); @@ -33,10 +34,16 @@ var _ccEventProto = grpc.load(__dirname + '/protos/peer/chaincode_event.proto'). var _validation_codes = {}; var keys = Object.keys(_transProto.TxValidationCode); for(var i = 0;i- any other standard grpc call options will be passed to the grpc service calls directly + *
- any other standard grpc stream options will be passed to the grpc service calls directly */ setPeerAddr(peerUrl, opts) { @@ -130,124 +145,251 @@ var EventHub = class { } /** - * Establishes connection with peer event source

- * Note: Only use this if creating your own EventHub. The chain - * class creates a default eventHub that most Node clients can - * use (see eventHubConnect, eventHubDisconnect and getEventHub). + * Establishes connection with peer event source + */ + connect(){ + this._connect(); + } + + /* + * Internal use only + * Establishes connection with peer event source + * @param {boolean} force - internal use only, will reestablish the + * the connection to the peer event hub */ - connect() { + _connect(force) { logger.debug('connect - start'); - if (this.connected) { + if (!force && this.connected) { logger.debug('connect - end - already conneted'); return; } if (!this.ep) throw Error('Must set peer address before connecting.'); + + var self = this; // for callback context + + var send_timeout = setTimeout(function(){ + logger.error('connect - timed out after:%s', self.ep._request_timeout); + self.disconnect(); + }, self.ep._request_timeout); + + this._client = new _events.Events(this.ep._endpoint.addr, this.ep._endpoint.creds, this.ep._options); - this.call = this._client.chat(); + this.stream = this._client.chat(); this.connected = true; - var eh = this; // for callback context - this.call.on('data', function(event) { + this.stream.on('data', function(event) { + clearTimeout(send_timeout); + var state = self.stream.call.channel_.getConnectivityState(); + logger.debug('connect - on.data - grpc stream state :%s',state); if (event.Event == 'block') { - eh.blockRegistrants.forEach(function(cb) { - cb(event.block); - }); - event.block.data.data.forEach(function(transaction) { - try { - var env = _common.Envelope.decode(transaction); - var payload = _common.Payload.decode(env.payload); - var channel_header = _common.ChannelHeader.decode(payload.header.channel_header); - if (channel_header.type == _common.HeaderType.ENDORSER_TRANSACTION) { - var tx = _transProto.Transaction.decode(payload.data); - var chaincodeActionPayload = _ccTransProto.ChaincodeActionPayload.decode(tx.actions[0].payload); - var propRespPayload = _responseProto.ProposalResponsePayload - .decode(chaincodeActionPayload.action.proposal_response_payload); - var caPayload = _ccProposalProto.ChaincodeAction.decode(propRespPayload.extension); - var ccEvent = _ccEventProto.ChaincodeEvent.decode(caPayload.events); - var cbtable = eh.chaincodeRegistrants.get(ccEvent.chaincode_id); - if (!cbtable) { - return; - } - cbtable.forEach(function(cbe) { - if (cbe.eventNameFilter.test(ccEvent.event_name)) { - cbe.cb(ccEvent); - } - }); - - } - } catch (err) { - logger.error('Error unmarshalling transaction=', err); - } - }); + var block = BlockDecoder.decodeBlock(event.block); + self._processBlockOnEvents(block); + self._processTxOnEvents(block); + self._processChainCodeOnEvents(block); } else if (event.Event == 'register'){ - //TODO use this event to verify that eventhub is ready logger.debug('connect - register event received'); } else if (event.Event == 'unregister'){ - // TODO use this event to mark this eventhub not ready + if(self.connected) self.disconnect(); logger.debug('connect - unregister event received'); } else { logger.debug('connect - unknown event %s',event.Event); } }); - this.call.on('end', function() { - eh.call.end(); - // clean up Registrants - should app get notified? - eh.chaincodeRegistrants.clear(); - eh.blockRegistrants.clear(); - eh.txRegistrants.clear(); + this.stream.on('end', function() { + clearTimeout(send_timeout); + var state = self.stream.call.channel_.getConnectivityState(); + logger.debug('connect - on.end - grpc stream state :%s',state); + if(self.connected) self.disconnect(); + }); + this.stream.on('error', function() { + clearTimeout(send_timeout); + var state = self.stream.call.channel_.getConnectivityState(); + logger.debug('connect - on.error - grpc stream state :%s',state); + if(self.connected) self.disconnect(); }); - // register txCallback to process txid callbacks - this.registerBlockEvent(this.txCallback.bind(this)); + this._sendRegistration(true); + logger.debug('connect - end'); } /** - * Disconnects peer event source

- * Note: Only use this if creating your own EventHub. The chain - * class creates a default eventHub that most Node clients can - * use (see eventHubConnect, eventHubDisconnect and getEventHub). + * Disconnects the connection to the peer event source. + * Will close all event listeners and send an `Error` to + * all listeners that provided an "onError" callback. */ disconnect() { - if (!this.connected) return; - this.unregisterBlockEvent(this.txCallback); - this.call.end(); this.connected = false; + this._closeAllCallbacks(new Error('EventHub has been shutdown')); + if(this.stream) { + this._sendRegistration(false); + this.stream.end(); + } + } + + /* + * Internal method + * Builds a signed event registration + * and sends it to the peer's event hub. + */ + _sendRegistration(register) { + var user = this._clientContext.getUserContext(); + var signedEvent = new _events.SignedEvent(); + var event = new _events.Event(); + var reg = {events: [{event_type: 'BLOCK'}]}; + + if(register) { + event.setRegister(reg); + } + else { + event.setUnregister(reg); + } + + event.setCreator(user.getIdentity().serialize()); + signedEvent.setEventBytes(event.toBuffer()); + var sig = user.getSigningIdentity().sign(event.toBuffer()); + signedEvent.setSignature(Buffer.from(sig)); + this.stream.write(signedEvent); + } + + /* + * Internal method to close out all callbacks + * Sends an error to all registered event onError callbacks + */ + _closeAllCallbacks(err) { + logger.debug('_closeAllCallbacks - start'); + + var closer = function(key, cb) { + logger.debug('_closeAllCallbacks - closing this callback %s',key); + cb(err); + }; + + logger.debug('_closeAllCallbacks - blockOnErrors %s',this.blockOnErrors.size()); + this.blockOnErrors.forEach(closer); + this.blockOnEvents.clear(); + this.blockOnErrors.clear(); + + logger.debug('_closeAllCallbacks - transactionOnErrors %s',this.transactionOnErrors.size()); + this.transactionOnErrors.forEach(closer); + this.transactionOnEvents.clear(); + this.transactionOnErrors.clear(); + + var cc_closer = function(key, cbtable) { + cbtable.forEach(function(cbe) { + logger.debug('_closeAllCallbacks - closing this chaincode event %s %s',cbe.ccid, cbe.eventNameFilter); + if(cbe.onError) { + cbe.onError(err); + } + }); + }; + + logger.debug('_closeAllCallbacks - chaincodeRegistrants %s',this.chaincodeRegistrants.size()); + this.chaincodeRegistrants.forEach(cc_closer); + this.chaincodeRegistrants.clear(); + } + + /* + * Internal method + * checks for a connection and will restart + */ + _checkConnection(throw_error, force_reconnect) { + var state = 0; + if(this.stream) { + state = this.stream.call.channel_.getConnectivityState(); + } + if(this.connected) { + logger.debug('_checkConnection - this hub %s is connected with stream channel state %s', this.ep.getUrl(), state); + } + else { + logger.debug('_checkConnection - this hub %s is not connected with stream channel state %s', this.ep.getUrl(), state); + if(throw_error) { + throw new Error('The event hub has not been connected to the event source'); + } + } + + if(force_reconnect) { + try { + var is_paused = this.stream.isPaused(); + logger.debug('_checkConnection - grpc isPaused :%s',is_paused); + if(is_paused) { + this.stream.resume(); + logger.debug('_checkConnection - grpc resuming '); + } + var state = this.stream.call.channel_.getConnectivityState(); + logger.debug('_checkConnection - grpc stream state :%s',state); + if(state != 2) { + // try to reconnect + this._connect(true); + } + } + catch(error) { + logger.error('_checkConnection - error ::' + error.stack ? error.stack : error); + this.disconnect(); + throw new Error('Event hub is not connected '); + } + } } /** * Register a callback function to receive chaincode events. - * @param {string} ccid string chaincode id - * @param {string} eventname string The regex string used to filter events - * @param {function} callback Function Callback function for filter matches + * This EventHub instance must be connected to a remote + * peer's event hub before registering for events by calling + * the "connect()" method. + * @param {string} ccid - string chaincode id + * @param {string} eventname - string The regex string used to filter events + * @param {function} onEvent - callback function for filter matches * that takes a single parameter which is a json object representation - * of type "message ChaincodeEvent" from lib/proto/chaincodeevent.proto + * of type "message ChaincodeEvent" from lib/proto/chaincode_event.proto + * @param {function} onError - optional callback function to be notified when this + * event hub is shutdown. * @returns {object} ChainCodeCBE object that should be treated as an opaque * handle used to unregister (see unregisterChaincodeEvent) */ - registerChaincodeEvent(ccid, eventname, callback) { - if (!this.connected) return; - var cb = new ChainCodeCBE(ccid, eventname, callback); + registerChaincodeEvent(ccid, eventname, onEvent, onError) { + logger.debug('registerChaincodeEvent - start'); + if(!ccid) { + throw new Error('Missing "ccid" parameter'); + } + if(!eventname) { + throw new Error('Missing "eventname" parameter'); + } + if(!onEvent) { + throw new Error('Missing "onEvent" parameter'); + } + var have_error_cb = onError ? true : false; + // when there is no error callback throw an error + // when this hub is not connected + this._checkConnection(!have_error_cb, false); + + var cbe = new ChainCodeCBE(ccid, eventname, onEvent, onError); var cbtable = this.chaincodeRegistrants.get(ccid); if (!cbtable) { cbtable = new Set(); this.chaincodeRegistrants.put(ccid, cbtable); - cbtable.add(cb); - } else { - cbtable.add(cb); } - return cb; + cbtable.add(cbe); + + // when there is an error callback try to reconnect this + // event hub if is not connected + if(have_error_cb) { + this._checkConnection(false, this.force_reconnect); + } + + return cbe; } /** * Unregister chaincode event registration - * @param {object} ChainCodeCBE handle returned from call to + * @param {object} cbe - ChainCodeCBE handle return from call to * registerChaincodeEvent. */ unregisterChaincodeEvent(cbe) { - if (!this.connected) return; + logger.debug('unregisterChaincodeEvent - start'); + if(!cbe) { + throw new Error('Missing "cbe" parameter'); + } var cbtable = this.chaincodeRegistrants.get(cbe.ccid); if (!cbtable) { logger.debug('No event registration for ccid %s ', cbe.ccid); @@ -261,85 +403,90 @@ var EventHub = class { /** * Register a callback function to receive block events. - * @param {function} callback Function that takes a single parameter - * which is a json object representation of type "message Block" - * from lib/proto/fabric.proto - * @returns {Promise} Promise for a successful registration, no returned values + * This EventHub instance must be connected to a remote + * peer's event hub before registering for events by calling + * the "connect()" method. + * @param {function} onEvent Function that takes a single parameter + * which is a JSON object representation of type GRPC message "Block" + * from lib/proto/common/common.proto. + * @see {@link Block} + * @param {function} onError - optional callback function to be notified when this + * event hub is shutdown. + * @returns {int} This is the block registration number that must be + * used to unregister (see unregisterBlockEvent) */ - registerBlockEvent(callback) { - var user = this._clientContext.getUserContext(); - if (!this.connected) throw new Error('The event hub has not been connected to the event source'); - - this.blockRegistrants.add(callback); - if (this.blockRegistrants.size == 1) { - - var signedEvent = new _events.SignedEvent(); - - var event = new _events.Event(); - event.setRegister({ - events: [{ - event_type: 'BLOCK' - }] - }); - event.setCreator(user.getIdentity().serialize()); - - signedEvent.setEventBytes(event.toBuffer()); - - var sig = user.getSigningIdentity().sign(event.toBuffer()); - signedEvent.setSignature(Buffer.from(sig)); - this.call.write(signedEvent); + registerBlockEvent(onEvent, onError) { + logger.debug('registerBlockEvent - start'); + if(!onEvent) { + throw new Error('Missing "onEvent" parameter'); } + var have_error_cb = onError ? true : false; + // when there is no error callback throw and error + // when this hub is not connected + this._checkConnection(!have_error_cb, false); + + var block_registration_number = this.block_registrant_count++; + this.blockOnEvents.put(block_registration_number, onEvent); + + // when there is an error callback try to reconnect this + // event hub if is not connected + if(have_error_cb) { + this.blockOnErrors.put(block_registration_number, onError); + this._checkConnection(false, this.force_reconnect); + } + + return block_registration_number; } /** - * Unregister block event registration - * @param {function} callback Function to unregister + * Unregister the block event listener with the block + * registration number. + * @param {int} The block registration number that was returned + * during registration. */ - unregisterBlockEvent(callback) { - var user = this._clientContext.getUserContext(); - if (!this.connected) return; - - if (this.blockRegistrants.size <= 1) { - var signedEvent = new _events.SignedEvent(); - - var event = new _events.Event(); - event.setUnregister({ - events: [{ - event_type: 'BLOCK' - }] - }); - event.setCreator(user.getIdentity().serialize()); - - signedEvent.setEventBytes(event.toBuffer()); - - var sig = user.getSigningIdentity().sign(event.toBuffer()); - signedEvent.setSignature(Buffer.from(sig)); - - this.call.write(signedEvent); + unregisterBlockEvent(block_registration_number) { + logger.debug('unregisterBlockEvent - start %s',block_registration_number); + if(!block_registration_number) { + throw new Error('Missing "block_registration_number" parameter'); } - this.blockRegistrants.delete(callback); + this.blockOnEvents.remove(block_registration_number); + this.blockOnErrors.remove(block_registration_number); } /** - * Register a callback function to receive transactional events.

- * Note: transactional event registration is primarily used by - * the sdk to track instantiate and invoke completion events. Nodejs - * clients generally should not need to call directly. + * Register a callback function to receive transactional events. + * This EventHub instance must be connected to a remote + * peer's event hub before registering for events by calling + * the "connect()" method. * @param {string} txid string transaction id - * @param {function} callback Function that takes a parameter which + * @param {function} onEvent Function that takes a parameter which * is a json object representation of type "message Transaction" * from lib/proto/fabric.proto and a parameter which is a boolean * that indicates if the transaction is invalid (true=invalid) + * @param {function} onError - optional callback function to be notified when this + * event hub is shutdown. */ - registerTxEvent(txid, callback) { - logger.debug('reg txid ' + txid); - if(this.connected) { - logger.debug(' this hub %s is connected', this.ep.getUrl()); + registerTxEvent(txid, onEvent, onError) { + logger.debug('registerTxEvent txid ' + txid); + if(!txid) { + throw new Error('Missing "txid" parameter'); } - else { - logger.debug('this hub %s is not connected', this.ep.getUrl()); + if(!onEvent) { + throw new Error('Missing "onEvent" parameter'); + } + var have_error_cb = onError ? true : false; + // when there is no onError callback throw and error + // when this hub is not connected + this._checkConnection(!have_error_cb, false); + + this.transactionOnEvents.put(txid, onEvent); + + // when there is an onError callback try to reconnect this + // event hub if is not connected + if(have_error_cb) { + this.transactionOnErrors.put(txid, onError); + this._checkConnection(false, this.force_reconnect); } - this.txRegistrants.put(txid, callback); } /** @@ -347,38 +494,94 @@ var EventHub = class { * @param txid string transaction id */ unregisterTxEvent(txid) { - this.txRegistrants.remove(txid); + logger.debug('unregisterTxEvent txid ' + txid); + if(!txid) { + throw new Error('Missing "txid" parameter'); + } + this.transactionOnEvents.remove(txid); + this.transactionOnErrors.remove(txid); } - /** - * private internal callback for processing tx events - * @param {object} block json object representing block of tx - * from the fabric + /* + * private internal method for processing block events + * @param {object} block protobuf object + */ + _processBlockOnEvents(block) { + logger.debug('_processBlockOnEvents block=%s', block.header.number); + if(this.blockOnEvents.size() == 0) { + logger.debug('_processBlockOnEvents - no registered block event "listeners"'); + return; + } + + // send to all registered block listeners + this.blockOnEvents.forEach(function(key, cb) { + cb(block); + }); + } + + /* + * private internal method for processing tx events + * @param {object} block protobuf object which might contain the tx from the fabric */ - txCallback(block) { - logger.debug('txCallback block=%s', block.header.number); - var eh = this; + _processTxOnEvents(block) { + logger.debug('_processTxOnEvents block=%s', block.header.number); + if(this.transactionOnEvents.size() == 0) { + logger.debug('_processTxOnEvents - no registered transaction event "listeners"'); + return; + } + var txStatusCodes = block.metadata.metadata[_common.BlockMetadataIndex.TRANSACTIONS_FILTER]; for (var index=0; index < block.data.data.length; index++) { - logger.debug('txCallback - trans index=%s',index); - try { - var env = _common.Envelope.decode(block.data.data[index]); - var payload = _common.Payload.decode(env.payload); - var channel_header = _common.ChannelHeader.decode(payload.header.channel_header); - } catch (err) { - logger.error('Error unmarshalling transaction from block=', err); - break; - } - + logger.debug('_processTxOnEvents - trans index=%s',index); + var channel_header = block.data.data[index].payload.header.channel_header; var val_code = convertValidationCode(txStatusCodes[index]); - logger.debug('txCallback - txid=%s val_code=%s',val_code, channel_header.tx_id); - var cb = eh.txRegistrants.get(channel_header.tx_id); + logger.debug('_processTxOnEvents - txid=%s val_code=%s', channel_header.tx_id, val_code); + var cb = this.transactionOnEvents.get(channel_header.tx_id); if (cb){ - logger.debug('txCallback - about to call the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id); + logger.debug('_processTxOnEvents - about to stream the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id); cb(channel_header.tx_id, val_code); } + } + }; + + /* + * private internal method for processing chaincode events + * @param {object} block protobuf object which might contain the chaincode event from the fabric + */ + _processChainCodeOnEvents(block) { + logger.debug('_processChainCodeOnEvents block=%s', block.header.number); + if(this.chaincodeRegistrants.size() == 0) { + logger.debug('_processChainCodeOnEvents - no registered chaincode event "listeners"'); + return; + } + for (var index=0; index < block.data.data.length; index++) { + logger.debug('_processChainCodeOnEvents - trans index=%s',index); + try { + var env = block.data.data[index]; + var payload = env.payload; + var channel_header = payload.header.channel_header; + if (channel_header.type === _header_types[3]) { + var tx = payload.data; + var chaincodeActionPayload = tx.actions[0].payload; + var propRespPayload = chaincodeActionPayload.action.proposal_response_payload; + var caPayload = propRespPayload.extension; + var ccEvent = caPayload.events; + logger.debug('_processChainCodeOnEvents - ccEvent %s',ccEvent); + var cbtable = this.chaincodeRegistrants.get(ccEvent.chaincode_id); + if (!cbtable) { + return; + } + cbtable.forEach(function(cbe) { + if (cbe.eventNameFilter.test(ccEvent.event_name)) { + cbe.onEvent(ccEvent); + } + }); + } + } catch (err) { + logger.error('on.data - Error unmarshalling transaction=', err); + } } }; }; diff --git a/test/integration/e2e/e2eUtils.js b/test/integration/e2e/e2eUtils.js index ff2a34ca8b..47d0bd2132 100644 --- a/test/integration/e2e/e2eUtils.js +++ b/test/integration/e2e/e2eUtils.js @@ -441,6 +441,7 @@ function invokeChaincode(userOrg, version, t){ var targets = [], eventhubs = []; + var pass_results = null; // override t.end function so it'll always disconnect the event hub t.end = ((context, ehs, f) => { @@ -547,13 +548,26 @@ function invokeChaincode(userOrg, version, t){ t.fail('Failed to enroll user \'admin\'. ' + err); throw new Error('Failed to enroll user \'admin\'. ' + err); + }).then((results) =>{ + pass_results = results; + var sleep_time = 0; + // can use "sleep=30000" to give some time to manually stop and start + // the peer so the event hub will also stop and start + if (process.argv.length > 2) { + if (process.argv[2].indexOf('sleep=') === 0) { + sleep_time = process.argv[2].split('=')[1]; + } + } + t.comment('*****************************************************************************'); + t.comment('stop and start the peer event hub ---- N O W ----- you have ' + sleep_time + ' millis'); + t.comment('*****************************************************************************'); + return sleep(sleep_time); + }).then((nothing) => { - }).then((results) => { - - var proposalResponses = results[0]; + var proposalResponses = pass_results[0]; - var proposal = results[1]; - var header = results[2]; + var proposal = pass_results[1]; + var header = pass_results[2]; var all_good = true; for(var i in proposalResponses) { let one_good = false; @@ -600,18 +614,25 @@ function invokeChaincode(userOrg, version, t){ let txPromise = new Promise((resolve, reject) => { let handle = setTimeout(reject, 120000); - eh.registerTxEvent(deployId.toString(), (tx, code) => { - clearTimeout(handle); - eh.unregisterTxEvent(deployId); - - if (code !== 'VALID') { - t.fail('The balance transfer transaction was invalid, code = ' + code); - reject(); - } else { - t.pass('The balance transfer transaction has been committed on peer '+ eh.ep._endpoint.addr); + eh.registerTxEvent(deployId.toString(), + (tx, code) => { + clearTimeout(handle); + eh.unregisterTxEvent(deployId); + + if (code !== 'VALID') { + t.fail('The balance transfer transaction was invalid, code = ' + code); + reject(); + } else { + t.pass('The balance transfer transaction has been committed on peer '+ eh.ep._endpoint.addr); + resolve(); + } + }, + (err) => { + clearTimeout(handle); + t.pass('Successfully received notification of the event call back being cancelled for '+ deployId); resolve(); } - }); + ); }); eventPromises.push(txPromise); diff --git a/test/integration/e2e/join-channel.js b/test/integration/e2e/join-channel.js index 3da0e58620..0a3f5e6eb4 100644 --- a/test/integration/e2e/join-channel.js +++ b/test/integration/e2e/join-channel.js @@ -28,11 +28,6 @@ var fs = require('fs'); var Client = require('fabric-client'); var EventHub = require('fabric-client/lib/EventHub.js'); -var Block = require('fabric-client/lib/Block.js'); - -var grpc = require('grpc'); -var _commonProto = grpc.load(path.join(__dirname, '../../../fabric-client/lib/protos/common/common.proto')).common; -var _configtxProto = grpc.load(path.join(__dirname, '../../../fabric-client/lib/protos/common/configtx.proto')).common; var testUtil = require('../../unit/util.js'); @@ -180,14 +175,11 @@ function joinChannel(org, t) { eh.registerBlockEvent((block) => { clearTimeout(handle); - // in real-world situations, a peer may have more than one channels so + // in real-world situations, a peer may have more than one channel so // we must check that this block came from the channel we asked the peer to join if(block.data.data.length === 1) { // Config block must only contain one transaction - var envelope = _commonProto.Envelope.decode(block.data.data[0]); - var payload = _commonProto.Payload.decode(envelope.payload); - var channel_header = _commonProto.ChannelHeader.decode(payload.header.channel_header); - + var channel_header = block.data.data[0].payload.header.channel_header; if (channel_header.channel_id === channel_name) { t.pass('The new channel has been successfully joined on peer '+ eh.ep._endpoint.addr); resolve(); diff --git a/test/integration/query.js b/test/integration/query.js index 3958376109..6706f3be2e 100644 --- a/test/integration/query.js +++ b/test/integration/query.js @@ -133,6 +133,7 @@ test(' ---->>>>> Query chain working <<<<<-----', function(t) { // send query return chain.queryBlock(0); }).then((block) => { + logger.info(' Chain getBlock() returned block %j',block); logger.info(' Chain getBlock() returned block number=%s',block.header.number); t.equal(block.header.number.toString(),'0','checking query results are correct that we got zero block back'); t.equal(block.data.data[0].payload.data.config.channel_group.groups.Orderer.groups.OrdererMSP.values.MSP.value.config.name,'OrdererMSP','checking query results are correct that we got the correct orderer MSP name'); diff --git a/test/unit/block.js b/test/unit/block.js index f63b77f3f4..6337e416ad 100644 --- a/test/unit/block.js +++ b/test/unit/block.js @@ -30,12 +30,12 @@ var policiesProto = grpc.load(path.join(__dirname, '../../fabric-client/lib/prot var commonProto = grpc.load(path.join(__dirname, '../../fabric-client/lib/protos/common/common.proto')).common; var rewire = require('rewire'); -var Block = rewire('fabric-client/lib/Block.js'); +var BlockDecoder = rewire('fabric-client/lib/BlockDecoder.js'); -test('\n\n*** Block.js tests ***\n\n', (t) => { +test('\n\n*** BlockDecoder.js tests ***\n\n', (t) => { t.throws( () => { - Block.decode(new Uint8Array(2)); + BlockDecoder.decode(new Uint8Array(2)); }, /Block input data is not a byte buffer/, 'Check input is a Buffer object' @@ -44,7 +44,7 @@ test('\n\n*** Block.js tests ***\n\n', (t) => { // use the genesis block as input to test the decoders var data = fs.readFileSync(path.join(__dirname, '../fixtures/channel/twoorgs.genesis.block')); - var block = Block.decode(data); + var block = BlockDecoder.decode(data); t.pass('Genesis block parsed without error'); t.equal( @@ -60,7 +60,7 @@ test('\n\n*** Block.js tests ***\n\n', (t) => { output += txt; }); - var decodeConfigPolicy = Block.__get__('decodeConfigPolicy'); + var decodeConfigPolicy = BlockDecoder.__get__('decodeConfigPolicy'); var mockPolicy = new policiesProto.Policy(); mockPolicy.setType(policiesProto.Policy.PolicyType.SIGNATURE); decodeConfigPolicy({ @@ -76,7 +76,7 @@ test('\n\n*** Block.js tests ***\n\n', (t) => { unhook(); - if (output.indexOf('warn') >= 0 && output.indexOf('[Block.js]: decodeConfigPolicy - found a PolicyType of MSP') >= 0) { + if (output.indexOf('warn') >= 0 && output.indexOf('[BlockDecoder.js]: decodeConfigPolicy - found a PolicyType of MSP') >= 0) { t.pass('Successfully tested warn logging message about using the "MSP" policy type'); } else { t.fail('Failed to warn logging message about using the "MSP" policy type'); diff --git a/test/unit/event-hub.js b/test/unit/event-hub.js index 949ffb89aa..eb99bb2fa3 100644 --- a/test/unit/event-hub.js +++ b/test/unit/event-hub.js @@ -106,15 +106,304 @@ test('\n\n** EventHub tests\n\n', (t) => { () => { eh.registerBlockEvent(); }, + /Missing "onEvent" parameter/, + 'Check the Missing "onEvent" parameter' + ); + + t.throws( + () => { + eh.unregisterBlockEvent(); + }, + /Missing "block_registration_number" parameter/, + 'Check the Missing "block_registration_number" parameter' + ); + t.throws( + () => { + eh.registerTxEvent(); + }, + /Missing "txid" parameter/, + 'Check the Missing "txid" parameter' + ); + t.throws( + () => { + eh.registerTxEvent('txid'); + }, + /Missing "onEvent" parameter/, + 'Check the Missing "onEvent" parameter' + ); + t.throws( + () => { + eh.unregisterTxEvent(); + }, + /Missing "txid" parameter/, + 'Check the Missing "txid" parameter' + ); + t.throws( + () => { + eh.registerChaincodeEvent(); + }, + /Missing "ccid" parameter/, + 'Check the Missing "ccid" parameter' + ); + t.throws( + () => { + eh.registerChaincodeEvent('ccid'); + }, + /Missing "eventname" parameter/, + 'Check the Missing "eventname" parameter' + ); + t.throws( + () => { + eh.registerChaincodeEvent('ccid','eventname'); + }, + /Missing "onEvent" parameter/, + 'Check the Missing "onEvent" parameter' + ); + t.throws( + () => { + eh.unregisterChaincodeEvent(); + }, + /Missing "cbe" parameter/, + 'Check the Missing "cbe" parameter' + ); + t.throws( + () => { + eh.registerBlockEvent({}); + }, /The event hub has not been connected to the event source/, 'Check the event hub must be connected before the block event listener can be registered' ); + t.throws( + () => { + eh.registerChaincodeEvent('ccid', 'eventname', {}); + }, + /The event hub has not been connected to the event source/, + 'Check the event hub must be connected before the chaincode event listener can be registered' + ); + t.throws( + () => { + eh.registerTxEvent('txid', {}); + }, + /The event hub has not been connected to the event source/, + 'Check the event hub must be connected before the tranaction event listener can be registered' + ); + t.end(); +}); + +test('\n\n** EventHub block callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + eh.registerBlockEvent((block) => { + t.fail('Should not have called success callback'); + t.end(); + }, (error) =>{ + t.pass('Successfully called error callback'); + t.end(); + }); + t.pass('successfully registered block callbacks'); + eh.disconnect(); +}); + +test('\n\n** EventHub transaction callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + eh.registerTxEvent('txid', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }, (error) =>{ + t.pass('Successfully called error callback'); + t.end(); + }); + t.pass('successfully registered transaction callbacks'); + eh.disconnect(); +}); - eh.registerTxEvent('dummyId', () => { - // dummy function +test('\n\n** EventHub chaincode callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }, (error) =>{ + t.pass('Successfully called error callback'); + t.end(); + }); + t.pass('successfully registered chaincode callbacks'); + eh.disconnect(); +}); + +test('\n\n** EventHub block callback no Error callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + eh.registerBlockEvent((block) => { + t.fail('Should not have called success callback'); + t.end(); }); + t.pass('successfully registered block callbacks'); + eh.disconnect(); + t.end(); +}); - t.equal(eh.txRegistrants.size(), 1, 'txRegistrants size should be 1 after registering a transaction event listener'); +test('\n\n** EventHub transaction callback no Error callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + eh.registerTxEvent('txid', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }); + t.pass('successfully registered transaction callbacks'); + eh.disconnect(); + t.end(); +}); + +test('\n\n** EventHub chaincode callback no Error callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }); + t.pass('successfully registered chaincode callbacks'); + eh.disconnect(); + t.end(); +}); + +test('\n\n** EventHub remove block callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + var blockcallback = (block) => { + t.fail('Should not have called success callback'); + t.end(); + }; + var blockerrorcallback = (error) =>{ + t.fail('Should not have called error callback'); + t.end(); + }; + var brn = eh.registerBlockEvent( blockcallback, blockerrorcallback); + t.pass('successfully registered block callbacks'); + eh.unregisterBlockEvent(brn); + t.pass('successfuly unregistered block callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); + t.end(); +}); + +test('\n\n** EventHub remove transaction callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + var txid = 'txid'; + eh.registerTxEvent(txid, (block) => { + t.fail('Should not have called success callback'); + t.end(); + }, (error) =>{ + t.fail('Should not have called error callback'); + t.end(); + }); + t.pass('successfully registered transaction callbacks'); + eh.unregisterTxEvent(txid); + t.pass('successfuly unregistered transaction callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); + t.end(); +}); + +test('\n\n** EventHub remove chaincode callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + var cbe = eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }, (error) =>{ + t.fail('Should not have called error callback'); + t.end(); + }); + t.pass('successfully registered chaincode callbacks'); + eh.unregisterChaincodeEvent(cbe); + t.pass('successfuly unregistered chaincode callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); + t.end(); +}); + + +test('\n\n** EventHub remove block callback no Error callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + var blockcallback = (block) => { + t.fail('Should not have called success callback'); + t.end(); + }; + var brn = eh.registerBlockEvent( blockcallback); + t.pass('successfully registered block callbacks'); + eh.unregisterBlockEvent(brn); + t.pass('successfuly unregistered block callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); + t.end(); +}); + +test('\n\n** EventHub remove transaction callback no Error callback\n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + + var txid = 'txid'; + eh.registerTxEvent(txid, (block) => { + t.fail('Should not have called success callback'); + t.end(); + }); + t.pass('successfully registered transaction callbacks'); + eh.unregisterTxEvent(txid); + t.pass('successfuly unregistered transaction callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); + t.end(); +}); + +test('\n\n** EventHub remove chaincode callback no Error callback \n\n', (t) => { + var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); + eh.setPeerAddr('grpc://localhost:7053'); + eh.connected = true; //force this into connected state + eh.force_reconnect = false; + var cbe = eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => { + t.fail('Should not have called success callback'); + t.end(); + }); + t.pass('successfully registered chaincode callbacks'); + eh.unregisterChaincodeEvent(cbe); + t.pass('successfuly unregistered chaincode callback'); + eh.disconnect(); + t.pass('successfuly disconnected eventhub'); t.end(); -}); \ No newline at end of file +});