Skip to content

Commit

Permalink
Convert workload modules to classes
Browse files Browse the repository at this point in the history
- Create interface and base class for workload modules
- Load workload modules with the factory approach
- Update CI test workloads to class format
- Extend CI workload arguments where needed to make workloads deterministic
- Create separate artifacts for the Fabric gateway CI phase
- Increase frequency of TX updates in CI tests to increase monitor accuracy
- Update workload module paths in the benchconfig files to precise relative path

Signed-off-by: Attila Klenik <[email protected]>
  • Loading branch information
aklenik committed Jun 4, 2020
1 parent d0507aa commit bd84a29
Show file tree
Hide file tree
Showing 45 changed files with 1,866 additions and 861 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@
}
},
"license": "Apache-2.0"
}
}
2 changes: 2 additions & 0 deletions packages/caliper-core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ module.exports.CaliperEngine = require('./lib/master/caliper-engine');
module.exports.MonitorOrchestrator = require('./lib/master/orchestrators/monitor-orchestrator');
module.exports.RoundOrchestrator = require('./lib/master/orchestrators/round-orchestrator');
module.exports.WorkerOrchestrator = require('./lib/master/orchestrators/worker-orchestrator');
module.exports.WorkloadModuleInterface = require('./lib/worker/workload/workloadModuleInterface');
module.exports.WorkloadModuleBase = require('./lib/worker/workload/workloadModuleBase');
61 changes: 36 additions & 25 deletions packages/caliper-core/lib/worker/client/caliper-local-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CaliperLocalClient {
constructor(bcClient, clientIndex, messenger) {
this.blockchain = new bc(bcClient);
this.clientIndex = clientIndex;
this.currentRoundIndex = -1;
this.messenger = messenger;
this.context = undefined;
this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 5000);
Expand All @@ -54,6 +55,12 @@ class CaliperLocalClient {
this.prometheusClient = new PrometheusClient();
this.totalTxCount = 0;
this.totalTxDelay = 0;

/**
* The workload module instance associated with the current round, updated by {CaliperLocalClient.prepareTest}.
* @type {WorkloadModuleInterface}
*/
this.workloadModule = undefined;
}

/**
Expand Down Expand Up @@ -234,25 +241,26 @@ class CaliperLocalClient {

/**
* Perform test with specified number of transactions
* @param {Object} cb callback module
* @param {Object} number number of transactions to submit
* @param {Object} rateController rate controller object
* @async
*/
async runFixedNumber(cb, number, rateController) {
Logger.info('Info: client ' + this.clientIndex + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : ''));
async runFixedNumber(number, rateController) {
Logger.info(`Worker ${this.clientIndex} is starting TX number-based round ${this.currentRoundIndex + 1} (${number} TXs)`);
this.startTime = Date.now();

const circularArray = new CircularArray(this.maxTxPromises);
const self = this;
while (this.txNum < number) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
circularArray.add(self.workloadModule.submitTransaction()
.then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}
Expand All @@ -263,26 +271,27 @@ class CaliperLocalClient {

/**
* Perform test with specified test duration
* @param {Object} cb callback module
* @param {Object} duration duration to run for
* @param {Object} rateController rate controller object
* @async
*/
async runDuration(cb, duration, rateController) {
Logger.info('Info: client ' + this.clientIndex + ' start test runDuration()' + (cb.info ? (':' + cb.info) : ''));
async runDuration(duration, rateController) {
Logger.info(`Worker ${this.clientIndex} is starting duration-based round ${this.currentRoundIndex + 1} (${duration} seconds)`);
this.startTime = Date.now();

// Use a circular array of Promises so that the Promise.all() call does not exceed the maximum permissable Array size
const circularArray = new CircularArray(this.maxTxPromises);
const self = this;
while ((Date.now() - this.startTime)/1000 < duration) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
circularArray.add(self.workloadModule.submitTransaction()
.then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}
Expand Down Expand Up @@ -322,7 +331,10 @@ class CaliperLocalClient {
*/
async prepareTest(test) {
Logger.debug('prepareTest() with:', test);
let cb = require(CaliperUtils.resolvePath(test.cb));
this.currentRoundIndex = test.testRound;

const workloadModuleFactory = CaliperUtils.loadModuleFunction(new Map(), test.cb, 'createWorkloadModule');
this.workloadModule = workloadModuleFactory();

const self = this;
let initUpdateInter = setInterval( () => { self.initUpdate(); } , self.txUpdateTime);
Expand All @@ -343,15 +355,15 @@ class CaliperLocalClient {
}

// Run init phase of callback
Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info + 'phase starting...') : 'phase starting...')}`);
await cb.init(this.blockchain, this.context, test.args);
Logger.info(`Info: client ${this.clientIndex} prepare test phase for round ${this.currentRoundIndex + 1} is starting...`);
await this.workloadModule.initializeWorkloadModule(this.clientIndex, test.totalClients, this.currentRoundIndex, test.args, this.blockchain, this.context);
await CaliperUtils.sleep(this.txUpdateTime);
} catch (err) {
Logger.info(`Client[${this.clientIndex}] encountered an error during prepare test phase: ${(err.stack ? err.stack : err)}`);
Logger.info(`Client[${this.clientIndex}] encountered an error during prepare test phase for round ${this.currentRoundIndex + 1}: ${(err.stack ? err.stack : err)}`);
throw err;
} finally {
clearInterval(initUpdateInter);
Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info + 'phase complete') : 'phase complete')}`);
Logger.info(`Info: client ${this.clientIndex} prepare test phase for round ${this.currentRoundIndex + 1} is completed`);
}
}

Expand All @@ -373,7 +385,6 @@ class CaliperLocalClient {
*/
async doTest(test) {
Logger.debug('doTest() with:', test);
let cb = require(CaliperUtils.resolvePath(test.cb));

this.beforeTest(test);

Expand All @@ -390,15 +401,15 @@ class CaliperLocalClient {
// Run the test loop
if (test.txDuration) {
const duration = test.txDuration; // duration in seconds
await this.runDuration(cb, duration, rateController);
await this.runDuration(duration, rateController);
} else {
const number = test.numb;
await this.runFixedNumber(cb, number, rateController);
await this.runFixedNumber(number, rateController);
}

// Clean up
await rateController.end();
await cb.end();
await this.workloadModule.cleanupWorkloadModule();
await this.blockchain.releaseContext(this.context);
this.clearUpdateInter(txUpdateInter);

Expand Down
100 changes: 100 additions & 0 deletions packages/caliper-core/lib/worker/workload/workloadModuleBase.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const WorkloadModuleInterface = require('./workloadModuleInterface');
const Logger = require('./../../common/utils/caliper-utils').getLogger('workload-module-base');

/**
* Utility base class for the user-implemented workload modules used for assembling TXs for the SUT.
*/
class WorkloadModuleBase extends WorkloadModuleInterface {
/**
* Initialize an instance of the WorkloadModuleBase class.
*/
constructor() {
super();

Logger.debug('Constructing workload module');

/**
* The 0-based index of the worker instantiating the workload module.
* @type {number}
*/
this.workerIndex = -1;

/**
* The total number of workers participating in the round.
* @type {number}
*/
this.totalWorkers = -1;

/**
* The 0-based index of the currently executing round.
* @type {number}
*/
this.roundIndex = -1;

/**
* The user-provided arguments for the round from the benchmark configuration file.
* @type {Object}
*/
this.roundArguments = undefined;

/**
* The adapter of the underlying SUT.
* @type {BlockchainInterface}
*/
this.sutAdapter = undefined;

/**
* The custom context object provided by the SUT adapter.
* @type {Object}
*/
this.sutContext = undefined;
}

/**
* Initialize the workload module with the given parameters.
* @param {number} workerIndex The 0-based index of the worker instantiating the workload module.
* @param {number} totalWorkers The total number of workers participating in the round.
* @param {number} roundIndex The 0-based index of the currently executing round.
* @param {Object} roundArguments The user-provided arguments for the round from the benchmark configuration file.
* @param {BlockchainInterface} sutAdapter The adapter of the underlying SUT.
* @param {Object} sutContext The custom context object provided by the SUT adapter.
* @async
*/
async initializeWorkloadModule(workerIndex, totalWorkers, roundIndex, roundArguments, sutAdapter, sutContext) {
Logger.debug(`Workload module initialized with: workerIndex=${workerIndex}, totalWorkers=${totalWorkers}, roundIndex=${roundIndex}, roundArguments=${JSON.stringify(roundArguments)}`);
this.workerIndex = workerIndex;
this.totalWorkers = totalWorkers;
this.roundIndex = roundIndex;
this.roundArguments = roundArguments;
this.sutAdapter = sutAdapter;
this.sutContext = sutContext;
}

/**
* Clean up the workload module at the end of the round.
* @async
*/
async cleanupWorkloadModule() {
// NOOP by default
Logger.debug('Cleaning up workload module: NOOP');
}
}


module.exports = WorkloadModuleBase;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

/**
* Interface for the user-implemented workload modules used for assembling TXs for the SUT.
*/
class WorkloadModuleInterface {
/**
* Initialize the workload module with the given parameters.
* @param {number} workerIndex The 0-based index of the worker instantiating the workload module.
* @param {number} totalWorkers The total number of workers participating in the round.
* @param {number} roundIndex The 0-based index of the currently executing round.
* @param {object} roundArguments The user-provided arguments for the round from the benchmark configuration file.
* @param {BlockchainInterface} sutAdapter The adapter of the underlying SUT.
* @param {object} sutContext The custom context object provided by the SUT adapter.
* @async
*/
async initializeWorkloadModule(workerIndex, totalWorkers, roundIndex, roundArguments, sutAdapter, sutContext) {
throw new Error('WorkloadModuleInterface.initializeWorkloadModule() must be implemented in derived class');
}

/**
* Assemble the next TX content(s) and submit it to the SUT adapter.
* @return {Promise<TxStatus[]>} The promises for the TX results as returned by the SUT adapter.
* @async
*/
async submitTransaction() {
throw new Error('WorkloadModuleInterface.submitTransaction() must be implemented in derived class');
}

/**
* Clean up the workload module at the end of the round.
* @async
*/
async cleanupWorkloadModule() {
throw new Error('WorkloadModuleInterface.cleanupWorkloadModule() must be implemented in derived class');
}
}


module.exports = WorkloadModuleInterface;
1 change: 1 addition & 0 deletions packages/caliper-tests-integration/besu_tests/caliper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
caliper:
benchconfig: benchconfig.yaml
networkconfig: networkconfig.json
txupdatetime: 1000
workspace: ./
report:
path: report.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ services:
- ./keys:/root/.ethereum/keystore
ports:
- 8545-8547:8545-8547
command: --revert-reason-enabled --rpc-http-enabled --rpc-http-host 0.0.0.0 --host-whitelist=* --rpc-http-apis admin,eth,miner,web3,net --graphql-http-enabled --discovery-enabled=false
command: --revert-reason-enabled --rpc-ws-enabled --rpc-ws-host 0.0.0.0 --host-whitelist=* --rpc-ws-apis admin,eth,miner,web3,net --graphql-http-enabled --discovery-enabled=false
Loading

0 comments on commit bd84a29

Please sign in to comment.