Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.4.0 PR - Convert workload modules to classes #856

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,4 @@
}
},
"license": "Apache-2.0"
}
}
2 changes: 2 additions & 0 deletions packages/caliper-core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ module.exports.CaliperEngine = require('./lib/master/caliper-engine');
module.exports.MonitorOrchestrator = require('./lib/master/orchestrators/monitor-orchestrator');
module.exports.RoundOrchestrator = require('./lib/master/orchestrators/round-orchestrator');
module.exports.WorkerOrchestrator = require('./lib/master/orchestrators/worker-orchestrator');
module.exports.WorkloadModuleInterface = require('./lib/worker/workload/workloadModuleInterface');
module.exports.WorkloadModuleBase = require('./lib/worker/workload/workloadModuleBase');
61 changes: 36 additions & 25 deletions packages/caliper-core/lib/worker/client/caliper-local-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class CaliperLocalClient {
constructor(bcClient, clientIndex, messenger) {
this.blockchain = new bc(bcClient);
this.clientIndex = clientIndex;
this.currentRoundIndex = -1;
this.messenger = messenger;
this.context = undefined;
this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 5000);
Expand All @@ -55,6 +56,12 @@ class CaliperLocalClient {
this.prometheusClient = new PrometheusClient();
this.totalTxCount = 0;
this.totalTxDelay = 0;

/**
* The workload module instance associated with the current round, updated by {CaliperLocalClient.prepareTest}.
* @type {WorkloadModuleInterface}
*/
this.workloadModule = undefined;
}

/**
Expand Down Expand Up @@ -235,25 +242,26 @@ class CaliperLocalClient {

/**
* Perform test with specified number of transactions
* @param {Object} cb callback module
* @param {Object} number number of transactions to submit
* @param {Object} rateController rate controller object
* @async
*/
async runFixedNumber(cb, number, rateController) {
Logger.info('Info: client ' + this.clientIndex + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : ''));
async runFixedNumber(number, rateController) {
Logger.info(`Worker ${this.clientIndex} is starting TX number-based round ${this.currentRoundIndex + 1} (${number} TXs)`);
this.startTime = Date.now();

const circularArray = new CircularArray(this.maxTxPromises);
const self = this;
while (this.txNum < number) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
circularArray.add(self.workloadModule.submitTransaction()
.then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}
Expand All @@ -264,26 +272,27 @@ class CaliperLocalClient {

/**
* Perform test with specified test duration
* @param {Object} cb callback module
* @param {Object} duration duration to run for
* @param {Object} rateController rate controller object
* @async
*/
async runDuration(cb, duration, rateController) {
Logger.info('Info: client ' + this.clientIndex + ' start test runDuration()' + (cb.info ? (':' + cb.info) : ''));
async runDuration(duration, rateController) {
Logger.info(`Worker ${this.clientIndex} is starting duration-based round ${this.currentRoundIndex + 1} (${duration} seconds)`);
this.startTime = Date.now();

// Use a circular array of Promises so that the Promise.all() call does not exceed the maximum permissable Array size
const circularArray = new CircularArray(this.maxTxPromises);
const self = this;
while ((Date.now() - this.startTime)/1000 < duration) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
circularArray.add(self.workloadModule.submitTransaction()
.then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}
Expand Down Expand Up @@ -323,7 +332,10 @@ class CaliperLocalClient {
*/
async prepareTest(test) {
Logger.debug('prepareTest() with:', test);
let cb = require(CaliperUtils.resolvePath(test.cb));
this.currentRoundIndex = test.testRound;

const workloadModuleFactory = CaliperUtils.loadModuleFunction(new Map(), test.cb, 'createWorkloadModule');
this.workloadModule = workloadModuleFactory();

const self = this;
let initUpdateInter = setInterval( () => { self.initUpdate(); } , self.txUpdateTime);
Expand All @@ -344,15 +356,15 @@ class CaliperLocalClient {
}

// Run init phase of callback
Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info + 'phase starting...') : 'phase starting...')}`);
await cb.init(this.blockchain, this.context, test.args);
Logger.info(`Info: client ${this.clientIndex} prepare test phase for round ${this.currentRoundIndex + 1} is starting...`);
await this.workloadModule.initializeWorkloadModule(this.clientIndex, test.totalClients, this.currentRoundIndex, test.args, this.blockchain, this.context);
await CaliperUtils.sleep(this.txUpdateTime);
} catch (err) {
Logger.info(`Client[${this.clientIndex}] encountered an error during prepare test phase: ${(err.stack ? err.stack : err)}`);
Logger.info(`Client[${this.clientIndex}] encountered an error during prepare test phase for round ${this.currentRoundIndex + 1}: ${(err.stack ? err.stack : err)}`);
throw err;
} finally {
clearInterval(initUpdateInter);
Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info + 'phase complete') : 'phase complete')}`);
Logger.info(`Info: client ${this.clientIndex} prepare test phase for round ${this.currentRoundIndex + 1} is completed`);
}
}

Expand All @@ -374,7 +386,6 @@ class CaliperLocalClient {
*/
async doTest(test) {
Logger.debug('doTest() with:', test);
let cb = require(CaliperUtils.resolvePath(test.cb));

this.beforeTest(test);

Expand All @@ -391,15 +402,15 @@ class CaliperLocalClient {
// Run the test loop
if (test.txDuration) {
const duration = test.txDuration; // duration in seconds
await this.runDuration(cb, duration, rateController);
await this.runDuration(duration, rateController);
} else {
const number = test.numb;
await this.runFixedNumber(cb, number, rateController);
await this.runFixedNumber(number, rateController);
}

// Clean up
await rateController.end();
await cb.end();
await this.workloadModule.cleanupWorkloadModule();
await this.blockchain.releaseContext(this.context);
this.clearUpdateInter(txUpdateInter);

Expand Down
100 changes: 100 additions & 0 deletions packages/caliper-core/lib/worker/workload/workloadModuleBase.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const WorkloadModuleInterface = require('./workloadModuleInterface');
const Logger = require('./../../common/utils/caliper-utils').getLogger('workload-module-base');

/**
* Utility base class for the user-implemented workload modules used for assembling TXs for the SUT.
*/
class WorkloadModuleBase extends WorkloadModuleInterface {
/**
* Initialize an instance of the WorkloadModuleBase class.
*/
constructor() {
super();

Logger.debug('Constructing workload module');

/**
* The 0-based index of the worker instantiating the workload module.
* @type {number}
*/
this.workerIndex = -1;

/**
* The total number of workers participating in the round.
* @type {number}
*/
this.totalWorkers = -1;

/**
* The 0-based index of the currently executing round.
* @type {number}
*/
this.roundIndex = -1;

/**
* The user-provided arguments for the round from the benchmark configuration file.
* @type {Object}
*/
this.roundArguments = undefined;

/**
* The adapter of the underlying SUT.
* @type {BlockchainInterface}
*/
this.sutAdapter = undefined;

/**
* The custom context object provided by the SUT adapter.
* @type {Object}
*/
this.sutContext = undefined;
}

/**
* Initialize the workload module with the given parameters.
* @param {number} workerIndex The 0-based index of the worker instantiating the workload module.
* @param {number} totalWorkers The total number of workers participating in the round.
* @param {number} roundIndex The 0-based index of the currently executing round.
* @param {Object} roundArguments The user-provided arguments for the round from the benchmark configuration file.
* @param {BlockchainInterface} sutAdapter The adapter of the underlying SUT.
* @param {Object} sutContext The custom context object provided by the SUT adapter.
* @async
*/
async initializeWorkloadModule(workerIndex, totalWorkers, roundIndex, roundArguments, sutAdapter, sutContext) {
Logger.debug(`Workload module initialized with: workerIndex=${workerIndex}, totalWorkers=${totalWorkers}, roundIndex=${roundIndex}, roundArguments=${JSON.stringify(roundArguments)}`);
this.workerIndex = workerIndex;
this.totalWorkers = totalWorkers;
this.roundIndex = roundIndex;
this.roundArguments = roundArguments;
this.sutAdapter = sutAdapter;
this.sutContext = sutContext;
}

/**
* Clean up the workload module at the end of the round.
* @async
*/
async cleanupWorkloadModule() {
// NOOP by default
Logger.debug('Cleaning up workload module: NOOP');
}
}


module.exports = WorkloadModuleBase;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

/**
* Interface for the user-implemented workload modules used for assembling TXs for the SUT.
*/
class WorkloadModuleInterface {
/**
* Initialize the workload module with the given parameters.
* @param {number} workerIndex The 0-based index of the worker instantiating the workload module.
* @param {number} totalWorkers The total number of workers participating in the round.
* @param {number} roundIndex The 0-based index of the currently executing round.
* @param {object} roundArguments The user-provided arguments for the round from the benchmark configuration file.
* @param {BlockchainInterface} sutAdapter The adapter of the underlying SUT.
* @param {object} sutContext The custom context object provided by the SUT adapter.
* @async
*/
async initializeWorkloadModule(workerIndex, totalWorkers, roundIndex, roundArguments, sutAdapter, sutContext) {
throw new Error('WorkloadModuleInterface.initializeWorkloadModule() must be implemented in derived class');
}

/**
* Assemble the next TX content(s) and submit it to the SUT adapter.
* @return {Promise<TxStatus[]>} The promises for the TX results as returned by the SUT adapter.
* @async
*/
async submitTransaction() {
throw new Error('WorkloadModuleInterface.submitTransaction() must be implemented in derived class');
}

/**
* Clean up the workload module at the end of the round.
* @async
*/
async cleanupWorkloadModule() {
throw new Error('WorkloadModuleInterface.cleanupWorkloadModule() must be implemented in derived class');
}
}


module.exports = WorkloadModuleInterface;
1 change: 1 addition & 0 deletions packages/caliper-tests-integration/besu_tests/caliper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
caliper:
benchconfig: benchconfig.yaml
networkconfig: networkconfig.json
txupdatetime: 1000
workspace: ./
report:
path: report.html
Expand Down
Loading