Skip to content

Commit

Permalink
Ensure that connector errors finishes caliper transactions (#1328)
Browse files Browse the repository at this point in the history
A Scenario was discovered where if you send a single txn and the
connector throws an error then that single txn never finishes and the
worker loops forever waiting for that transaction to finish.

eg in the workload

```
invokerIdentity: 'unknownuser'
```

example benchmark
```
test:
  name: fixed-asset-test
  description: >-
    This is a test yaml for the existing fixed-asset benchmarks
  workers:
    type: local
    number: 1
  rounds:
    - label: empty-contract-evaluate
      chaincodeID: fixed-asset
      txNumber: 1
      rateControl:
        type: fixed-rate
        opts:
          tps: 2
      workload:
        module: benchmarks/api/fabric/workloads/empty-contract.js
        arguments:
          chaincodeID: fixed-asset
          consensus: false
```

In the wider support for connectors though the caliper framework should
ensure that connectors that either don't handle errors or throw errors
should make sure that the submission is registered as a failure (which
is what a connector would do if it did catch an error)

This fix ensures that any error received will mark a transaction as
finished

closes #1068

Signed-off-by: D <[email protected]>
  • Loading branch information
davidkel authored May 9, 2022
1 parent d699171 commit 35ae4ca
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 7 deletions.
58 changes: 52 additions & 6 deletions packages/caliper-core/lib/common/core/connector-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
*/

'use strict';

const CaliperUtils = require('../../common/utils/caliper-utils.js');
const TxStatus = require('../core/transaction-status');
const ConnectorInterface = require('./connector-interface');
const Events = require('./../utils/constants').Events.Connector;
const Logger = CaliperUtils.getLogger('connector-base');

/**
* Optional base class for connectors.
Expand Down Expand Up @@ -75,20 +77,64 @@ class ConnectorBase extends ConnectorInterface {
async sendRequests(requests) {
if (!Array.isArray(requests)) {
this._onTxsSubmitted(1);
const result = await this._sendSingleRequest(requests);
this._onTxsFinished(result);
let result = new TxStatus();

try {
result = await this._sendSingleRequest(requests);
} catch(error) {
Logger.error(`Unexpected error while sending request: ${error.stack || error}`);
result.SetStatusFail();
result.SetVerification(true);
result.SetResult('');

// re-throwing an error allows for the worker to exit doing further work
// and move into waiting for tx's to finish. If any further errors occur
// then they will be ignored but the tx's will be marked as finished still
throw error;
} finally {
this._onTxsFinished(result);
}

return result;
}

const promises = [];
const creationTime = Date.now();
for (let i = 0; i < requests.length; ++i) {
this._onTxsSubmitted(1);
promises.push(this._sendSingleRequest(requests[i]));
}

const results = await Promise.all(promises);
this._onTxsFinished(results);
return results;
const results = await Promise.allSettled(promises);
let firstError;
const actualResults = results.map((result) => {
if (result.status === 'rejected') {
if (!firstError) {
firstError = result.reason;
}
const failureResult = new TxStatus();
failureResult.SetStatusFail();
failureResult.SetVerification(true);
failureResult.SetResult('');
failureResult.Set('time_create', creationTime);
return failureResult;
}

return result.value;
});

this._onTxsFinished(actualResults);

if (firstError) {
Logger.error(`Unexpected error while sending multiple requests, first error was: ${firstError.stack || firstError}`);

// re-throwing an error allows for the worker to exit doing further work
// and move into waiting for tx's to finish. If any further errors occur
// then they will be ignored but the tx's will be marked as finished still
throw firstError;
}

return actualResults;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/caliper-core/lib/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class CaliperWorker {
}

if (error) {
Logger.error(`Unhandled error while executing TX: ${error.stack || error}`);
// Already logged, no need to log again
throw error;
}

Expand Down
108 changes: 108 additions & 0 deletions packages/caliper-core/test/core/connector-base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const chai = require('chai');
chai.should();
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const sinon = require('sinon');

const loggerSandbox = sinon.createSandbox();
const CaliperUtils = require('../../lib/common/utils/caliper-utils');
loggerSandbox.replace(CaliperUtils, "getLogger", () => {
return {
debug: sinon.stub(),
error: sinon.stub(),
warn: sinon.stub(),
info: sinon.stub()
};
});

const { ConnectorBase, TxStatus } = require('../..');
const Events = require('../../lib/common/utils/constants').Events.Connector;

class MockConnector extends ConnectorBase {
async _sendSingleRequest(request) {
if (request instanceof Error) {
throw request;
}
return request;
}
}

describe('the base connector implementation', () => {
after(() => {
loggerSandbox.restore();
});

describe('on sending requests', () => {
let mockConnector;
let emitSpy;
const txStatus = new TxStatus();
const txStatus2 = new TxStatus();
const txStatus3 = new TxStatus();
beforeEach(() => {
mockConnector = new MockConnector(1, 'mock');
emitSpy = sinon.spy(mockConnector, 'emit');
})

it('should process a single request that returns a transaction status result', async() => {
const result = await mockConnector.sendRequests(txStatus);

result.should.equal(txStatus);
sinon.assert.calledTwice(emitSpy);
sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.secondCall, Events.TxsFinished, txStatus);
});

it('should process a single request that throws an error', async() => {
await mockConnector.sendRequests(new Error('some failure')).should.be.rejectedWith(/some failure/);

sinon.assert.calledTwice(emitSpy);
sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.secondCall, Events.TxsFinished, sinon.match.instanceOf(TxStatus));
});

it('should process multiple requests that where they all return a transaction status result', async() => {
const result = await mockConnector.sendRequests([txStatus, txStatus2, txStatus3]);
result.should.deep.equal([txStatus, txStatus2, txStatus3]);
sinon.assert.callCount(emitSpy, 4);
sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsFinished, [txStatus, txStatus2, txStatus3]);
});

it('should process multiple requests where some return an error', async() => {
await mockConnector.sendRequests([new Error('error 1'), txStatus2, new Error('error 4'), txStatus3]).should.be.rejectedWith(/error 1/);
sinon.assert.callCount(emitSpy, 5);
sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.getCall(4), Events.TxsFinished, [sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus)]);
});

it('should process multiple requests where all return an error', async() => {
await mockConnector.sendRequests([new Error('error 1'), new Error('error 2'), new Error('error 3')]).should.be.rejectedWith(/error 1/);
sinon.assert.callCount(emitSpy, 4);
sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1);
sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsFinished, [sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus)]);
});
});
});

0 comments on commit 35ae4ca

Please sign in to comment.