From b94efae98c553d9fb5259152e94cf483b874f2fb Mon Sep 17 00:00:00 2001 From: "nkl199@yahoo.co.uk" <nkl199@yahoo.co.uk> Date: Thu, 9 Apr 2020 12:24:43 +0100 Subject: [PATCH] Add a circular array of txPromises Signed-off-by: nkl199@yahoo.co.uk <nkl199@yahoo.co.uk> --- .../caliper-core/lib/common/config/Config.js | 3 +- .../lib/common/config/default.yaml | 1 + .../lib/common/utils/circular-array.js | 46 +++++++++++++++++++ .../lib/worker/client/caliper-local-client.js | 20 ++++---- 4 files changed, 60 insertions(+), 10 deletions(-) create mode 100644 packages/caliper-core/lib/common/utils/circular-array.js diff --git a/packages/caliper-core/lib/common/config/Config.js b/packages/caliper-core/lib/common/config/Config.js index 2a4011c98..dc3ce7add 100644 --- a/packages/caliper-core/lib/common/config/Config.js +++ b/packages/caliper-core/lib/common/config/Config.js @@ -91,7 +91,8 @@ const keys = { Communication: { Method: 'caliper-worker-communication-method', Address: 'caliper-worker-communication-address', - } + }, + MaxTxPromises: 'caliper-worker-maxtxpromises' }, Flow: { Skip: { diff --git a/packages/caliper-core/lib/common/config/default.yaml b/packages/caliper-core/lib/common/config/default.yaml index b4531f221..1b6605467 100644 --- a/packages/caliper-core/lib/common/config/default.yaml +++ b/packages/caliper-core/lib/common/config/default.yaml @@ -122,6 +122,7 @@ caliper: method: process # Address used for mqtt communications address: mqtt://localhost:1883 + maxtxpromises: 100 # Caliper flow options flow: # Skip options diff --git a/packages/caliper-core/lib/common/utils/circular-array.js b/packages/caliper-core/lib/common/utils/circular-array.js new file mode 100644 index 000000000..504ef24d5 --- /dev/null +++ b/packages/caliper-core/lib/common/utils/circular-array.js @@ -0,0 +1,46 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +/** + * Create an Array that has a maximum length and will overwrite existing entries when additional items are added to the array + */ +class CircularArray extends Array { + + /** + * Constructor + * @param {number} maxLength maximum length of array + */ + constructor(maxLength) { + super(); + this.pointer = 0; + this.maxLength = maxLength; + } + + /** + * Add entry into array + * @param {any} element the element to add to the array + */ + add(element) { + if (this.length === this.maxLength) { + this[this.pointer] = element; + } else { + this.push(element); + } + this.pointer = (this.pointer + 1) % this.maxLength; + } +} + +module.exports = CircularArray; diff --git a/packages/caliper-core/lib/worker/client/caliper-local-client.js b/packages/caliper-core/lib/worker/client/caliper-local-client.js index c06956dc6..73298fd33 100644 --- a/packages/caliper-core/lib/worker/client/caliper-local-client.js +++ b/packages/caliper-core/lib/worker/client/caliper-local-client.js @@ -16,6 +16,7 @@ const Config = require('../../common/config/config-util.js'); const CaliperUtils = require('../../common/utils/caliper-utils.js'); +const CircularArray = require('../../common/utils/circular-array'); const bc = require('../../common/core/blockchain.js'); const RateControl = require('../rate-control/rateControl.js'); const PrometheusClient = require('../../common/prometheus/prometheus-push-client'); @@ -37,6 +38,8 @@ class CaliperLocalClient { this.clientIndex = clientIndex; this.messenger = messenger; this.context = undefined; + this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 5000); + this.maxTxPromises = Config.get(Config.keys.Worker.MaxTxPromises, 100); // Internal stats this.results = []; @@ -240,13 +243,13 @@ class CaliperLocalClient { Logger.info('Info: client ' + this.clientIndex + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : '')); this.startTime = Date.now(); - let promises = []; - while(this.txNum < number) { + const circularArray = new CircularArray(this.maxTxPromises); + while (this.txNum < number) { // If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises, // and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit: // https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/ await this.setImmediatePromise(() => { - promises.push(cb.run().then((result) => { + circularArray.add(cb.run().then((result) => { this.addResult(result); return Promise.resolve(); })); @@ -254,7 +257,7 @@ class CaliperLocalClient { await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats); } - await Promise.all(promises); + await Promise.all(circularArray); this.endTime = Date.now(); } @@ -269,13 +272,14 @@ class CaliperLocalClient { Logger.info('Info: client ' + this.clientIndex + ' start test runDuration()' + (cb.info ? (':' + cb.info) : '')); this.startTime = Date.now(); - let promises = []; + // Use a circular array of Promises so that the Promise.all() call does not exceed the maximum permissable Array size + const circularArray = new CircularArray(this.maxTxPromises); while ((Date.now() - this.startTime)/1000 < duration) { // If this function calls cb.run() too quickly, micro task queue will be filled with unexecuted promises, // and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit: // https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/ await this.setImmediatePromise(() => { - promises.push(cb.run().then((result) => { + circularArray.add(cb.run().then((result) => { this.addResult(result); return Promise.resolve(); })); @@ -283,7 +287,7 @@ class CaliperLocalClient { await rateController.applyRateControl(this.startTime, this.txNum, this.results, this.resultStats); } - await Promise.all(promises); + await Promise.all(circularArray); this.endTime = Date.now(); } @@ -320,7 +324,6 @@ class CaliperLocalClient { Logger.debug('prepareTest() with:', test); let cb = require(CaliperUtils.resolvePath(test.cb)); - this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 5000); const self = this; let initUpdateInter = setInterval( () => { self.initUpdate(); } , self.txUpdateTime); @@ -374,7 +377,6 @@ class CaliperLocalClient { this.beforeTest(test); - this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 1000); Logger.info('txUpdateTime: ' + this.txUpdateTime); const self = this; let txUpdateInter = setInterval( () => { self.txUpdate(); } , self.txUpdateTime);