Skip to content

Commit

Permalink
FABN-951: Fix event handling concurreny issue
Browse files Browse the repository at this point in the history
Event handling strategy instances have transaction-specific state
so a separate instance must be used for each transaction. They
cannot be shared between transactions.

Event hubs should be obtained from the EventHubFactory by each
new event handler strategy instance, since this deals with
reconnecting disconnected event hubs. We should not store only
the currently connected event hubs in the manager as we will:

1. Never connect event hubs that were initially unavailable as
they come back online.

2. Gradually lose working event hubs over time if there are
network outages or maintenance on peers, until it is not
possible to submit any transactions.

3. Potentially pass disconnected event hubs to the listener.

Closing the underlying Channel cleans up the event hubs, so
the Channel should be closed when the Network is disposed rather
than trying to store and explicitly clean up event hubs in the
fabric-network code. At least for now...!

Change-Id: Ia5dc24ca9b7501fe5a1eebb867b0ea3277080ab4
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Oct 5, 2018
1 parent d7b354e commit 3243d67
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 209 deletions.
18 changes: 10 additions & 8 deletions fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,18 @@ class Gateway {
*/
async getNetwork(networkName) {
logger.debug('in getNetwork');

const existingNetwork = this.networks.get(networkName);
if (!existingNetwork) {
logger.debug('getNetwork: create network object and initialize');
const channel = this.client.getChannel(networkName);
const newNetwork = new Network(this, channel);
await newNetwork._initialize();
this.networks.set(networkName, newNetwork);
return newNetwork;
if (existingNetwork) {
return existingNetwork;
}
return existingNetwork;

logger.debug('getNetwork: create network object and initialize');
const channel = this.client.getChannel(networkName);
const newNetwork = new Network(this, channel);
await newNetwork._initialize();
this.networks.set(networkName, newNetwork);
return newNetwork;
}

async _createQueryHandler(channel, peerMap) {
Expand Down
5 changes: 0 additions & 5 deletions fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ class AbstractEventStrategy {
checkCompletion(counts, successFn, failFn) {
throw new Error('AbstractEventStrategy.checkCompletion() not implemented');
}

reset() {
this.counts.success = 0;
this.counts.fail = 0;
}
}

module.exports = AbstractEventStrategy;
69 changes: 17 additions & 52 deletions fabric-network/lib/impl/event/defaulteventhandlermanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,51 @@ const logger = require('fabric-network/lib/logger').getLogger('DefaultEventHandl
class DefaultEventHandlerManager {
/**
* @typedef {Object} EventHandlerOptions
* @property {Number} [timeout = 0] Number of seconds to wait for transaction completion. A value of zero indicates
* that the handler should wait indefinitely.
* @property {Function} [strategy = EventHandlerStrategies.MSPID_SCOPE_ALLFORTX] Event strategy factory.
* @property {Number} [commitTimeout = 0] Number of seconds to wait for transaction completion. A value of zero
* indicates that the handler should wait indefinitely.
*/

/**
/**
* Constructor.
* @param {Network} network Network on which events will be processed.
* @param {String} mspId Member Services Provider identifier.
* @param {EventHandlerOptions} options Additional options for event handling behaviour.
*/
constructor(network, mspId, options) {
this.network = network;
this.channel = network.channel;
this.peerMap = network.peerMap;
this.options = options;
this.eventHubFactory = new EventHubFactory(network.getChannel());
this.mspId = mspId;

if (!this.options.strategy) {
this.options.strategy = EventHandlerStrategies.MSPID_SCOPE_ALLFORTX;
}
const defaultOptions = {
strategy: EventHandlerStrategies.MSPID_SCOPE_ALLFORTX
};
this.options = Object.assign(defaultOptions, options);

logger.debug('constructor: mspId = %s, options = %O', mspId, this.options);
}

async initialize() {
this.availableEventHubs = [];
if (!this.initialized) {
this.useFullBlocks = this.options.useFullBlocks || this.options.chaincodeEventsEnabled;
if (this.useFullBlocks === null || this.useFullBlocks === undefined) {
this.useFullBlocks = false;
}

logger.debug('initialize: useFullBlocks = %s', this.useFullBlocks);

const eventHubFactory = new EventHubFactory(this.channel);
this.eventStrategy = this.options.strategy(eventHubFactory, this.network, this.mspId);
this.availableEventHubs = await this.eventStrategy.getConnectedEventHubs();

this.initialized = true;

logger.debug('initialize: useFullBlocks = %j, availableEventHubs = %O', this.useFullBlocks, this.availableEventHubs);
}
}

dispose() {
logger.debug('dispose');
this.disconnectEventHubs();
this.availableEventHubs = [];
this.initialized = false;
}

getEventHubs() {
return this.availableEventHubs;
}

disconnectEventHubs() {
for (const hub of this.availableEventHubs) {
try {
hub.disconnect();
} catch (error) {
//
}
const strategy = this.options.strategy(this.eventHubFactory, this.network, this.mspId);
try {
await strategy.getConnectedEventHubs();
} catch (error) {
logger.debug('initialize:', error);
}
}

/**
* create an Tx Event handler for the specific txid
*
* @param {*} txid
* @param {String} txid
* @returns The transaction event handler
* @memberof DefaultEventHandlerFactory
*/
createTxEventHandler(txid) {
logger.debug('createTxEventHandler: txid = %s', txid);
// pass in all available eventHubs to listen on, the handler decides when to resolve based on strategy
// a TxEventHandler should check that the available ones are usable when appropriate.
this.eventStrategy.reset();
return new TransactionEventHandler(this, txid);
const strategy = this.options.strategy(this.eventHubFactory, this.network, this.mspId);
return new TransactionEventHandler(txid, strategy, this.options);
}

}

module.exports = DefaultEventHandlerManager;
20 changes: 14 additions & 6 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@ const util = require('util');
* @class
*/
class TransactionEventHandler {
/**
* @typedef {Object} TransactionEventHandlerOptions
* @property {Number} [commitTimeout = 0] Number of seconds to wait for transaction completion. A value of zero
* indicates that the handler should wait indefinitely.
*/

/**
* Constructor.
* @private
* @param {DefaultEventHandlerManager} manager Event handler manager
* @param {String} transactionId Transaction ID.
* @param {Object} strategy Event strategy implementation.
* @param {TransactionEventHandlerOptions} [options] Additional options.
*/
constructor(manager, transactionId) {
constructor(transactionId, strategy, options) {
this.transactionId = transactionId;
this.strategy = manager.eventStrategy;
this.strategy = strategy;

const defaultOptions = {
commitTimeout: 0 // No timeout by default
};
this.options = Object.assign(defaultOptions, manager.options);
this.options = Object.assign(defaultOptions, options);

logger.debug('constructor:', util.format('transactionId = %s, options = %O', this.transactionId, this.options));

this.eventHubs = manager.getEventHubs();
this.eventHubs = [];
this.respondedEventHubs = new Set();

this.notificationPromise = new Promise((resolve, reject) => {
Expand All @@ -49,9 +56,10 @@ class TransactionEventHandler {
* @async
*/
async startListening() {
this.eventHubs = await this.strategy.getConnectedEventHubs();
if (this.eventHubs.length > 0) {
this._setListenTimeout();
this._registerTxEventListeners();
this._setListenTimeout();
} else {
logger.debug('startListening: No event hubs');
this._txResolve();
Expand Down
6 changes: 3 additions & 3 deletions fabric-network/lib/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ class Network {
// network._dispose() followed by network.initialize() be safe ?
// make this private is the safest option.
this.contracts.clear();
if (this.eventHandlerManager) {
this.eventHandlerManager.dispose();
}

if (this.queryHandler) {
this.queryHandler.dispose();
}

this.channel.close(); // Tidies up event hubs obtained from the channel

this.initialized = false;
}

Expand Down
5 changes: 2 additions & 3 deletions fabric-network/test/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const TransactionEventHandler = require('../lib/impl/event/transactioneventhandl

describe('Contract', () => {

const sandbox = sinon.createSandbox();
let clock;

let mockChannel, mockClient, mockUser, mockGateway;
Expand Down Expand Up @@ -67,7 +66,7 @@ describe('Contract', () => {
});

afterEach(() => {
sandbox.restore();
sinon.restore();
clock.restore();
});

Expand Down Expand Up @@ -150,7 +149,7 @@ describe('Contract', () => {


beforeEach(() => {
sandbox.stub(contract, '_validatePeerResponses').returns({validResponses: validResponses});
sinon.stub(contract, '_validatePeerResponses').returns({validResponses: validResponses});
});

it('should throw if functionName not specified', () => {
Expand Down
8 changes: 3 additions & 5 deletions fabric-network/test/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const EventStrategies = require('../lib/impl/event/defaulteventhandlerstrategies


describe('Gateway', () => {
const sandbox = sinon.createSandbox();

let mockClient;
let mockDefaultQueryHandler;

Expand All @@ -46,7 +44,7 @@ describe('Gateway', () => {
});

afterEach(() => {
sandbox.restore();
sinon.restore();
});


Expand Down Expand Up @@ -165,7 +163,7 @@ describe('Gateway', () => {
beforeEach(() => {
gateway = new Gateway();
mockWallet = sinon.createStubInstance(Wallet);
sandbox.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
sinon.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
mockWallet.setUserContext.withArgs(mockClient, 'admin').returns('foo');
});

Expand Down Expand Up @@ -357,7 +355,7 @@ describe('Gateway', () => {
beforeEach(async () => {
gateway = new Gateway();
mockWallet = sinon.createStubInstance(Wallet);
sandbox.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
sinon.stub(Client, 'loadFromConfig').withArgs('ccp').returns(mockClient);
mockWallet.setUserContext.withArgs(mockClient, 'admin').returns('foo');
const options = {
wallet: mockWallet,
Expand Down
Loading

0 comments on commit 3243d67

Please sign in to comment.