Skip to content

Commit

Permalink
FABN-910 Connect EventHubs up front in network api
Browse files Browse the repository at this point in the history
Connect to the event hubs as early as possible when creating a
network object to optimize the performance of the
submitTransaction method when it is later called.

Also disconnect the event hubs when the gateway object is
disconnected to cleanup resources

Change-Id: If271dd95bfee2e2893dbabc2acec33ca968232f5
Signed-off-by: andrew-coleman <[email protected]>
  • Loading branch information
andrew-coleman committed Sep 11, 2018
1 parent 399bd25 commit 828c718
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 101 deletions.
2 changes: 1 addition & 1 deletion fabric-network/lib/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Contract {

const txId = this.gateway.getClient().newTransactionID();
// createTxEventHandler() will return null if no event handler is requested
const eventHandler = this.eventHandlerFactory.createTxEventHandler(txId.getTransactionID());
const eventHandler = this.eventHandlerFactory ? this.eventHandlerFactory.createTxEventHandler(txId.getTransactionID()) : null;

// Submit the transaction to the endorsers.
const request = {
Expand Down
1 change: 1 addition & 0 deletions fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Gateway {
}

Gateway._mergeOptions(this.options, options);
logger.debug('connection options: %O', options);

if (!(config instanceof Client)) {
// still use a ccp for the discovery peer and ca information
Expand Down
87 changes: 87 additions & 0 deletions fabric-network/lib/impl/event/defaulteventhandlermanager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2018 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

'use strict';

const EventHandlerStrategies = require('./defaulteventhandlerstrategies');
const TransactionEventHandler = require('./transactioneventhandler');
const EventHubFactory = require('./eventhubfactory');
const logger = require('../../logger').getLogger('DefaultEventHandlerManager');

class DefaultEventHandlerManager {

constructor(network, mspId, options) {
this.network = network;
this.channel = network.channel;
this.peerMap = network.peerMap;
this.options = options;
this.mspId = mspId;

if (!this.options.strategy) {
this.options.strategy = EventHandlerStrategies.MSPID_SCOPE_ALLFORTX;
}

logger.debug('constructor: mspId = %s, options = %O', mspId, this.options);
}

async initialize() {
this.availableEventHubs = [];
if (!this.initialized) {
this.useFullBlocks = this.options.useFullBlocks || this.options.chaincodeEventsEnabled;
if (this.useFullBlocks === null || this.useFullBlocks === undefined) {
this.useFullBlocks = false;
}

logger.debug('initialize: useFullBlocks = %s', this.useFullBlocks);

const eventHubFactory = new EventHubFactory(this.channel);
this.eventStrategy = this.options.strategy(eventHubFactory, this.network, this.mspId);
this.availableEventHubs = await this.eventStrategy.getConnectedEventHubs();

this.initialized = true;

logger.debug('initialize: useFullBlocks = %j, availableEventHubs = %O', this.useFullBlocks, this.availableEventHubs);
}
}

dispose() {
logger.debug('dispose');
this.disconnectEventHubs();
this.availableEventHubs = [];
this.initialized = false;
}

getEventHubs() {
return this.availableEventHubs;
}

disconnectEventHubs() {
for (const hub of this.availableEventHubs) {
try {
hub.disconnect();
} catch (error) {
//
}
}
}

/**
* create an Tx Event handler for the specific txid
*
* @param {*} txid
* @returns
* @memberof DefaultEventHandlerFactory
*/
createTxEventHandler(txid) {
logger.debug('createTxEventHandler: txid = %s', txid);
// pass in all available eventHubs to listen on, the handler decides when to resolve based on strategy
// a TxEventHandler should check that the available ones are usable when appropriate.
return new TransactionEventHandler(this, txid);
}

}

module.exports = DefaultEventHandlerManager;
26 changes: 10 additions & 16 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,21 @@ class TransactionEventHandler {
/**
* Constructor.
* @private
* @param {DefaultEventHandlerManager} manager Event handler manager
* @param {String} transactionId Transaction ID.
* @param {Object} strategy Event strategy implementation.
* @param {TransactionEventHandlerOptions} [options] Additional options.
*/
constructor(transactionId, strategy, options) {
constructor(manager, transactionId) {
this.transactionId = transactionId;
this.strategy = strategy;
this.strategy = manager.eventStrategy;

const defaultOptions = {
timeout: 0 // No timeout by default
commitTimeout: 0 // No timeout by default
};
this.options = Object.assign(defaultOptions, options);
this.options = Object.assign(defaultOptions, manager.options);

logger.debug('constructor:', util.format('transactionId = %s, options = %O', this.transactionId, this.options));

this.eventHubs = [];
// Kick off the async connection of event hubs as early as possible
this.eventHubsConnectionPromise = strategy.getConnectedEventHubs().then((eventHubs) => {
this.eventHubs = eventHubs;
});
this.eventHubs = manager.getEventHubs();

this.notificationPromise = new Promise((resolve, reject) => {
this._txResolve = resolve;
Expand All @@ -59,7 +54,6 @@ class TransactionEventHandler {
* @async
*/
async startListening() {
await this.eventHubsConnectionPromise;
this._setListenTimeout();

for (const eventHub of this.eventHubs) {
Expand All @@ -72,15 +66,15 @@ class TransactionEventHandler {
}

_setListenTimeout() {
if (this.options.timeout <= 0) {
if (this.options.commitTimeout <= 0) {
return;
}

logger.debug('_setListenTimeout:', `setTimeout(${this.options.timeout}) for transaction ${this.transactionId}`);
logger.debug('_setListenTimeout:', `setTimeout(${this.options.commitTimeout}) for transaction ${this.transactionId}`);

this.timeoutHandler = setTimeout(() => {
this._strategyFail(new Error('Event strategy not satisified within the timeout period'));
}, this.options.timeout * 1000);
this._strategyFail(new Error('Event strategy not satisfied within the timeout period'));
}, this.options.commitTimeout * 1000);
}

_onEvent(eventHub, txId, code) {
Expand Down
43 changes: 23 additions & 20 deletions fabric-network/lib/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
const FabricConstants = require('fabric-client/lib/Constants');
const Contract = require('./contract');
const logger = require('./logger').getLogger('Network');
const EventHubFactory = require('./impl/event/eventhubfactory');
const TransactionEventHandler = require('./impl/event/transactioneventhandler');
const DefaultEventHandlerManager = require('./impl/event/defaulteventhandlermanager');
const util = require('util');

class Network {
Expand All @@ -25,25 +24,8 @@ class Network {

this.gateway = gateway;
this.channel = channel;

this.eventHandlerFactory = {
createTxEventHandler: () => null
};
const createEventStrategyFn = gateway.getOptions().eventHandlerOptions.strategy;
if (createEventStrategyFn) {
const self = this;
const eventHubFactory = new EventHubFactory(channel);
const mspId = gateway.getCurrentIdentity()._mspId;
const commitTimeout = gateway.getOptions().eventHandlerOptions.commitTimeout;
this.eventHandlerFactory.createTxEventHandler = (txId) => {
const eventStrategy = createEventStrategyFn(eventHubFactory, self, mspId);
return new TransactionEventHandler(txId, eventStrategy, { timeout: commitTimeout });
};
}

this.contracts = new Map();
this.initialized = false;
this.queryHandler;
}

/**
Expand Down Expand Up @@ -140,6 +122,7 @@ class Network {

await this._initializeInternalChannel();
this.peerMap = this._mapPeersToMSPid();
this.eventHandlerManager = await this._createEventHandlerManager();
this.queryHandler = await this.gateway._createQueryHandler(this.channel, this.peerMap);
this.initialized = true;
}
Expand Down Expand Up @@ -175,20 +158,40 @@ class Network {
chaincodeId,
this.gateway,
this.queryHandler,
this.eventHandlerFactory
this.eventHandlerManager
);
this.contracts.set(chaincodeId, contract);
}
return contract;
}

async _createEventHandlerManager() {
const createEventStrategyFn = this.gateway.getOptions().eventHandlerOptions.strategy;
if (createEventStrategyFn) {
const currentmspId = this.gateway.getCurrentIdentity()._mspId;
const eventHandlerManager = new DefaultEventHandlerManager(
this,
currentmspId,
this.gateway.getOptions().eventHandlerOptions
);
await eventHandlerManager.initialize();
return eventHandlerManager;
}
return null;

}

_dispose() {
logger.debug('in _dispose');

// Danger as this cached in gateway, and also async so how would
// network._dispose() followed by network.initialize() be safe ?
// make this private is the safest option.
this.contracts.clear();
if (this.eventHandlerManager) {
this.eventHandlerManager.dispose();
}

if (this.queryHandler) {
this.queryHandler.dispose();
}
Expand Down
31 changes: 30 additions & 1 deletion fabric-network/test/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,35 @@ describe('Contract', () => {
});
});

it('should submit an invoke request to the chaincode which does return data', () => {
it('should submit an invoke request to the chaincode which does not return data', () => {
const proposalResponses = [{
response: {
status: 200
}
}];
const proposal = { proposal: 'i do' };
const header = { header: 'gooooal' };
mockChannel.sendTransactionProposal.resolves([ proposalResponses, proposal, header ]);
// This is the commit proposal and response (from the orderer).
const response = {
status: 'SUCCESS'
};
mockChannel.sendTransaction.withArgs({ proposalResponses: proposalResponses, proposal: proposal }).resolves(response);
return contract.submitTransaction('myfunc', 'arg1', 'arg2')
.then((result) => {
should.equal(result, null);
sinon.assert.calledOnce(mockChannel.sendTransactionProposal);
sinon.assert.calledWith(mockChannel.sendTransactionProposal, {
chaincodeId: 'someid',
txId: mockTransactionID,
fcn: 'myfunc',
args: ['arg1', 'arg2']
});
sinon.assert.calledOnce(mockChannel.sendTransaction);
});
});

it('should submit an invoke request to the chaincode with no event handler', () => {
const proposalResponses = [{
response: {
status: 200,
Expand All @@ -208,6 +236,7 @@ describe('Contract', () => {
status: 'SUCCESS'
};
mockChannel.sendTransaction.withArgs({ proposalResponses: proposalResponses, proposal: proposal }).resolves(response);
contract.eventHandlerFactory = null;
return contract.submitTransaction('myfunc', 'arg1', 'arg2')
.then((result) => {
result.should.equal('hello world');
Expand Down
3 changes: 2 additions & 1 deletion fabric-network/test/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ describe('Gateway', () => {
mockPeer1.isInRole.withArgs(FABRIC_CONSTANTS.NetworkConfig.LEDGER_QUERY_ROLE).returns(true);
const peerArray = [mockPeer1];
mockInternalChannel.getPeers.returns(peerArray);
mockInternalChannel.getChannelEventHub.returns({isconnected: () => true, getName: () => 'myeventhub'});
});

describe('#getNetwork', () => {
Expand All @@ -423,7 +424,7 @@ describe('Gateway', () => {

it('should create a non-existent network object', async () => {
mockClient.getChannel.withArgs('bar').returns(mockInternalChannel);
gateway.getCurrentIdentity = sinon.stub().returns({ _mspId: 'MSP_ID' });
gateway.getCurrentIdentity = sinon.stub().returns({ _mspId: 'MSP01' });

const network2 = await gateway.getNetwork('bar');
network2.should.be.instanceof(Network);
Expand Down
Loading

0 comments on commit 828c718

Please sign in to comment.