Skip to content

Commit

Permalink
FAB-6390] NodeSDK - Channel Event Service
Browse files Browse the repository at this point in the history
Add new Channel event service. Includes new
updated protos, new test cases, and new event
hub class, and tutorial.

Change-Id: Id706fd139586af3125f4af60ecc02514e81d0a90
Signed-off-by: Bret Harrison <[email protected]>
  • Loading branch information
harrisob committed Jan 2, 2018
1 parent 115548f commit f2a3fd6
Show file tree
Hide file tree
Showing 14 changed files with 2,932 additions and 54 deletions.
1 change: 1 addition & 0 deletions build/tasks/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ gulp.task('test', ['clean-up', 'lint', 'pre-test', 'docker-ready', 'ca'], functi
'test/integration/fileKeyValueStore-fabricca-tests.js',
'test/integration/install.js',
'test/integration/events.js',
'test/integration/channel-event-hub.js',
// channel: mychannel, chaincode: end2endnodesdk:v2
'test/integration/grpc.js',
// channel: mychannel, chaincode: end2endnodesdk:v3
Expand Down
198 changes: 198 additions & 0 deletions docs/tutorials/channel-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@

This tutorial illustrates the use of channel-based events. These events are similar to the existing events, however are specific to a single channel. channel-based events are a new feature of the Hyperledger Fabric Node.js client as of 1.1.

For more information on:
* getting started with Hyperledger Fabric see
[Building your first network](http://hyperledger-fabric.readthedocs.io/en/latest/build_network.html).

The following assumes an understanding of the Hyperledger Fabric network
(orderers and peers),
and of Node application development, including the use of the
Javascript `Promise`.

### Overview
Channel-based events occur when there is a new block added to the channel ledger. A client application may use the Fabric Node.js client to register a listener to receive new blocks as they are added to the channel ledger. The Fabric Node.js client will also assist client applications by processing the incoming blocks and looking for specific transactions or chaincode events. This allows a client application to be notified of transaction completion or arbitrary chaincode events without having to perform multiple queries or search through blocks.

### new API on the Channel
* `newChannelEventHub(peer)` - A Channel instance method to get a new instance of a ChannelEventHub.

### new class ChannelEventHub and new APIs
* `registerBlockEvent(eventCallBack, errorCallBack, start_block)` - To register for block events.
* `unregisterBlockEvent(reg_num)` - To remove a block registration.
* `registerTxEvent(tx_id, eventCallBack, errorCallBack, start_block)` - To register for a specific transaction event.
* `unregisterTxEvent(tx_id)` - To remove a specific transaction registration.
* `registerChaincodeEvent(ccid, eventCallBack, errorCallBack, start_block)` - To register for chaincode events.
* `unregisterChaincodeEvent(cc_handle)` - To remove a chaincode event registration.
* `connect()` - To have the client channel event hub connect with the fabric network channel base event service. This call must be made before events will be received by your instance of a ChannelEventHub. This call may be made before or after the registrations of events.
* `disconnect()` - To have the client channel event hub shutdown the connection to the fabric network channel-based event service and notify all current channel event registrations of the shutdown by using the registered errorCallBacks.

##### `peer` parameter
This parameter must be included when getting a new instance of the ChannelEventHub. The value may be a `Peer` instance or the name of a peer when using a `connection profile` see [How to use a common network configuration file](tutorial-network-config.html).

##### `eventCallback` parameter
This parameter must be included. This is the callback function to be notified when this channel receives a new block, when listening for a specific transaction or chaincode events.

##### `errorCallback` parameter
This is an optional parameter. This is the callback function to be notified when this channel event hub is shutdown. The shutdown may be caused by a fabric network error, network connection problem or by a call to the "disconnect()" method.

##### `start_block` parameter
This is an optional parameter. This is is the starting block number for event checking. When included, the fabric channel-based event service will be asked to start sending blocks from this point. This could be used to resume and replay missed blocks that were added to the ledger. Since replaying events may confuse other event listeners, only one listener will be allowed on a ChannelEventHub when a start_block is included. When this parameter is excluded, which would be the normal situation, the fabric channel-based event service will be asked to start sending blocks from the last block on the ledger.

### Get a Channel Event Hub
A new method has been added to the fabric channel object to simplify setting up of an ChannelEventHub object. Use the following to get a ChannelEventHub instance that will be setup to work with the peer's channel-based event service. The ChannelEventHub instance will use all the same endpoint configuration settings that the peer instance is using, like the tls certs and the host and port address.

call by peer name
```
var channelEventHub = channel.getChannelEventHub('peer0.org1.example.com');
```

call by peer instance
```
let data = fs.readFileSync(path.join(__dirname, 'somepath/tlscacerts/org1.example.com-cert.pem'));
let peer = client.newPeer(
'grpcs://localhost:7051',
{
pem: Buffer.from(data).toString(),
'ssl-target-name-override': 'peer0.org1.example.com'
}
);
let channelEventHub = channel.newChannelEventHub(peer);
channelEventHub.connect();
```
### Block Listener
When there is a need to monitor for new blocks being added to the channel ledger, use a block event listener. The fabric client Node.js will be notified when a new block is committed to the ledger on the fabric peer. The fabric client Node.js will then call the registered callback of the application program. The callback will be passed a JSON representation of the newly added block. When there is a need to see previously added blocks, the registration of the callback may include a starting block number. The callback will start receiving blocks from this number and continue to receive new blocks as they are added to the ledger. This is a way for the application to resume and replay events that may have been lost if the application were to be offline. The application should remember the last block it has processed to avoid replaying the entire ledger.

The following example will register a block listener to start receiving blocks.
```
// keep the block_reg to unregister with later if needed
block_reg = channelEventHub.registerBlockEvent((block) => {
console.log('Successfully received the block event');
<do something with the block>
}, (error)=> {
console.log('Failed to receive the block event ::'+error);
<do something with the error>
});
```

The following example will register with a start block number because this application needs to resume at a specific block and replay the missed blocks. The application callback will handle the replayed blocks in the same manor like current events. The block listener will continue to receive blocks as they are committed to the ledger on the fabric peer.
```
// keep the block_reg to unregister with later if needed
block_reg = channelEventHub.registerBlockEvent((block) => {
console.log('Successfully received the block event');
<do something with the block>
}, (error)=> {
console.log('Failed to receive the block event ::'+error);
<do something with the error>
},
resume_point
);
```

### Transaction listener
When there is a need to monitor for the completion of a transaction on your organization's peer, use a transaction listener. The fabric client Node.js will be notified when a new block is committed to the ledger on the fabric peer. The fabric client Node.js will then check the block for registered transaction identifiers. If a transaction is found then the callback will be notified with the transaction ID, the transaction status, and the block number.

The following example will show registering a transaction ID within a javascript promise and building another promise for sending the transaction to the orderer. Both promises will be executed together so that the results will be received for both actions together.

```
let tx_object = client.newTransactionID();
// get the transaction ID string for later use
let tx_id = tx_object.getTransactionID();
let request = {
targets : targets,
chaincodeId: 'my_chaincode',
fcn: 'invoke',
args: ['doSomething', 'with this data'],
txId: tx_object
};
return channel.sendTransactionProposal(request);
}).then((results) => {
// a real application would check the proposal results
console.log('Successfully endorsed proposal to invoke chaincode');
// start block may be null if there is no need to resume or replay
let start_block = getBlockFromSomewhere();
let event_monitor = new Promise((resolve, reject) => {
let handle = setTimeout(() => {
channelEventHub.unregisterTxEvent(tx_id);
console.log('Timeout - Failed to receive the transaction event');
reject(new Error('Timed out waiting for block event'));
}, 20000);
channelEventHub.registerTxEvent((event_tx_id, status, block_num) => {
clearTimeout(handle);
channelEventHub.unregisterTxEvent(event_tx_id);
console.log('Successfully received the transaction event');
storeBlockNumForLater(block_num);
resolve(status);
}, (error)=> {
clearTimeout(handle);
channelEventHub.unregisterTxEvent(tx_id);
console.log('Failed to receive the transaction event ::'+error);
reject(error);
},
start_block // when this value is null (the normal case) transaction
// checking will start with the latest block
);
});
let send_trans = channel.sendTransaction({proposalResponses: results[0], proposal: results[1]});
return Promise.all([event_monitor, send_trans]);
}).then((results) => {
```

### Chaincode event listener
When there is a need to monitor for events that will be posted from within your chaincode, use a chaincode event listener. The fabric client Node.js will be notified when a new block is committed to the ledger on the fabric peer. The fabric client Node.js will then check for registered chaincode patterns within the chaincode events of the block. If a chaincode event is found then the callback will be notified with the chaincode event object and the block number.

The following example will show registering a chaincode event listener within a javascript promise and building another promise for sending the transaction to the orderer. Both promises will be executed together so that the results will be received for both actions together. If a chaincode event listener is needed for long term monitoring, follow the block listener example above.

```
let tx_object = client.newTransactionID();
let request = {
targets : targets,
chaincodeId: 'my_chaincode',
fcn: 'invoke',
args: ['doSomething', 'with this data'],
txId: tx_object
};
return channel.sendTransactionProposal(request);
}).then((results) => {
// a real application would check the proposal results
console.log('Successfully endorsed proposal to invoke chaincode');
let event_monitor = new Promise((resolve, reject) => {
let regid = null;
let handle = setTimeout(() => {
if (regid) {
channelEventHub.unregisterChaincodeEvent(regid);
console.log('Timeout - Failed to receive the chaincode event');
}
reject(new Error('Timed out waiting for chaincode event'));
}, 20000);
regid = channelEventHub.registerChaincodeEvent(chaincode_id.toString(), '^evtsender*',
(event, block_num) => {
clearTimeout(handle);
channelEventHub.unregisterChaincodeEvent(regid);
console.log('Successfully received the chaincode event');
storeBlockNumForLater(block_num);
resolve();
}, (error)=> {
clearTimeout(handle);
channelEventHub.unregisterChaincodeEvent(regid);
console.log('Failed to receive the chaincode event ::'+error);
reject(error);
});
});
let send_trans = channel.sendTransaction({proposalResponses: results[0], proposal: results[1]});
return Promise.all([event_monitor, send_trans]);
}).then((results) => {
```


<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>.
3 changes: 3 additions & 0 deletions docs/tutorials/tutorials.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@
},
"network-config" : {
"title": "How to use a common network configuration file"
},
"channel-events" : {
"title": "How to use the channel-based event service"
}
}
90 changes: 62 additions & 28 deletions fabric-client/lib/BlockDecoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ rule
block.metadata = decodeBlockMetaData(proto_block.metadata);
} catch (error) {
logger.error('decode - ::' + error.stack ? error.stack : error);
throw error;
throw new Error('Block decode has failed with ' + error.toString());
}

return block;
Expand Down Expand Up @@ -650,18 +650,9 @@ function decodeBlockDataEnvelope(proto_envelope) {
envelope.payload = {};
var proto_payload = _commonProto.Payload.decode(proto_envelope.getPayload().toBuffer());
envelope.payload.header = decodeHeader(proto_payload.getHeader());

if (envelope.payload.header.channel_header.type === HeaderType[1]) { // CONFIG
envelope.payload.data = decodeConfigEnvelope(proto_payload.getData().toBuffer());
}
// else if(envelope.payload.header.channel_header.type === HeaderType[2]) { // CONFIG_UPDATE
// envelope.payload.data = decodeConfigUpdateEnvelope(proto_payload.getData().toBuffer());
// }
else if (envelope.payload.header.channel_header.type === HeaderType[3]) { //ENDORSER_TRANSACTION
envelope.payload.data = decodeEndorserTransaction(proto_payload.getData().toBuffer());
} else {
throw new Error('Only able to decode ENDORSER_TRANSACTION and CONFIG type blocks');
}
envelope.payload.data = HeaderType.decodePayloadBasedOnType(proto_payload.getData().toBuffer(), envelope.payload.header.channel_header.type);
// let's also have the type as the enum string value so it is easier to read
envelope.payload.header.channel_header.typeString = HeaderType.convertToString(envelope.payload.header.channel_header.type);

return envelope;
};
Expand All @@ -670,13 +661,14 @@ function decodeEndorserTransaction(trans_bytes) {
var data = {};
var transaction = _transProto.Transaction.decode(trans_bytes);
data.actions = [];
if (transaction && transaction.actions)
if (transaction && transaction.actions) {
for (let i in transaction.actions) {
var action = {};
action.header = decodeSignatureHeader(transaction.actions[i].header);
action.payload = decodeChaincodeActionPayload(transaction.actions[i].payload);
data.actions.push(action);
}
}

return data;
};
Expand Down Expand Up @@ -1074,22 +1066,13 @@ function decodeHeader(proto_header) {
return header;
};

var HeaderType = {
0: 'MESSAGE', // Used for messages which are signed but opaque
1: 'CONFIG', // Used for messages which express the channel config
2: 'CONFIG_UPDATE', // Used for transactions which update the channel config
3: 'ENDORSER_TRANSACTION', // Used by the SDK to submit endorser based transactions
4: 'ORDERER_TRANSACTION', // Used internally by the orderer for management
5: 'DELIVER_SEEK_INFO', // Used as the type for Envelope messages submitted to instruct the Deliver API to seek
6: 'CHAINCODE_PACKAGE' // Used for packaging chaincode artifacts for install
};

function decodeChannelHeader(header_bytes) {
var channel_header = {};
var proto_channel_header = _commonProto.ChannelHeader.decode(header_bytes);
channel_header.type = HeaderType[proto_channel_header.getType()];
channel_header.version = decodeVersion(proto_channel_header.getType());
channel_header.timestamp = timeStampToDate(proto_channel_header.getTimestamp()).toString();
channel_header.type = proto_channel_header.getType();
logger.debug('decodeChannelHeader - looking at type:%s',channel_header.type);
channel_header.version = decodeVersion(proto_channel_header.getVersion());
channel_header.timestamp = timeStampToDate(proto_channel_header.getTimestamp());
channel_header.channel_id = proto_channel_header.getChannelId();
channel_header.tx_id = proto_channel_header.getTxId();
channel_header.epoch = proto_channel_header.getEpoch().toString(); //unit64
Expand All @@ -1100,10 +1083,13 @@ function decodeChannelHeader(header_bytes) {
};

function timeStampToDate(time_stamp) {
if(!time_stamp) {
return 'null';
}
var millis = time_stamp.seconds * 1000 + time_stamp.nanos / 1000000;
var date = new Date(millis);

return date;
return date.toString();
};

function decodeChaincodeActionPayload(payload_bytes) {
Expand Down Expand Up @@ -1316,4 +1302,52 @@ function decodeVersion(version_long) {
return version_int;
}

const type_as_string = {
0: 'MESSAGE', // Used for messages which are signed but opaque
1: 'CONFIG', // Used for messages which express the channel config
2: 'CONFIG_UPDATE', // Used for transactions which update the channel config
3: 'ENDORSER_TRANSACTION', // Used by the SDK to submit endorser based transactions
4: 'ORDERER_TRANSACTION', // Used internally by the orderer for management
5: 'DELIVER_SEEK_INFO', // Used as the type for Envelope messages submitted to instruct the Deliver API to seek
6: 'CHAINCODE_PACKAGE' // Used for packaging chaincode artifacts for install
};

var HeaderType = class {
static convertToString(type) {
let result = null;
try {
result = type_as_string[type];
} catch(error) {
logger.error('HeaderType conversion - unknown headertype - %s',type);
}
if(!result) {
result = 'UNKNOWN_TYPE';
}
return result;
}

static decodePayloadBasedOnType(proto_data, type) {
let result = null;
switch(type) {
case 1:
result = decodeConfigEnvelope(proto_data);
break;
case 2:
result = decodeConfigUpdateEnvelope(proto_data);
break;
case 3:
result = decodeEndorserTransaction(proto_data);
break;
default:
// return empty data on types we do not know so that
// event processing may continue on blocks we do not
// care about
result = {};
}

return result;
}
}

module.exports = BlockDecoder;
module.exports.HeaderType = HeaderType;
Loading

0 comments on commit f2a3fd6

Please sign in to comment.