Skip to content

Commit

Permalink
FABN-1345 NodeSDK Add DeliverWithPrivateData support
Browse files Browse the repository at this point in the history
the connect function in the ChannelEventHub have a new parameter in its
options called private_data, when set to true, it will deliver blocks
with private data. The new return willl contain a new atribute called
private_data, that will contain the set of writes of the private data
collections a peer can access.

Signed-off-by: Alexis Ortiz <[email protected]>
Change-Id: I5ddc155da0abbf0c21b77baace3d2e40054065c8
  • Loading branch information
Alexis Ortiz authored and harrisob committed Sep 26, 2019
1 parent 70a9234 commit 59afabc
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 16 deletions.
40 changes: 40 additions & 0 deletions fabric-client/lib/BlockDecoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,29 @@ rule
return block;
}

/**
* Constructs an object containing all decoded values from the
* protobuf encoded `BlockAndPrivateData` object
*
* @param {Object} block_with_private_data - an object that represents the protobuf common.BlockAndPrivateData
* @returns {Block} An object of the fully decoded protobuf common.Block with its private data as a new property
*/
static decodeBlockWithPrivateData(block_with_private_data) {
if (!block_with_private_data) {
throw new Error('Block with private data input data is missing');
}
let block = {};
try {
block = this.decodeBlock(block_with_private_data.block);
block.private_data = decodePrivateData(block_with_private_data.private_data_map);
} catch (error) {
logger.error('decode - ::' + (error.stack ? error.stack : error));
throw new Error('Block with private data decode has failed with ' + error.toString());
}

return block;
}

/**
* @typedef {Object} ProcessedTransaction
* @property {number} validationCode - See [this list]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/transaction.proto#L125}
Expand Down Expand Up @@ -566,6 +589,23 @@ payload -- {}
}
};

function decodePrivateData(private_data_map) {
const private_data = {};
for (const i in private_data_map) {
const privateDataTransaction = private_data_map[i];
for (const j in privateDataTransaction.ns_pvt_rwset) {
const nsPrivateRW = privateDataTransaction.ns_pvt_rwset[j];
for (const k in nsPrivateRW.collection_pvt_rwset) {
const colPrivateRW = nsPrivateRW.collection_pvt_rwset[k];
colPrivateRW.rwset = decodeKVRWSet(colPrivateRW.rwset);
nsPrivateRW.collection_pvt_rwset[k] = colPrivateRW;
}
}
private_data[i] = privateDataTransaction;
}
return private_data;
}

function decodeBlockHeader(proto_block_header) {
const block_header = {};
block_header.number = proto_block_header.getNumber().toString();
Expand Down
43 changes: 36 additions & 7 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class ChannelEventHub {
// using filtered blocks
this._filtered_stream = true; // the default

// using private data
this._stream_with_private_data = false; // the default

// connect count for this instance
this._current_stream = 0;
// reference to the channel instance holding critical context such as signing identity
Expand Down Expand Up @@ -249,7 +252,7 @@ class ChannelEventHub {

/**
* @typedef {Object} ConnectOptions
* @property {boolean} full_block - Optional. To indicate that the connection
* @property {boolean} [full_block] - Optional. To indicate that the connection
* with the peer will be sending full blocks or filtered blocks to this
* ChannelEventHub.
* The default will be to establish a connection using filtered blocks.
Expand All @@ -260,7 +263,14 @@ class ChannelEventHub {
* receive full blocks.
* Registering a block listener on a filtered block connection may not
* provide sufficient information.
* @property {Number | string} startBlock - Optional. This will have the connection
* @property {boolean} [private_data] - Optional. To indicate that the connection
* with the peer will be sending full blocks with private data.
* The default will be to establish a connection without retrieving private data.
* Blocks with private data have an extra field called 'private_data' which
* contains a map of the key-values writes of the private data collections the
* peer can access. The private_data option can only be set to true if the option
* full_block is set to true, otherwise it will throw an error.
* @property {Number | string} [startBlock] - Optional. This will have the connection
* setup to start sending blocks back to the event hub at the block
* with this number. If connecting with a
* a startBlock then event listeners may not be registered with a
Expand All @@ -272,7 +282,7 @@ class ChannelEventHub {
* If the event hub should start with the latest block on the ledger,
* use the string 'latest' or do use a startBlock.
* Default is to start with the latest block on the ledger.
* @property {Number | string} endBlock - Optional. This will have the connection
* @property {Number | string} [endBlock] - Optional. This will have the connection
* setup to end sending blocks back to the event hub at the block
* with this number. If connecting with a
* a endBlock then event listeners may not be registered with a
Expand All @@ -282,11 +292,11 @@ class ChannelEventHub {
* If the event hub should end with the current block on the
* ledger use the string 'newest'.
* Default is to not stop sending.
* @property {SignedEvent} signedEvent - Optional. The signed event to be sent
* @property {SignedEvent} [signedEvent] - Optional. The signed event to be sent
* to the peer. This option is useful when the fabric-client application
* does not have the user's privateKey and can not sign requests to the
* fabric network.
* @property {Peer | string} target - Optional. The peer that provides the
* @property {Peer | string} [target] - Optional. The peer that provides the
* fabric event service. When using a string, the {@link Channel}
* must have a peer assigned with that name. This peer will replace
* the current peer endpoint of this channel event hub.
Expand Down Expand Up @@ -322,6 +332,7 @@ class ChannelEventHub {
logger.debug('%s - start - hub:%s', method, this._event_hub_number);
let signedEvent = null;
let full_block = null;
let private_data = null;
const connect_request = {};

// the following supports the users using the boolean parameter to control
Expand All @@ -332,6 +343,7 @@ class ChannelEventHub {
if (typeof options === 'object' && options !== null) {
signedEvent = options.signedEvent || null;
full_block = options.full_block || null;
private_data = options.private_data || null;

if (typeof options.force === 'boolean') {
connect_request.force = options.force;
Expand Down Expand Up @@ -375,6 +387,16 @@ class ChannelEventHub {
logger.debug('%s - using a filtered block stream by default', method);
}

if (typeof private_data === 'boolean') {
if (private_data && !full_block) {
throw new Error('A filtered stream can not deliver private data, if you want to be able to deliver private data you have to set up the full_block option to true');
}
this._stream_with_private_data = private_data;
logger.debug('%s - deliver block with private data set to:%s', method, !private_data);
} else {
logger.debug('%s - delivering block without private data by default', method);
}

logger.debug('%s - signed event:%s', method, !!signedEvent);
this._connect(connect_request);
logger.debug('%s - end %s', method, this.getPeerAddr());
Expand Down Expand Up @@ -469,7 +491,11 @@ class ChannelEventHub {
if (this._filtered_stream) {
this._stream = this._event_client.deliverFiltered();
} else {
this._stream = this._event_client.deliver();
if (this._stream_with_private_data) {
this._stream = this._event_client.deliverWithPrivateData();
} else {
this._stream = this._event_client.deliver();
}
}

this._stream.on('data', (deliverResponse) => {
Expand All @@ -485,7 +511,7 @@ class ChannelEventHub {
}

logger.debug('on.data - grpc stream is ready :%s', isStreamReady(self));
if (deliverResponse.Type === 'block' || deliverResponse.Type === 'filtered_block') {
if (deliverResponse.Type === 'block' || deliverResponse.Type === 'filtered_block' || deliverResponse.Type === 'block_and_private_data') {
if (self._connected === true) {
logger.debug('on.data - new block received - check event registrations');
} else {
Expand All @@ -502,6 +528,9 @@ class ChannelEventHub {
if (deliverResponse.Type === 'block') {
block = BlockDecoder.decodeBlock(deliverResponse.block);
self._last_block_seen = convertToLong(block.header.number);
} else if (deliverResponse.Type === 'block_and_private_data') {
block = BlockDecoder.decodeBlockWithPrivateData(deliverResponse.block_and_private_data);
self._last_block_seen = convertToLong(block.header.number);
} else {
block = JSON.parse(JSON.stringify(deliverResponse.filtered_block));
self._last_block_seen = convertToLong(block.number);
Expand Down
144 changes: 137 additions & 7 deletions test/integration/e2e/e2eUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ function instantiateChaincodeWithId(userOrg, chaincode_id, chaincode_path, versi
}

const targets = [];
const eventhubs = [];
const txEventHubs = [];

let type = 'instantiate';
if (upgrade) {
Expand Down Expand Up @@ -249,7 +249,7 @@ function instantiateChaincodeWithId(userOrg, chaincode_id, chaincode_path, versi
channel.addPeer(peer);

const eh = channel.newChannelEventHub(peer);
eventhubs.push(eh);
txEventHubs.push(eh);
}
}

Expand Down Expand Up @@ -367,7 +367,7 @@ function instantiateChaincodeWithId(userOrg, chaincode_id, chaincode_path, versi
const eventPromises = [];
eventPromises.push(channel.sendTransaction(request));

eventhubs.forEach((eh) => {
txEventHubs.forEach((eh) => {
const txPromise = new Promise((resolve, reject) => {
const handle = setTimeout(() => {
t.fail('Timeout - Failed to receive the event for instantiate: waiting on ' + eh.getPeerAddr());
Expand Down Expand Up @@ -458,14 +458,15 @@ function buildChaincodeProposal(client, theuser, chaincode_id, chaincode_path, v
}
module.exports.buildChaincodeProposal = buildChaincodeProposal;

function invokeChaincode(userOrg, version, chaincodeId, t, useStore, fcn, args, expectedResult) {
function invokeChaincode(userOrg, version, chaincodeId, t, useStore, fcn, args, expectedResult, expectedPrivateDataMap) {
init();

logger.debug('invokeChaincode begin');
Client.setConfigSetting('request-timeout', 60000);
const channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel);

const eventhubs = [];
const txEventHubs = [];
const blockEventHubs = [];
let pass_results = null;

// this is a transaction, will just use org's identity to
Expand Down Expand Up @@ -532,12 +533,14 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore, fcn, args,
const peer = client.newPeer(
ORGS[key].peer1.requests,
{
name:ORGS[key].name,
pem: Buffer.from(newData).toString(),
'ssl-target-name-override': ORGS[key].peer1['server-hostname']
}
);
channel.addPeer(peer);
eventhubs.push(channel.newChannelEventHub(peer));
txEventHubs.push(channel.newChannelEventHub(peer));
blockEventHubs.push(channel.newChannelEventHub(peer));
}
}

Expand Down Expand Up @@ -644,7 +647,7 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore, fcn, args,
const deployId = tx_id.getTransactionID();

const eventPromises = [];
eventhubs.forEach((eh) => {
txEventHubs.forEach((eh) => {
const txPromise = new Promise((resolve, reject) => {
const handle = setTimeout(() => {
t.fail('Timeout - Failed to receive the event for commit: waiting on ' + eh.getPeerAddr());
Expand Down Expand Up @@ -673,6 +676,60 @@ function invokeChaincode(userOrg, version, chaincodeId, t, useStore, fcn, args,
eventPromises.push(txPromise);
});

if (expectedPrivateDataMap) {

if (!Object.keys(expectedPrivateDataMap).length) {
throw new Error('the expected private data map can not be empty');
}

blockEventHubs.forEach((eh) => {

const peerName = eh.getName();
const expectedPrivateData = expectedPrivateDataMap[peerName]; // we make sure to be checking the right private data for each peer org

const blockPromise = new Promise((resolve, reject) => {

const handle = setTimeout(() => {
if (Object.keys(expectedPrivateData).length) {
t.fail('Timeout - Failed to receive the the expected private data in the block event: waiting on ' + eh.getPeerAddr());
eh.disconnect(); // will not be using this event hub
reject('TIMEOUT waiting on ' + eh.getPeerAddr());
}
}, 30000);

eh.registerBlockEvent((block) => {
const privateData = block.private_data;
if (blockHavePrivateDataHashes(block)) {
if (checkPrivateDataContent(privateData, expectedPrivateData)) {
t.pass('Successfully received the private data in the deliver of the block ' + block.header.number + ' at the eventhub on ' + eh.getPeerAddr());
}
}
// if there is no more data to expect, is because it found everything that expected
// that means the calling was succesfull
if (!Object.keys(expectedPrivateData).length) {
t.pass('Successfully checked the presence of the expected private data in the blocks delivered by eventhub ' + eh.getPeerAddr());
delete expectedPrivateDataMap[peerName]; // as we found the expected private data element, then we delete it because we dont expect it anymore
if (!Object.keys(expectedPrivateDataMap).length) { // expecting no more data, then it was all found
t.pass('Successfully checked the presence of all the expectedPrivateData in the blocks delivered');
}
clearTimeout(handle);
eh.disconnect();
resolve();
}
}, (error) => {
if (Object.keys(expectedPrivateData).length) {
t.fail('There was a problem with the fetching of the block: ' + error + ' and there is still expecting results to be checked');
clearTimeout(handle);
reject(error);
}
});
eh.connect({full_block:true, private_data:true});

});
eventPromises.push(blockPromise);
});
}

const sendPromise = channel.sendTransaction(request);
return Promise.all([sendPromise].concat(eventPromises))
.then((results) => {
Expand Down Expand Up @@ -927,6 +984,79 @@ function getTargetPeers(channel, targets) {
return targetPeers;
}

// checks if a block contains transactions with private data
function blockHavePrivateDataHashes(block) {
// iterate over the block to look if there is any trail of private data
let hasPrivateData = false;

const blockData = block.data.data;
blockData.forEach((dataItem) => {
const payloadActions = dataItem.payload.data.actions;
payloadActions.forEach((payloadAction) => {
const nsRWSet = payloadAction.payload.action.proposal_response_payload.extension.results.ns_rwset;
nsRWSet.forEach((rw) => {
const collectionHashedRWset = rw.collection_hashed_rwset;
if (collectionHashedRWset.length) { // if collectionHashedRWset is not empty then it have private data... obviously
hasPrivateData = true;
}
});
});
});

return hasPrivateData;

}

// checks that the private data matches the expected result
function checkPrivateDataContent(privateData, expectedResults) {

if (!expectedResults || Object.keys(expectedResults).length === 0) {
throw new Error('you have to provide some expected results');
}
if (!privateData || Object.keys(privateData).length === 0) {
throw new Error('you have to provide private data to check');
}

// we iterate over the complexity of the private data object to gouge out the value of the writes

for (const i in privateData) {
// here we iterate over each transaction in the block


const privateDataTransaction = privateData[i];
for (const j in privateDataTransaction.ns_pvt_rwset) {
// here we iterate over each namespace read-write set

const nsPrivateRW = privateDataTransaction.ns_pvt_rwset[j];
for (const k in nsPrivateRW.collection_pvt_rwset) {
// here we iterate over each collection read-write set in the given namespace

const colPrivateRW = nsPrivateRW.collection_pvt_rwset[k];
// we look for the expected value given the name of the collection that we have in this iteration
const expectedValue = expectedResults[colPrivateRW.collection_name];

if (!expectedValue) {
// if expectedValue is undefined is because the collection name was not present in the expectedResults parameter
continue;
}

for (const l in colPrivateRW.rwset.writes) {
const realValue = colPrivateRW.rwset.writes[l];
if (realValue.key === expectedValue.key && realValue.value === expectedValue.value) {
// if we found in the real private data values a matching pair with the expected values
// then we remove it from the expected results, because we are no more expecting them
delete expectedResults[colPrivateRW.collection_name];
}
}
}
}
}

// if the expected results are empty is because we found all af them, and then we dont expect them anymore
// that means that the checking was successful, otherwise not
return (Object.keys(expectedResults).length === 0);
}

async function getCollectionsConfig(t, org, chaincodeId, channel_name) {
init();

Expand Down
Loading

0 comments on commit 59afabc

Please sign in to comment.