Skip to content

Commit

Permalink
Merge "FABN-951: Fix event handling concurreny issue"
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew-coleman authored and Gerrit Code Review committed Oct 9, 2018
2 parents 7ea726a + 3243d67 commit 55f8ccc
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 209 deletions.
18 changes: 10 additions & 8 deletions fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,18 @@ class Gateway {
*/
async getNetwork(networkName) {
logger.debug('in getNetwork');

const existingNetwork = this.networks.get(networkName);
if (!existingNetwork) {
logger.debug('getNetwork: create network object and initialize');
const channel = this.client.getChannel(networkName);
const newNetwork = new Network(this, channel);
await newNetwork._initialize();
this.networks.set(networkName, newNetwork);
return newNetwork;
if (existingNetwork) {
return existingNetwork;
}
return existingNetwork;

logger.debug('getNetwork: create network object and initialize');
const channel = this.client.getChannel(networkName);
const newNetwork = new Network(this, channel);
await newNetwork._initialize();
this.networks.set(networkName, newNetwork);
return newNetwork;
}

async _createQueryHandler(channel, peerMap) {
Expand Down
5 changes: 0 additions & 5 deletions fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ class AbstractEventStrategy {
checkCompletion(counts, successFn, failFn) {
throw new Error('AbstractEventStrategy.checkCompletion() not implemented');
}

reset() {
this.counts.success = 0;
this.counts.fail = 0;
}
}

module.exports = AbstractEventStrategy;
69 changes: 17 additions & 52 deletions fabric-network/lib/impl/event/defaulteventhandlermanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,51 @@ const logger = require('fabric-network/lib/logger').getLogger('DefaultEventHandl
class DefaultEventHandlerManager {
/**
* @typedef {Object} EventHandlerOptions
* @property {Number} [timeout = 0] Number of seconds to wait for transaction completion. A value of zero indicates
* that the handler should wait indefinitely.
* @property {Function} [strategy = EventHandlerStrategies.MSPID_SCOPE_ALLFORTX] Event strategy factory.
* @property {Number} [commitTimeout = 0] Number of seconds to wait for transaction completion. A value of zero
* indicates that the handler should wait indefinitely.
*/

/**
/**
* Constructor.
* @param {Network} network Network on which events will be processed.
* @param {String} mspId Member Services Provider identifier.
* @param {EventHandlerOptions} options Additional options for event handling behaviour.
*/
constructor(network, mspId, options) {
this.network = network;
this.channel = network.channel;
this.peerMap = network.peerMap;
this.options = options;
this.eventHubFactory = new EventHubFactory(network.getChannel());
this.mspId = mspId;

if (!this.options.strategy) {
this.options.strategy = EventHandlerStrategies.MSPID_SCOPE_ALLFORTX;
}
const defaultOptions = {
strategy: EventHandlerStrategies.MSPID_SCOPE_ALLFORTX
};
this.options = Object.assign(defaultOptions, options);

logger.debug('constructor: mspId = %s, options = %O', mspId, this.options);
}

async initialize() {
this.availableEventHubs = [];
if (!this.initialized) {
this.useFullBlocks = this.options.useFullBlocks || this.options.chaincodeEventsEnabled;
if (this.useFullBlocks === null || this.useFullBlocks === undefined) {
this.useFullBlocks = false;
}

logger.debug('initialize: useFullBlocks = %s', this.useFullBlocks);

const eventHubFactory = new EventHubFactory(this.channel);
this.eventStrategy = this.options.strategy(eventHubFactory, this.network, this.mspId);
this.availableEventHubs = await this.eventStrategy.getConnectedEventHubs();

this.initialized = true;

logger.debug('initialize: useFullBlocks = %j, availableEventHubs = %O', this.useFullBlocks, this.availableEventHubs);
}
}

dispose() {
logger.debug('dispose');
this.disconnectEventHubs();
this.availableEventHubs = [];
this.initialized = false;
}

getEventHubs() {
return this.availableEventHubs;
}

disconnectEventHubs() {
for (const hub of this.availableEventHubs) {
try {
hub.disconnect();
} catch (error) {
//
}
const strategy = this.options.strategy(this.eventHubFactory, this.network, this.mspId);
try {
await strategy.getConnectedEventHubs();
} catch (error) {
logger.debug('initialize:', error);
}
}

/**
* create an Tx Event handler for the specific txid
*
* @param {*} txid
* @param {String} txid
* @returns The transaction event handler
* @memberof DefaultEventHandlerFactory
*/
createTxEventHandler(txid) {
logger.debug('createTxEventHandler: txid = %s', txid);
// pass in all available eventHubs to listen on, the handler decides when to resolve based on strategy
// a TxEventHandler should check that the available ones are usable when appropriate.
this.eventStrategy.reset();
return new TransactionEventHandler(this, txid);
const strategy = this.options.strategy(this.eventHubFactory, this.network, this.mspId);
return new TransactionEventHandler(txid, strategy, this.options);
}

}

module.exports = DefaultEventHandlerManager;
20 changes: 14 additions & 6 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@ const util = require('util');
* @class
*/
class TransactionEventHandler {
/**
* @typedef {Object} TransactionEventHandlerOptions
* @property {Number} [commitTimeout = 0] Number of seconds to wait for transaction completion. A value of zero
* indicates that the handler should wait indefinitely.
*/

/**
* Constructor.
* @private
* @param {DefaultEventHandlerManager} manager Event handler manager
* @param {String} transactionId Transaction ID.
* @param {Object} strategy Event strategy implementation.
* @param {TransactionEventHandlerOptions} [options] Additional options.
*/
constructor(manager, transactionId) {
constructor(transactionId, strategy, options) {
this.transactionId = transactionId;
this.strategy = manager.eventStrategy;
this.strategy = strategy;

const defaultOptions = {
commitTimeout: 0 // No timeout by default
};
this.options = Object.assign(defaultOptions, manager.options);
this.options = Object.assign(defaultOptions, options);

logger.debug('constructor:', util.format('transactionId = %s, options = %O', this.transactionId, this.options));

this.eventHubs = manager.getEventHubs();
this.eventHubs = [];
this.respondedEventHubs = new Set();

this.notificationPromise = new Promise((resolve, reject) => {
Expand All @@ -49,9 +56,10 @@ class TransactionEventHandler {
* @async
*/
async startListening() {
this.eventHubs = await this.strategy.getConnectedEventHubs();
if (this.eventHubs.length > 0) {
this._setListenTimeout();
this._registerTxEventListeners();
this._setListenTimeout();
} else {
logger.debug('startListening: No event hubs');
this._txResolve();
Expand Down
6 changes: 3 additions & 3 deletions fabric-network/lib/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ class Network {
// network._dispose() followed by network.initialize() be safe ?
// make this private is the safest option.
this.contracts.clear();
if (this.eventHandlerManager) {
this.eventHandlerManager.dispose();
}

if (this.queryHandler) {
this.queryHandler.dispose();
}

this.channel.close(); // Tidies up event hubs obtained from the channel

this.initialized = false;
}

Expand Down
5 changes: 2 additions & 3 deletions fabric-network/test/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const TransactionEventHandler = require('../lib/impl/event/transactioneventhandl

describe('Contract', () => {

const sandbox = sinon.createSandbox();
let clock;

let mockChannel, mockClient, mockUser, mockGateway;
Expand Down Expand Up @@ -67,7 +66,7 @@ describe('Contract', () => {
});

afterEach(() => {
sandbox.restore();
sinon.restore();
clock.restore();
});

Expand Down Expand Up @@ -150,7 +149,7 @@ describe('Contract', () => {


beforeEach(() => {
sandbox.stub(contract, '_validatePeerResponses').returns({validResponses: validResponses});
sinon.stub(contract, '_validatePeerResponses').returns({validResponses: validResponses});
});

it('should throw if functionName not specified', () => {
Expand Down
8 changes: 3 additions & 5 deletions fabric-network/test/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const EventStrategies = require('../lib/impl/event/defaulteventhandlerstrategies


describe('Gateway', () => {
const sandbox = sinon.createSandbox();

let mockClient;
let mockDefaultQueryHandler;

Expand All @@ -46,7 +44,7 @@ describe('Gateway', () => {
});

afterEach(() => {
sandbox.restore();
sinon.restore();
});


Expand Down Expand Up @@ -165,7 +163,7 @@ describe('Gateway', () => {
beforeEach(() => {
gateway = new Gateway();
mockWallet = sinon.createStubInstance(Wallet);
sandbox.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
sinon.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
mockWallet.setUserContext.withArgs(mockClient, 'admin').returns('foo');
});

Expand Down Expand Up @@ -357,7 +355,7 @@ describe('Gateway', () => {
beforeEach(async () => {
gateway = new Gateway();
mockWallet = sinon.createStubInstance(Wallet);
sandbox.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
sinon.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
mockWallet.setUserContext.withArgs(mockClient, 'admin').returns('foo');
const options = {
wallet: mockWallet,
Expand Down
Loading

0 comments on commit 55f8ccc

Please sign in to comment.