Skip to content

Commit

Permalink
Add a circular array of txPromises
Browse files Browse the repository at this point in the history
  • Loading branch information
nklincoln committed Apr 9, 2020
1 parent e97c93c commit 1aca87e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
3 changes: 2 additions & 1 deletion packages/caliper-core/lib/common/config/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ const keys = {
Communication: {
Method: 'caliper-worker-communication-method',
Address: 'caliper-worker-communication-address',
}
},
MaxTxPromises: 'maxtxpromises'
},
Flow: {
Skip: {
Expand Down
1 change: 1 addition & 0 deletions packages/caliper-core/lib/common/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ caliper:
method: process
# Address used for mqtt communications
address: mqtt://localhost:1883
maxtxpromises: 100
# Caliper flow options
flow:
# Skip options
Expand Down
46 changes: 46 additions & 0 deletions packages/caliper-core/lib/common/utils/circular-array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

/**
* Create an Array that has a maximum length and will overwrite existing entries when additional items are added to the array
*/
class CircularArray extends Array {

/**
* Constructor
* @param {number} maxLength maximum length of array
*/
constructor(maxLength) {
super();
this.pointer = 0;
this.maxLength = maxLength;
}

/**
* Add entry into array
* @param {any} element the element to add to the array
*/
add(element) {
if (this.length === this.maxLength) {
this[this.pointer] = element;
} else {
this.push(element);
}
this.pointer = (this.pointer + 1) % this.maxLength;
}
}

module.exports = CircularArray;
17 changes: 10 additions & 7 deletions packages/caliper-core/lib/worker/client/caliper-local-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

const Config = require('../../common/config/config-util.js');
const CaliperUtils = require('../../common/utils/caliper-utils.js');
const CircularArray = require('../../common/utils/circular-array');
const bc = require('../../common/core/blockchain.js');
const RateControl = require('../rate-control/rateControl.js');
const PrometheusClient = require('../../common/prometheus/prometheus-push-client');
Expand Down Expand Up @@ -240,21 +241,21 @@ class CaliperLocalClient {
Logger.info('Info: client ' + this.clientIndex + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : ''));
this.startTime = Date.now();

let promises = [];
while(this.txNum < number) {
const circularArray = new CircularArray(this.maxTxPromises);
while (this.txNum < number) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
promises.push(cb.run().then((result) => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}

await Promise.all(promises);
await Promise.all(circularArray);
this.endTime = Date.now();
}

Expand All @@ -269,21 +270,22 @@ class CaliperLocalClient {
Logger.info('Info: client ' + this.clientIndex + ' start test runDuration()' + (cb.info ? (':' + cb.info) : ''));
this.startTime = Date.now();

let promises = [];
// Use a circular array of Promises so that the Promise.all() call does not exceed the maximum permissable Array size
const circularArray = new CircularArray(this.maxTxPromises);
while ((Date.now() - this.startTime)/1000 < duration) {
// If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
promises.push(cb.run().then((result) => {
circularArray.add(cb.run().then((result) => {
this.addResult(result);
return Promise.resolve();
}));
});
await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats);
}

await Promise.all(promises);
await Promise.all(circularArray);
this.endTime = Date.now();
}

Expand Down Expand Up @@ -321,6 +323,7 @@ class CaliperLocalClient {
let cb = require(CaliperUtils.resolvePath(test.cb));

this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 5000);
this.maxTxPromises = Config.get(Config.keys.Worker.MaxTxPromises, 100);
const self = this;
let initUpdateInter = setInterval( () => { self.initUpdate(); } , self.txUpdateTime);

Expand Down

0 comments on commit 1aca87e

Please sign in to comment.