Skip to content

Commit

Permalink
FABN-1486: Update sample transaction event handler (#136)
Browse files Browse the repository at this point in the history
Also update tutorial page describing the use of event handlers.

Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday authored Feb 18, 2020
1 parent ef36a91 commit 8f3ca2e
Show file tree
Hide file tree
Showing 16 changed files with 253 additions and 167 deletions.
54 changes: 32 additions & 22 deletions docs/tutorials/transaction-commit-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ being queried.

The SDK provides several selectable strategies for how it should wait for
commit events following a transaction invocation. The available strategies
are defined in `EventHandlerStrategies`. The desired strategy is
are defined in `DefaultEventHandlerStrategies`. The desired strategy is
(optionally) specified as an argument to `connect()` on the `Gateway`, and
is used for all transaction invocations on Contracts obtained from that
Gateway instance.
Expand All @@ -37,11 +37,11 @@ If no event handling strategy is specified, `MSPID_SCOPE_ALLFORTX` is used
by default.

```javascript
const { Gateway, EventHandlerStrategies } = require('fabric-network');
import { Gateway, GatewayOptions, DefaultEventHandlerStrategies } from 'fabric-network';

const connectOptions = {
const connectOptions: GatewayOptions = {
transaction: {
strategy: EventHandlerStrategies.MSPID_SCOPE_ALLFORTX
strategy: DefaultEventHandlerStrategies.MSPID_SCOPE_ALLFORTX
}
}

Expand All @@ -52,7 +52,10 @@ await gateway.connect(connectionProfile, connectOptions);
Specifying `null` as the event handling strategy will cause transaction
invocations to return immediately after successfully sending the endorsed
transaction to the orderer. It will not wait for any commit events to be
received from peers. For more details on *Event Handling Options*, see [TransactionOptions](module-fabric-network.Gateway.html#~TransactionOptions__anchor).
received from peers.

For more details on *Event Handling Options*, see
[TransactionOptions](module-fabric-network.Gateway.html#~TransactionOptions).

### Plug-in event handlers

Expand All @@ -61,18 +64,23 @@ strategies, it is possible to implement your own event handling. This is
achieved by specifying your own factory function as the event handling
strategy. The factory function should return a *transaction event handler*
object and take two parameters:
1. transaction: `Transaction`
2. options: `any`
1. transactionId: `string`
2. network: `Network`

From the Transaction instance get the Network instance to provides access to peers and event services from which events will be recieved.
The Network instance provides access to an underlying Channel object, from
which endorsing peers can be obtained.

```javascript
function createTransactionEventHandler(transaction, options) {
/* Your implementation here */
return new MyTransactionEventHandler(transaction, options);
import { Gateway, GatewayOptions, TxEventHandlerFactory } from 'fabric-network';

const createTransactionEventHandler: TxEventHandlerFactory = (transactionId, network) => {
/* Your implementation here */
const mspId = network.getGateway().getIdentity().mspId;
const myOrgPeers = network.getChannel().getEndorsers(mspId);
return new MyTransactionEventHandler(transactionId, network, myOrgPeers);
}

const connectOptions = {
const connectOptions: GatewayOptions = {
transaction: {
strategy: createTransactionEventhandler
}
Expand All @@ -82,23 +90,20 @@ const gateway = new Gateway();
await gateway.connect(connectionProfile, connectOptions);
```

For more details on *Event Handling Options*, see [TransactionOptions](module-fabric-network.Gateway.html#~TransactionOptions__anchor).

The *transaction event handler* object returned must implement the following functions.
The *transaction event handler* object returned must implement the following
functions:

```javascript
class MyTransactionEventHandler {
import { TxEventHandler } from 'fabric-network';

class MyTransactionEventHandler implements TxEventHandler {
/**
* Called to initiate listening for transaction events.
* @async
* @throws {Error} if not in a state where the handling strategy can be satified and the transaction should
* be aborted. For example, if insufficient event hubs are available.
*/
async startListening() { /* Your implementation here */ }

/**
* Wait until enough events have been received from the event hubs to satisfy the event handling strategy.
* @async
* Wait until enough events have been received from peers to satisfy the event handling strategy.
* @throws {Error} if the transaction commit is not successfully confirmed.
*/
async waitForEvents() { /* Your implementation here */ }
Expand All @@ -110,4 +115,9 @@ class MyTransactionEventHandler {
}
```

For a complete sample plug-in event handler implementation, see [sample-transaction-event-handler.ts](https://github.com/hyperledger/fabric-sdk-node/blob/master/test/ts-scenario/config/handlers/sample-transaction-event-handler.ts).
The *transaction event handler* implementation will typically use a commit
listener to monitor commit events from endorsing peers by calling
[Network.addCommitListener](module-fabric-network.Network.html#addCommitListener).

For a complete sample plug-in event handler implementation, see
[sample-transaction-event-handler.ts](https://github.com/hyperledger/fabric-sdk-node/blob/master/test/ts-scenario/config/handlers/sample-transaction-event-handler.ts).
19 changes: 14 additions & 5 deletions fabric-network/src/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class Gateway {
this.wallet = null;
this.identityContext = null;
this.networks = new Map();
this.identity = null;

// initial options - override with the connect()
this.options = {
Expand Down Expand Up @@ -195,15 +196,15 @@ class Gateway {
// setup an initial identity for the Gateway
if (options.identity) {
logger.debug('%s - setting identity', method);
const identity = await this._getIdentity(options.identity);
const provider = options.wallet.getProviderRegistry().getProvider(identity.type);
const user = await provider.getUserContext(identity, options.identity);
this.identity = await this._getWalletIdentity(options.identity);
const provider = options.wallet.getProviderRegistry().getProvider(this.identity.type);
const user = await provider.getUserContext(this.identity, options.identity);
this.identityContext = this.client.newIdentityContext(user);
}

if (options.clientTlsIdentity) {
logger.debug('%s - setting tlsIdentity', method);
const tlsIdentity = await this._getIdentity(options.clientTlsIdentity);
const tlsIdentity = await this._getWalletIdentity(options.clientTlsIdentity);
this.client.setTlsClientCertAndKey(tlsIdentity.credentials.certificate, tlsIdentity.credentials.privateKey);
} else if (options.tlsInfo) {
logger.debug('%s - setting tlsInfo', method);
Expand Down Expand Up @@ -232,14 +233,22 @@ class Gateway {
logger.debug('%s - end', method);
}

async _getIdentity(label) {
async _getWalletIdentity(label) {
const identity = await this.options.wallet.get(label);
if (!identity) {
throw new Error(`Identity not found in wallet: ${label}`);
}
return identity;
}

/**
* Get the identity associated with the gateway connection.
* @returns {module:fabric-network.Identity} An identity.
*/
getIdentity() {
return this.identity;
}

/**
* Returns the set of options associated with the Gateway connection
* @returns {module:fabric-network.Gateway~GatewayOptions} The Gateway connection options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import Network = require('../../network');
import { Endorser } from 'fabric-common';

function getOrganizationPeers(network: Network): Endorser[] {
return network.channel.getEndorsers(network.mspid);
const mspId = network.getGateway().getIdentity().mspId;
return network.getChannel().getEndorsers(mspId);
}

function getNetworkPeers(network: Network): Endorser[] {
return network.channel.getEndorsers();
return network.getChannel().getEndorsers();
}

/**
Expand Down
10 changes: 6 additions & 4 deletions fabric-network/src/impl/event/eventservicemanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class EventServiceManager {
logger.debug('%s - start', method);

this.network = network;
this.identityContext = this.network.gateway.identityContext;
this.channel = network.getChannel();
this.identityContext = this.network.getGateway().identityContext;

// wait to build the list of peers until someone needs an event service
// hopefully by then we have loaded the network (channel) with peers
Expand Down Expand Up @@ -179,7 +180,7 @@ class EventServiceManager {

const eventers = [];
for (const peer of peers) {
const eventer = this.network.channel.client.newEventer(peer.name);
const eventer = this.channel.client.newEventer(peer.name);
eventer.setEndpoint(peer.endpoint);
eventers.push(eventer);
logger.debug('%s - built new eventer %s', method, eventer.name);
Expand All @@ -188,7 +189,7 @@ class EventServiceManager {
if (!name) {
name = eventers[0].name;
}
const eventService = this.network.channel.newEventService(name);
const eventService = this.channel.newEventService(name);
logger.debug('%s - setting targets for eventService %s', method, eventService.name);
eventService.setTargets(eventers);

Expand Down Expand Up @@ -255,7 +256,8 @@ class RoundRobinPeerPool {
* @param {Endorser[]} peers The list of peers that the strategy can choose from
*/
constructor(network) {
const peers = network.channel.getEndorsers(network.mspid);
const mspId = network.getGateway().getIdentity().mspId;
const peers = network.getChannel().getEndorsers(mspId);
if (!peers || peers.length === 0) {
throw Error('No peers available');
}
Expand Down
64 changes: 27 additions & 37 deletions fabric-network/src/impl/event/transactioneventhandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ export class TransactionEventHandler implements TxEventHandler {
private readonly options: any;
private readonly peers: Endorser[];
private readonly notificationPromise: Promise<void>;
private readonly respondedPeers = new Set<Endorser>();
private readonly unrespondedPeers: Set<Endorser>;
private readonly listener: CommitListener = this.eventCallback.bind(this);
private readonly strategySuccessCallback = this.strategySuccess.bind(this);
private readonly strategyFailCallback = this.strategyFail.bind(this);
private resolveNotificationPromise!: () => void;
private rejectNotificationPromise!: (reason: Error) => void;
private timeoutHandler?: NodeJS.Timeout;
private listener: CommitListener;

/**
* Constructor.
Expand All @@ -66,24 +68,17 @@ export class TransactionEventHandler implements TxEventHandler {
const defaultOptions: any = {
commitTimeout: 30
};
this.options = Object.assign(defaultOptions, network.gateway.getOptions().transaction);
this.options = Object.assign(defaultOptions, network.getGateway().getOptions().transaction);

logger.debug('%s: transactionId = %s, options = %j', method, this.transactionId, this.options);

this.peers = strategy.getPeers();
this.unrespondedPeers = new Set(this.peers);

this.notificationPromise = new Promise((resolve, reject) => {
this.resolveNotificationPromise = resolve;
this.rejectNotificationPromise = reject;
});

this.listener = (error, event) => {
if (error) {
this.onError(error);
} else {
this.onEvent(event!);
}
};
}

/**
Expand Down Expand Up @@ -125,6 +120,25 @@ export class TransactionEventHandler implements TxEventHandler {
this.network.removeCommitListener(this.listener);
}

private eventCallback(error?: CommitError, event?: CommitEvent) {
if (event && event.status !== 'VALID') {
const message = `Commit of transaction ${this.transactionId} failed on peer ${event.peer.name} with status ${event.status}`;
this.strategyFail(new Error(message));
}

const peer = error?.peer || event!.peer;
if (!this.unrespondedPeers.delete(peer)) {
// Already seen a response from this peer
return;
}

if (error) {
this.strategy.errorReceived(this.strategySuccessCallback, this.strategyFailCallback);
} else {
this.strategy.eventReceived(this.strategySuccessCallback, this.strategyFailCallback);
}
}

private setListenTimeout() {
const method = 'setListenTimeout';

Expand All @@ -145,41 +159,17 @@ export class TransactionEventHandler implements TxEventHandler {
}

private timeoutFail() {
const unrespondedEventServices = this.peers
.filter((peer) => !this.respondedPeers.has(peer))
const unrespondedPeerNames = Array.from(this.unrespondedPeers)
.map((peer) => peer.name)
.join(', ');
const errorInfo = {
message: 'Event strategy not satisfied within the timeout period. No response received from peers: ' + unrespondedEventServices,
message: 'Event strategy not satisfied within the timeout period. No response received from peers: ' + unrespondedPeerNames,
transactionId: this.transactionId
};
const error = new TimeoutError(errorInfo);
this.strategyFail(error);
}

private onEvent(event: CommitEvent) {
logger.debug(`onEvent: received event for ${event.transactionId} from peer ${event.peer.name} with status ${event.status}`);

this.receivedEventServiceResponse(event.peer);
if (event.status !== 'VALID') {
const message = `Commit of transaction ${this.transactionId} failed on peer ${event.peer.name} with status ${event.status}`;
this.strategyFail(new Error(message));
} else {
this.strategy.eventReceived(this.strategySuccess.bind(this), this.strategyFail.bind(this));
}
}

private onError(error: CommitError) {
logger.debug('onError: received error from peer %s: %s', error.peer.name, error);

this.receivedEventServiceResponse(error.peer);
this.strategy.errorReceived(this.strategySuccess.bind(this), this.strategyFail.bind(this));
}

private receivedEventServiceResponse(peer: Endorser) {
this.respondedPeers.add(peer);
}

/**
* Callback for the strategy to indicate successful commit of the transaction.
* @private
Expand Down
3 changes: 2 additions & 1 deletion fabric-network/src/impl/query/queryhandlerstrategies.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const SingleQueryHandler = require('./singlequeryhandler');
const RoundRobinQueryHandler = require('./roundrobinqueryhandler');

function getOrganizationPeers(network) {
return network.channel.getEndorsers(network.mspid);
const mspId = network.getGateway().getIdentity().mspId;
return network.getChannel().getEndorsers(mspId);
}

function MSPID_SCOPE_SINGLE(network) {
Expand Down
22 changes: 19 additions & 3 deletions fabric-network/src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class Network {
logger.debug('%s - start', method);

this.gateway = gateway;
this.mspid = gateway.identityContext.mspid;
this.channel = channel;
this.contracts = new Map();
this.initialized = false;
Expand Down Expand Up @@ -109,10 +108,11 @@ class Network {
logger.debug('%s - user has not specified discovery targets, check channel and client', method);

// maybe the channel has connected endorsers with the mspid
targets = this.channel.getEndorsers(this.mspid);
const mspId = this.gateway.getIdentity().mspId;
targets = this.channel.getEndorsers(mspId);
if (!targets || targets.length < 1) {
// then check the client for connected peers associated with the mspid
targets = this.channel.client.getEndorsers(this.mspid);
targets = this.channel.client.getEndorsers(mspId);
}
if (!targets || targets.length < 1) {
// get any peer
Expand Down Expand Up @@ -180,6 +180,14 @@ class Network {
logger.debug('%s - end', method);
}

/**
* Get the owning Gateway connection.
* @returns {module:fabric-network.Gateway} A Gateway.
*/
getGateway() {
return this.gateway;
}

/**
* Get an instance of a contract (chaincode) on the current network.
* @param {string} chaincodeId - the chaincode identifier.
Expand Down Expand Up @@ -208,6 +216,14 @@ class Network {
return contract;
}

/**
* Get the underlying channel object representation of this network.
* @returns {Channel} A channel.
*/
getChannel() {
return this.channel;
}

_dispose() {
const method = '_dispose';
logger.debug('%s - start', method);
Expand Down
Loading

0 comments on commit 8f3ca2e

Please sign in to comment.