Skip to content

Commit

Permalink
FABN-1332: Revert commit handler implementation
Browse files Browse the repository at this point in the history
Revert the implementation of the default commit handlers to
equivalent to the 1.4.0 implementation to restore performance.
This means using event hubs directly to listen for commit events
rather than using Transaction.addCommitListener().

Also update the sample custom sample transaction event handler
to use cached event hubs rather then new event hub instances on
each transaction for better performance.

Change-Id: I2f5e799f8983ef688c0351d1153aeed2ab173de6
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Aug 2, 2019
1 parent 6a7e5ef commit 54d230b
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 117 deletions.
2 changes: 1 addition & 1 deletion fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const logger = require('./logger').getLogger('Gateway');
/**
* @typedef {Function} Gateway~TxEventHandlerFactory
* @memberof module:fabric-network
* @param {String} transactionId The transaction ID for which the handler should listen.
* @param {Transaction} transaction The transaction for which the handler should listen for commit events.
* @param {module:fabric-network.Network} network The network on which this transaction is being submitted.
* @returns {module:fabric-network.Gateway~TxEventHandler} A transaction event handler.
*/
Expand Down
1 change: 0 additions & 1 deletion fabric-network/lib/impl/event/contracteventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class ContractEventListener extends AbstractEventListener {
}

async _onEvents(events) {
console.log('_onEvents');
logger.debug(`[${this.listenerName}]: Received contract events as array`);
if (!this.options.asArray) {
logger.debug(`[${this.listenerName}]: Splitting events`);
Expand Down
44 changes: 17 additions & 27 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const util = require('util');
* Delegates to an event strategy to decide whether events or errors received should be interpreted as success or
* failure of a transaction.
* @private
* @class
*/
class TransactionEventHandler {
/**
Expand Down Expand Up @@ -56,12 +55,11 @@ class TransactionEventHandler {

/**
* Called to initiate listening for transaction events.
* @async
*/
async startListening() {
if (this.eventHubs.length > 0) {
this._setListenTimeout();
await this._registerTxEventListeners();
this._registerTxEventListeners();
} else {
logger.debug('startListening: No event hubs');
this._resolveNotificationPromise();
Expand All @@ -75,34 +73,28 @@ class TransactionEventHandler {

logger.debug('_setListenTimeout:', `setTimeout(${this.options.commitTimeout}) for transaction ${this.transactionId}`);

this.timeoutHandler = setTimeout(() => {
this._timeoutFail();
}, this.options.commitTimeout * 1000);
this.timeoutHandler = setTimeout(
() => this._timeoutFail(),
this.options.commitTimeout * 1000
);
}

async _registerTxEventListeners() {
const registrationOptions = {unregister: true, fixedEventHub: true};

const promises = this.eventHubs.map((eventHub) => {
return new Promise(async (resolve) => {
logger.debug('_registerTxEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());
await this.transaction.addCommitListener((err, txId, code) => {
if (err) {
return this._onError(eventHub, err);
}
return this._onEvent(eventHub, txId, code);
}, registrationOptions, eventHub);
resolve();
});
_registerTxEventListeners() {
this.eventHubs.forEach(eventHub => {
logger.debug('_registerTxEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());
eventHub.registerTxEvent(
this.transactionId,
(txId, code) => this._onEvent(eventHub, txId, code),
(err) => this._onError(eventHub, err)
);
eventHub.connect();
});

await Promise.all(promises);
}

_timeoutFail() {
const unrespondedEventHubs = this.eventHubs
.filter((eventHub) => !this.respondedEventHubs.has(eventHub))
.map((eventHub) => eventHub.getName())
.filter(eventHub => !this.respondedEventHubs.has(eventHub))
.map(eventHub => eventHub.getName())
.join(', ');
const message = 'Event strategy not satisfied within the timeout period. No response received from event hubs: ' + unrespondedEventHubs;
const error = new TimeoutError({
Expand Down Expand Up @@ -160,7 +152,6 @@ class TransactionEventHandler {

/**
* Wait until enough events have been received from the event hubs to satisfy the event handling strategy.
* @async
* @throws {Error} if the transaction commit is not successful within the timeout period.
*/
async waitForEvents() {
Expand All @@ -175,9 +166,8 @@ class TransactionEventHandler {
logger.debug('cancelListening called');

clearTimeout(this.timeoutHandler);
this.eventHubs.forEach((eventHub) => eventHub.unregisterTxEvent(this.transactionId));
this.eventHubs.forEach(eventHub => eventHub.unregisterTxEvent(this.transactionId));
}

}

module.exports = TransactionEventHandler;
2 changes: 1 addition & 1 deletion fabric-network/test/impl/event/filesystemcheckpointer.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('FileSystemCheckpointer', () => {
describe('#save', () => {

it('should initialize the checkpointer file doesnt exist', async () => {
fs.readFile.resolves(new Buffer(''));
fs.readFile.resolves(Buffer.from(''));
fs.exists.resolves(false);
sinon.spy(checkpointer, '_initialize');
sinon.spy(checkpointer, 'load');
Expand Down
86 changes: 35 additions & 51 deletions fabric-network/test/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,22 @@ describe('TransactionEventHandler', () => {
let stubTransaction;
let stubTransactionID;
const transactionId = 'TRANSACTION_ID';

beforeEach(() => {
// Include _stubInfo property on stubs to enable easier equality comparison in tests
stubTransaction = sinon.createStubInstance(Transaction);
stubTransactionID = sinon.createStubInstance(TransactionID);
stubTransactionID.getTransactionID.returns(transactionId);

stubTransaction = sinon.createStubInstance(Transaction);
stubTransaction.getTransactionID.returns(stubTransactionID);

stubEventHub = sinon.createStubInstance(ChannelEventHub);
stubEventHub.getName.returns('eventHub');
stubEventHub.getPeerAddr.returns('eventHubAddress');
stubEventHub.registerTxEvent.callsFake((txId, onEventFn, onErrorFn) => {
stubEventHub._transactionId = txId;
stubEventHub._onEventFn = onEventFn;
stubEventHub._onErrorFn = onErrorFn;
});

stubStrategy = {
getEventHubs: sinon.stub(),
Expand Down Expand Up @@ -76,112 +84,79 @@ describe('TransactionEventHandler', () => {
});

describe('#startListening', () => {
it('calls addCommitListener() on transaction with callback', async () => {
it('calls registerTxEvent() on event hub with transaction ID', async () => {
await handler.startListening();
sinon.assert.calledWith(stubTransaction.addCommitListener, sinon.match.func);
sinon.assert.calledWith(stubEventHub.registerTxEvent, transactionId);
});

it('sets auto-unregister option when calling registerTxEvent() on event hub', async () => {
it('calls connect() on event hub', async () => {
await handler.startListening();
sinon.assert.calledWith(
stubTransaction.addCommitListener,
sinon.match.func,
sinon.match.has('unregister', true),
stubEventHub
);
});

it('should call transaction.addCommitListener', () => {
handler = new TransactionEventHandler(stubTransaction, stubStrategy);
handler.startListening();
sinon.assert.calledWith(
stubTransaction.addCommitListener,
sinon.match.func,
sinon.match.has('unregister', true),
stubEventHub
);
});

it('should call _onError if err is set', () => {
handler = new TransactionEventHandler(stubTransaction, stubStrategy);
sinon.spy(handler, '_onError');
handler.startListening();
const err = new Error('an error');
stubTransaction.addCommitListener.callArgWith(0, err);
sinon.assert.calledWith(handler._onError, stubEventHub, err);
});

it('should call _onEvent if err is set', () => {
handler = new TransactionEventHandler(stubTransaction, stubStrategy);
sinon.spy(handler, '_onEvent');
handler.startListening();
stubTransaction.addCommitListener.callArgWith(0, null, transactionId, 'VALID');
sinon.assert.calledWith(handler._onEvent, stubEventHub, transactionId, 'VALID');
sinon.assert.called(stubEventHub.connect);
});
});

it('calls eventReceived() on strategy when event hub sends valid event', async () => {
stubTransaction.addCommitListener.yields(null, transactionId, 'VALID');
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
sinon.assert.calledWith(stubStrategy.eventReceived, sinon.match.func, sinon.match.func);
});

it('does not call errorReceived() on strategy when event hub sends valid event', async () => {
stubTransaction.addCommitListener.yields(null, transactionId, 'VALID');
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
sinon.assert.notCalled(stubStrategy.errorReceived);
});

it('calls errorReceived() on strategy when event hub sends an error', async () => {
stubTransaction.addCommitListener.yields(new Error());
await handler.startListening();
stubEventHub._onErrorFn(new Error('EVENT HUB ERROR'));
sinon.assert.calledWith(stubStrategy.errorReceived, sinon.match.func, sinon.match.func);
});

it('does not call eventReceived() on strategy when event hub sends an error', async () => {
stubTransaction.addCommitListener.yields(new Error('EVENT_HUB_ERROR'));
await handler.startListening();
stubEventHub._onErrorFn(new Error('EVENT HUB ERROR'));
sinon.assert.notCalled(stubStrategy.eventReceived);
});

it('fails when event hub sends an invalid event', async () => {
const code = 'ERROR_CODE';
stubTransaction.addCommitListener.yields(null, transactionId, code);
await handler.startListening();
stubEventHub._onEventFn(transactionId, code);
return expect(handler.waitForEvents()).to.be.rejectedWith(code);
});

it('succeeds when strategy calls success function after event received', async () => {
stubStrategy.eventReceived = ((successFn, failFn) => successFn()); // eslint-disable-line no-unused-vars
stubStrategy.eventReceived = (successFn, failFn) => successFn(); // eslint-disable-line no-unused-vars

stubTransaction.addCommitListener.yields(null, transactionId, 'VALID');
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
return expect(handler.waitForEvents()).to.be.fulfilled;
});

it('fails when strategy calls fail function after event received', async () => {
const error = new Error('STRATEGY_FAIL');
stubStrategy.eventReceived = ((successFn, failFn) => failFn(error));

stubTransaction.addCommitListener.yields(null, transactionId, 'VALID');
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
return expect(handler.waitForEvents()).to.be.rejectedWith(error);
});

it('succeeds when strategy calls success function after error received', async () => {
stubStrategy.errorReceived = ((successFn, failFn) => successFn()); // eslint-disable-line no-unused-vars

stubTransaction.addCommitListener.yields(new Error('EVENT_HUB_ERROR'));
await handler.startListening();
stubEventHub._onErrorFn(new Error('EVENT HUB ERROR'));
return expect(handler.waitForEvents()).to.be.fulfilled;
});

it('fails when strategy calls fail function after error received', async () => {
const error = new Error('STRATEGY_FAIL');
stubStrategy.errorReceived = ((successFn, failFn) => failFn(error));

stubTransaction.addCommitListener.yields(new Error('EVENT_HUB_ERROR'));
await handler.startListening();
stubEventHub._onErrorFn(new Error('EVENT HUB ERROR'));
return expect(handler.waitForEvents()).to.be.rejectedWith(error);
});

Expand Down Expand Up @@ -209,29 +184,34 @@ describe('TransactionEventHandler', () => {
it('fails on timeout if timeout set', async () => {
const options = {commitTimeout: 418};
handler = new TransactionEventHandler(stubTransaction, stubStrategy, options);

await handler.startListening();
const promise = handler.waitForEvents();

clock.runAll();
return expect(promise).to.be.rejectedWith(TimeoutError);
});

it('does not timeout if timeout set to zero', async () => {
stubStrategy.eventReceived = ((successFn, failFn) => successFn()); // eslint-disable-line no-unused-vars

const options = {commitTimeout: 0};
handler = new TransactionEventHandler(stubTransaction, stubStrategy, options);
stubTransaction.addCommitListener.yields(null, transactionId, 'VALID');

await handler.startListening();
clock.runAll();
stubEventHub._onEventFn(transactionId, 'VALID');

return expect(handler.waitForEvents()).to.be.fulfilled;
});

it('timeout failure message includes event hubs that have not responded', async () => {
const options = {commitTimeout: 418};
handler = new TransactionEventHandler(stubTransaction, stubStrategy, options);

await handler.startListening();
const promise = handler.waitForEvents();
clock.runAll();

const eventHubName = stubEventHub.getName();
return expect(promise).to.be.rejectedWith(eventHubName);
});
Expand All @@ -240,17 +220,21 @@ describe('TransactionEventHandler', () => {
stubStrategy.getEventHubs.returns([]);
const options = {commitTimeout: 418};
handler = new TransactionEventHandler(stubTransaction, stubStrategy, options);

await handler.startListening();
clock.runAll();

return expect(handler.waitForEvents()).to.be.fulfilled;
});

it('timeout failure error has transaction ID property', async () => {
const options = {commitTimeout: 418};
handler = new TransactionEventHandler(stubTransaction, stubStrategy, options);

await handler.startListening();
const promise = handler.waitForEvents();
clock.runAll();

try {
await promise;
chai.assert.fail('Expected an error');
Expand Down
Loading

0 comments on commit 54d230b

Please sign in to comment.