Skip to content

Commit

Permalink
[FABN-1203] fabric-network event listening doc changes
Browse files Browse the repository at this point in the history
Change-Id: I2832c4e8b68f966967dd78540435c3cdac25e1a1
Signed-off-by: Liam Grace <[email protected]>
  • Loading branch information
liam-grace authored and andrew-coleman committed Jun 10, 2019
1 parent 1e28d1b commit 0b885e1
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 93 deletions.
104 changes: 66 additions & 38 deletions docs/tutorials/event-checkpointer.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
`Experimental`
This tutorial describes the approaches that can be selected by users of the fabric-network module for replaying missed events emitted by peers.

### Overview
Expand All @@ -17,40 +16,13 @@ When using checkpointing:

### Checkpointers

The `BaseCheckpoint` class is an interface that is to be used by all Checkpoint classes. fabric-network has one default class, `FileSystemCheckpointer` that is exported as a factory in the `CheckpointFactories`. The `FILE_SYSTEM_CHECKPOINTER` is the default checkpointer.
The `BaseCheckpoint` class is an interface that is to be used by all Checkpoint classes. fabric-network has one default class, {@link module:fabric-network~FileSystemCheckpointer} that is exported as a factory in the {@link module:fabric-network~CheckpointFactories}. The `FILE_SYSTEM_CHECKPOINTER` is the default checkpointer.

A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions.

A checkpointer is called each time the event callback is triggered.

The checkpointer can be set when connecting to a gateway or when creating the event listener.
```javascript
const { Gateway, CheckpointFactories } = require('fabric-network');

const connectOptions = {
checkpointer: {
factory: CheckpointFactories.FILE_SYSTEM_CHECKPOINTER,
options: {} // Options usable by the factory
}
};

const gateway = new Gateway()
await gateway.connect(connectionProfile, connectOptions);
```
`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths

Configuring a listener to be checkpointed required two properties:
1. `replay : boolean` - Tells the listener to record a checkpoint. Required if checkpointing is desired
2. `checkpointer : BaseCheckpointer` - If a checkpointer is not specified in the gateway, it must be specified here
```javascript
const listener = await contract.addContractListener('saleEventListener', 'sale', (err, event, blockNumber, txId) => {
if (err) {
console.error(err);
return;
}
// -- Do something
}, {replay: true, checkpointer: {factory: MyCheckpointer});
```

### Custom Checkpointer

Expand All @@ -59,25 +31,81 @@ Users can configure their own checkpointer. This requires two components to be c
2. The Factory

```javascript
class DbCheckpointer extends BaseCheckpointer {
constructor(channelName, listenerName, dbOptions) {
const fs = require('fs-extra');
const path = require('path');
const { Gateway } = require('fabric-network');

class FileSystemCheckpointer extends BaseCheckpointer {
constructor(channelName, listenerName, fsOptions) {
super(channelName, listenerName);
this.db = new Db(dbOptions);
this.basePath = path.resolve(fsOptions.basePath);
this.channelName = channelName;
this.listenerName = listenerName;
}

/**
* Initializes the checkpointer directory structure
*/
async _initialize() {
const cpPath = this._getCheckpointFileName()
}

async save(transactionId, blockNumber) { /* Your implementation using a database */ }
/**
* Constructs the checkpoint files name
*/
_getCheckpointFileName() {
let filePath = path.join(this._basePath, this._channelName);
if (this._chaincodeId) {
filePath = path.join(filePath, this._chaincodeId);
}
return path.join(filePath, this._listenerName);
}

async save(transactionId, blockNumber) {
const cpPath = this._getCheckpointFileName()
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const latestCheckpoint = await this.load();
if (Number(latestCheckpoint.blockNumber) === Number(blockNumber)) {
const transactionIds = latestCheckpoint.transactionIds;
latestCheckpoint.transactionIds = transactionIds;
} else {
latestCheckpoint.blockNumber = blockNumber;
latestCheckpoint.transactionIds = [transactionIds];
}
await fs.writeFile(cppPath, JSON.stringify(latestCheckpoint));
}

async load() { /* Your implementation using a database*/ }
async load() {
const cpPath = this._getCheckpointFileName(this._chaincodeId);
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const chkptBuffer = await fs.readFile(cpFile);
let checkpoint = checkpointBuffer.toString('utf8');
if (!checkpoint) {
checkpoint = {};
} else {
checkpoint = JSON.parse(checkpoint);
}
return checkpoint;
}
}

function BD_CHECKPOINTER_FACTORY(channelName, listenerName, options) {
return new DbCheckpointer(channelName, listenerName, options);
function File_SYSTEM_CHECKPOINTER_FACTORY(channelName, listenerName, options) {
return new FileSystemCheckpointer(channelName, listenerName, options);
}

const gateway = new Gateway();
await gateway.connect({
checkpointer: {
factory: DB_CHECKPOINTER_FACTORY,
options: {host: 'http://localhost'}
factory: FILE_SYSTEM_CHECKPOINTER_FACTORY,
options: {basePath: '/home/blockchain/checkpoints'} // These options will vary depending on the checkpointer implementation
});

```
`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths
When specifying a specific type of checkpointer for a listener, the `checkpointer` option in {@link module:fabric-network.Network~EventListenerOptions`}

46 changes: 39 additions & 7 deletions docs/tutorials/event-hub-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,56 @@ This tutorial describes how to define the behavior of the event hub selection st

The `ChannelEventHub` is a fabric-client class that receives contract, commit and block events from the event hub within a peer. The `fabric-network` abstracts the event hub away, and instead uses an event hub selection strategy to create new event hub instances or reuse existing instances.

#### Note
If you do not want the event hub strategy to manage event hubs for a listener, call `AbstractEventListener.setEventHub(eventHub: ChannelEventHub, isFixed: boolean)` and it will continue to use the same event hub

The interface for an event hub selection strategy is as follows:
Below is an example event hub selection strategy:

```javascript
class BaseEventHubSelectionStrategy {
class ExampleEventHubSelectionStrategy extends AbstractEventHubSelectionStrategy {

constructor(peers) {
this.peers = peers;
this.disconnectedPeers = [];

this.cleanupInterval = null;
}
_disconnectedPeerCleanup() {
this.cleanupInterval = setInterval(() => {
// Reset the list of disconnected peers every 10 seconds
for (const peerRecord of disconnectedPeers) {
// If 10 seconds has passed since the disconnect
if (Date.now() - peerRecord.time > 10000) {
this.disconnectedPeers = this.disconnectedPeers.filter((p) => p !== peerRecord.peer);
}
}

if (this.disconnectedPeers.length === 0) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
}, 10000);
}
/**
* Returns the next peer in the list per the strategy implementation
* @returns {ChannelPeer}
*/
getNextPeer() {
// Peer selection logic. Called whenever an event hub is required
// Only select those peers that have not been disconnected recently
let availablePeers = this.peers.filter((peer) => this.disconnectedPeers.indexOf(peer) === -1)
if (availablePeers.length === 0) {
availablePeers = this.peers;
}
const randomPeerIdx = Math.floor(Math.random() * availablePeers.length);
return availablePeers[randomPeerIdx];
}

/**
* Updates the status of a peers event hub
* @param {ChannelPeer} deadPeer The peer that needs its status updating
*/
updateEventHubAvailability(deadPeer) {
// Peer availability update logic. Called whenever the event hub disconnects.
if (!this.cleanupInterval) {
this._disconnectedPeerCleanup()
}
this.disconnectedPeers.push({peer: deadPeer, time: Date.now()})
}
}
```
Expand All @@ -44,3 +73,6 @@ await gateway.connect(connectionProfile, {
}
})
```

### Static event hub
Calling {@link module:fabric-network.AbstractEventListener#setEventHub} allows you to set one event hub that will not change. On unanticipated disconnect the SDK will attempt to reconnect to that event hub, rather than select the next peer using the event hub selection strategy.
49 changes: 33 additions & 16 deletions docs/tutorials/listening-to-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,38 @@ There are three event types that can be subscribed to:
2. Transaction (Commit) events - Those emitted automatically when a transaction is committed after an invoke
3. Block events - Those emitted automatically when a block is committed

Listening for these events allows the application to react without directly calling a transaction. This is ideal in use cases such as tracking network analytics.
Listening for these events allows the application to react without directly calling a transaction. This is ideal in use cases such as monitoring network analytics.

### Usage

Each listener type takes at least one parameter, the event callback. This is the function that is called when an event is detected. This callback is overriden by the `fabric-network` in order to support `Checkpointing`.
Each listener type takes at least one parameter, the event callback. This is the function that is called when an event is received.

The callback function given is expected to be a promise, meaning that the callback can perform asynchronous tasks without risking missing events.

### Options
{@link module:fabric-network.Network~EventListenerOptions}.

*Note*: Listeners will connect to event hubs and ask to receive _unfiltered_ events by default. To receive _filtered_ events, set `EventListenerOptions.filtered: true`.

### Naming

All event listeners (including CommitEventListeners, which use the transaction ID) must have a unique name at the `Network` level

#### Contract events

```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const network = await gateway.getNetwork('mychannel');
const contract = network.getContract('my-contract');

/**
* @param {String} listenerName the name of the event listener
* @param {String} eventName the name of the event being listened to
* @param {Function} callback the callback function with signature (error, event, blockNumber, transactionId, status)
* @param {Object} options
* @param {module:fabric-network.Network~EventListenerOptions} options
**/
const listener = await contract.addContractListener('my-contract-listener', 'sale', (error, event, blockNumber, transactionId, status) => {
const listener = await contract.addContractListener('my-contract-listener', 'sale', (err, event, blockNumber, transactionId, status) => {
if (err) {
console.error(err);
return;
Expand All @@ -42,30 +53,32 @@ Notice that there is no need to specify an event hub, as the `EventHubSelectionS
```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const network = await gateway.getNetwork('mychannel');

/**
* @param {String} listenerName the name of the event listener
* @param {Function} callback the callback function with signature (error, blockNumber, transactionId, status)
* @param {Object} options
* @param {module:fabric-network.Network~EventListenerOptions} options
**/
const listener = await network.addBlockListener('my-block-listener', (error, block) => {
if (err) {
console.error(err);
return;
}
console.log(`Block: ${block}`);
}, {filtered: true /*false*/})
})
```
When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request.

#### Commit events

Option 1:
*Note*: The listener listener name is _transactionId_._\<some random string\>_

#### Option 1:
```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const network = await gateway.getNetwork('mychannel');
const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
Expand All @@ -74,20 +87,20 @@ const transaction = contract.newTransaction('sell');
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await network.addCommitListener(transaction.getTransactionID().getTransactionID(), (error, transactionId, status, blockNumber) => {
const listener = await network.addCommitListener(transaction.getTransactionID().getTransactionID(), (err, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
}, {});
});
```

Option 2:
#### Option 2:
```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const network = await gateway.getNetwork('mychannel');
const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
Expand All @@ -96,7 +109,7 @@ const transaction = contract.newTransaction('sell');
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await transaction.addCommitListener((error, transactionId, status, blockNumber) => {
const listener = await transaction.addCommitListener((err, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
Expand All @@ -105,7 +118,11 @@ const listener = await transaction.addCommitListener((error, transactionId, stat
});
```

Both `Network.addCommitListener` and `Contract.addCommitListener` have an optional `eventHub` parameter. When set, the listener will only listen to that event hub, and in the event of an unforeseen disconnect, it will try and to reconnect without using the `EventHubSelectionStrategy`.

### Checkpointing
{@tutorial event-checkpointer}

### Unregistering listeners


`addContractListener`, `addBlockListener` and `addCommitListener` return a `ContractEventListener`, `BlockEventListener` and `CommitEventListener` respectively. Each has an `unregister()` function that removes the listener from the event hub, meaning no further events will be received from that listener until `register()` is called again
4 changes: 2 additions & 2 deletions fabric-network/lib/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ class Contract {
* @param {Function} callback - This callback will be triggered when
* a transaction commit event is emitted. It takes parameters
* of error, event payload, block number, transaction ID and status
* @param {RegistrationOptions} [options] - Optional. Options on
* @param {module:fabric-network.Network~ListenerOptions} [options] - Optional. Options on
* registrations allowing start and end block numbers.
* @param {ChannelEventHub} [eventHub] - Optional. Used to override the event hub selection
* @returns {CommitEventListener}
* @returns {module:fabric-network~CommitEventListener}
* @async
*/
async addContractListener(listenerName, eventName, callback, options) {
Expand Down
4 changes: 2 additions & 2 deletions fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ const logger = require('./logger').getLogger('Gateway');
* @typedef {Object} Gateway~DefaultEventHubSelectionFactory
* @memberof module:fabric-network
* @param {module:fabric-network.Network} network The network the event hub is being selected for
* @returns {module:fabric-network.Gateway~BaseEventHubSelectionStrategy}
* @returns {module:fabric-network.Gateway~AbstractEventHubSelectionStrategy}
*/

/**
* @typedef {Object} Gateway~BaseEventHubSelectionStrategy
* @typedef {Object} Gateway~AbstractEventHubSelectionStrategy
* @memberof module:fabric-network
* @property {Function} getNextPeer Function that returns the next peer in the list of available peers
* @property {Function} updateEventHubAvailability Function that updates the availability of an event hub
Expand Down
4 changes: 2 additions & 2 deletions fabric-network/lib/impl/event/abstracteventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class AbstractEventListener {
/**
* Constructor
* @param {module:fabric-network.Network} network The network
* @param {string} listenerName The name of the listener being created
* @param {string} listenerName a unique name identifying the listener.
* @param {function} eventCallback The function called when the event is triggered.
* It has signature (err, ...args) where args changes depending on the event type
* @param {module:fabric-network.Network~ListenerOptions} options Event handler options
Expand Down Expand Up @@ -113,7 +113,7 @@ class AbstractEventListener {
}

/**
* @param {module:fabric-client.ChannelEventHub} eventhub Event hub.
* @param {ChannelEventHub} eventHub Event hub.
* @param {boolean} isFixed If true only this peers event hub will be used
*/
setEventHub(eventHub, isFixed) {
Expand Down
2 changes: 1 addition & 1 deletion fabric-network/lib/impl/event/blockeventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BlockEventListener extends AbstractEventListener {
/**
*
* @param {module:fabric-network.Network} network The fabric network
* @param {string} listenerName the name of the listener being created
* @param {string} listenerName a unique name identifying the listener
* @param {Function} eventCallback The event callback called when a transaction is committed.
* It has signature (err, block)
* @param {module:fabric-network.Network~ListenerOptions} options
Expand Down
Loading

0 comments on commit 0b885e1

Please sign in to comment.