Skip to content

Commit

Permalink
FAB-6400 NodeSDK - filtered events
Browse files Browse the repository at this point in the history
Add support for the new protos. Covert to new deliver
service. This is the first step in supporting the
filtered events to be sure the existing channel event
service is able to use the new protos. Remove the old event
listener from the join channel to get ready for when it is
no longer supported. This event was not needed on the join.

Change-Id: Iff19a68d5ccbad2ab78e1f874a9d6b33a548caaa
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Jan 11, 2018
1 parent 38148d3 commit 945fa8b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 65 deletions.
3 changes: 2 additions & 1 deletion fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var BlockDecoder = require('./BlockDecoder.js');

var grpc = require('grpc');
var _abProto = grpc.load(__dirname + '/protos/orderer/ab.proto').orderer;
var _eventsProto = grpc.load(__dirname + '/protos/peer/events.proto').protos;
var _commonProto = grpc.load(__dirname + '/protos/common/common.proto').common;
var _ccTransProto = grpc.load(__dirname + '/protos/peer/transaction.proto').protos;
var _transProto = grpc.load(__dirname + '/protos/peer/transaction.proto').protos;
Expand Down Expand Up @@ -259,7 +260,7 @@ var ChannelEventHub = class {
options = utils.checkAndAddConfigSetting('grpc.keepalive_timeout_ms', request_timeout_ms, options);

logger.info('_connect - options %j',this._peer._options);
this._event_client = new _abProto.AtomicBroadcast(this._peer._endpoint.addr, this._peer._endpoint.creds, this._peer._options);
this._event_client = new _eventsProto.Deliver(this._peer._endpoint.addr, this._peer._endpoint.creds, this._peer._options);
this._stream = this._event_client.deliver();

this._stream.on('data', function(event) {
Expand Down
30 changes: 27 additions & 3 deletions fabric-client/lib/protos/peer/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ option go_package = "github.com/hyperledger/fabric/protos/peer";

package protos;


//----Event objects----

enum EventType {
Expand Down Expand Up @@ -123,16 +122,41 @@ message Event {
//Unregister consumer sent events
Unregister unregister = 5;

FilteredBlock filtered_block = 7;
FilteredBlock filtered_block = 7;
}
// Creator of the event, specified as a certificate chain
bytes creator = 6;
// Timestamp of the client - used to mitigate replay attacks
google.protobuf.Timestamp timestamp = 8;

// If mutual TLS is employed, this represents
// the hash of the client's TLS certificate
bytes tls_cert_hash = 9;
}

// Interface exported by the events server
service Events {
// event chatting using Event
rpc Chat(stream SignedEvent) returns (stream Event) {}
rpc Chat (stream SignedEvent) returns (stream Event) {
}
}

// DeliverResponse
message DeliverResponse {
oneof Type {
common.Status status = 1;
common.Block block = 2;
FilteredBlock filtered_block = 3;
}
}

service Deliver {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received.
rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
}
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received.
rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
}
}
6 changes: 6 additions & 0 deletions fabric-client/lib/protos/peer/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ message ChaincodeInfo {
// the name of the VSCC for this chaincode. This will be
// blank if the query is returning information about installed chaincodes.
string vscc = 6;
// the chaincode unique id.
// computed as: H(
// H(name || version) ||
// H(CodePackage)
// )
bytes id = 7;
}

// ChannelQueryResponse returns information about each channel that pertains
Expand Down
65 changes: 4 additions & 61 deletions test/integration/e2e/join-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,13 @@ var tx_id = null;

var ORGS;

var allEventhubs = [];

//
//Attempt to send a request to the orderer with the createChannel method
//
test('\n\n***** End-to-end flow: join channel *****\n\n', function(t) {
Client.addConfigFile(path.join(__dirname, './config.json'));
ORGS = Client.getConfigSetting('test-network');

// override t.end function so it'll always disconnect the event hub
t.end = ((context, ehs, f) => {
return function() {
for(var key in ehs) {
var eventhub = ehs[key];
if (eventhub && eventhub.isconnected()) {
logger.debug('Disconnecting the event hub');
eventhub.disconnect();
}
}

f.apply(context, arguments);
};
})(t, allEventhubs, t.end);

joinChannel('org1', t)
.then(() => {
t.pass(util.format('Successfully joined peers in organization "%s" to the channel', ORGS['org1'].name));
Expand Down Expand Up @@ -90,8 +73,7 @@ function joinChannel(org, t) {

var orgName = ORGS[org].name;

var targets = [],
eventhubs = [];
var targets = [];

var caRootsPath = ORGS.orderer.tls_cacerts;
let data = fs.readFileSync(path.join(__dirname, caRootsPath));
Expand Down Expand Up @@ -147,65 +129,26 @@ function joinChannel(org, t) {
}
)
);

let eh = client.newEventHub();
eh.setPeerAddr(
ORGS[org][key].events,
{
pem: Buffer.from(data).toString(),
'ssl-target-name-override': ORGS[org][key]['server-hostname']
}
);
eh.connect();
eventhubs.push(eh);
allEventhubs.push(eh);
}
}
}

var eventPromises = [];
eventhubs.forEach((eh) => {
let txPromise = new Promise((resolve, reject) => {
let handle = setTimeout(reject, 30000);

eh.registerBlockEvent((block) => {
clearTimeout(handle);

// in real-world situations, a peer may have more than one channel so
// we must check that this block came from the channel we asked the peer to join
if(block.data.data.length === 1) {
// Config block must only contain one transaction
var channel_header = block.data.data[0].payload.header.channel_header;
if (channel_header.channel_id === channel_name) {
t.pass('The new channel has been successfully joined on peer '+ eh.getPeerAddr());
resolve();
}
else {
t.fail('The new channel has not been succesfully joined');
reject();
}
}
});
});

eventPromises.push(txPromise);
});
tx_id = client.newTransactionID();
let request = {
targets : targets,
block : genesis_block,
txId : tx_id
};
let sendPromise = channel.joinChannel(request, 30000);
return Promise.all([sendPromise].concat(eventPromises));

return channel.joinChannel(request, 30000);
}, (err) => {
t.fail('Failed to enroll user \'admin\' due to error: ' + err.stack ? err.stack : err);
throw new Error('Failed to enroll user \'admin\' due to error: ' + err.stack ? err.stack : err);
})
.then((results) => {
logger.debug(util.format('Join Channel R E S P O N S E : %j', results));

if(results[0] && results[0][0] && results[0][0].response && results[0][0].response.status == 200) {
if(results && results[0] && results[0].response && results[0].response.status == 200) {
t.pass(util.format('Successfully joined peers in organization %s to join the channel', orgName));
} else {
t.fail(' Failed to join channel');
Expand Down

0 comments on commit 945fa8b

Please sign in to comment.