diff --git a/packages/caliper-burrow/lib/burrowClientFactory.js b/packages/caliper-burrow/lib/burrowClientFactory.js index 0f8298aab2..9d2abd7fcf 100644 --- a/packages/caliper-burrow/lib/burrowClientFactory.js +++ b/packages/caliper-burrow/lib/burrowClientFactory.js @@ -28,12 +28,6 @@ class BurrowClientFactory { */ async spawnWorker() { const child = childProcess.fork(path.join(__dirname, './burrowClientWorker.js'), process.argv.slice(2), { env: process.env}); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-burrow/lib/burrowClientWorker.js b/packages/caliper-burrow/lib/burrowClientWorker.js index 048c6fd2c6..21632be256 100644 --- a/packages/caliper-burrow/lib/burrowClientWorker.js +++ b/packages/caliper-burrow/lib/burrowClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const BurrowClient = require('./burrow'); /** @@ -25,16 +25,27 @@ const BurrowClient = require('./burrow'); * @async */ async function initHandler(context, message) { - return new BurrowClient(context.networkConfigPath, context.workspacePath); + return new BurrowClient(context.networkConfigPath, context.workspacePath, context.workerId); } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'burrow'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-core/index.js b/packages/caliper-core/index.js index 7d4cfbc0e4..79001e21ad 100644 --- a/packages/caliper-core/index.js +++ b/packages/caliper-core/index.js @@ -21,4 +21,5 @@ module.exports.CaliperUtils = require('./lib/common/utils/caliper-utils'); module.exports.Version = require('./lib/common/utils/version'); module.exports.ConfigUtil = require('./lib/common/config/config-util'); module.exports.MessageHandler = require('./lib/worker/client/message-handler'); +module.exports.Messenger = require('./lib/common/messaging/messenger'); module.exports.CaliperEngine = require('./lib/master/caliper-engine'); diff --git a/packages/caliper-core/lib/common/config/Config.js b/packages/caliper-core/lib/common/config/Config.js index 13a6960671..0c97adfa43 100644 --- a/packages/caliper-core/lib/common/config/Config.js +++ b/packages/caliper-core/lib/common/config/Config.js @@ -43,8 +43,6 @@ const keys = { MachineConfig: 'caliper-machineconfig', BenchConfig: 'caliper-benchconfig', NetworkConfig: 'caliper-networkconfig', - ZooAddress: 'caliper-zooaddress', - ZooConfig: 'caliper-zooconfig', TxUpdateTime: 'caliper-txupdatetime', LoggingRoot: 'caliper-logging', Logging: { @@ -87,6 +85,14 @@ const keys = { }, Targets: 'caliper-logging-targets' }, + Worker: { + Remote: 'caliper-worker-remote', + PollInterval: 'caliper-worker-pollinterval', + Communication: { + Method: 'caliper-worker-communication-method', + Address: 'caliper-worker-communication-address', + } + }, Flow: { Skip: { Start : 'caliper-flow-skip-start', diff --git a/packages/caliper-core/lib/common/config/default.yaml b/packages/caliper-core/lib/common/config/default.yaml index ee4a476ab8..bc6bc57bd5 100644 --- a/packages/caliper-core/lib/common/config/default.yaml +++ b/packages/caliper-core/lib/common/config/default.yaml @@ -110,7 +110,18 @@ caliper: options: flags: a mode: 0666 - + # Worker options + worker: + # Indicate if workers are in distributed mode + remote: false + # Polling interval to use once created, in milliseconds + pollinterval: 5000 + # Worker communication details + communication: + # Method used (process | mqtt) + method: process + # Address used for mqtt communications + address: mqtt://localhost:1883 # Caliper flow options flow: # Skip options diff --git a/packages/caliper-core/lib/common/core/blockchain-interface.js b/packages/caliper-core/lib/common/core/blockchain-interface.js index afe5dfe517..2dd1dc11b7 100644 --- a/packages/caliper-core/lib/common/core/blockchain-interface.js +++ b/packages/caliper-core/lib/common/core/blockchain-interface.js @@ -41,11 +41,13 @@ class BlockchainInterface { } /** - * Perform required preparation for test clients - * @param {Number} number count of test clients - * @return {Promise} obtained material for test clients + * Retrieve required arguments for test workers, e.g. retrieve information from the adaptor that is generated during an admin phase such as contract installation. + * Information returned here is passed to the worker through the messaging protocol on test. + * @param {Number} number total count of test workers + * @return {Promise} array of obtained material for each test worker + * @async */ - async prepareClients (number) { + async prepareWorkerArguments(number) { let result = []; for(let i = 0 ; i< number ; i++) { result[i] = {}; // as default, return an empty object for each client @@ -61,9 +63,8 @@ class BlockchainInterface { * } * @param {String} name name of the context * @param {Object} args adapter specific arguments - * @param {Integer} clientIdx the client index */ - async getContext(name, args, clientIdx) { + async getContext(name, args) { throw new Error('getContext is not implemented for this blockchain system'); } diff --git a/packages/caliper-core/lib/common/core/blockchain.js b/packages/caliper-core/lib/common/core/blockchain.js index cee48fd095..0625d10064 100644 --- a/packages/caliper-core/lib/common/core/blockchain.js +++ b/packages/caliper-core/lib/common/core/blockchain.js @@ -46,13 +46,13 @@ class Blockchain { } /** - * Perform required preparation for test clients, e.g. enroll clients and obtain key pairs - * @param {Number} number count of test clients - * @return {Promise} array of obtained material for test clients - * @async + * Retrieve required arguments for test workers, e.g. retrieve information from the adaptor that is generated during an admin phase such as contract installation. + * Information returned here is passed to the worker through the messaging protocol on test. + * @param {Number} number total count of test workers + * @return {Promise} array of obtained material for each test worker */ - async prepareClients (number) { - return await this.bcObj.prepareClients(number); + async prepareWorkerArguments(number) { + return await this.bcObj.prepareWorkerArguments(number); } /** @@ -67,13 +67,11 @@ class Blockchain { * Get a context for subsequent operations, e.g. invoke smart contract or query state * @param {String} name name of the context * @param {Object} args adapter specific arguments - * @param {Integer} clientIdx the client index - * @param {Object} txFile the file information for reading or writing. * @return {Promise} obtained context object * @async */ - async getContext(name, args, clientIdx, txFile) { - return await this.bcObj.getContext(name, args, clientIdx, txFile); + async getContext(name, args) { + return await this.bcObj.getContext(name, args); } /** diff --git a/packages/caliper-core/lib/common/messaging/messenger-interface.js b/packages/caliper-core/lib/common/messaging/messenger-interface.js new file mode 100644 index 0000000000..6d86e38521 --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/messenger-interface.js @@ -0,0 +1,77 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const Logger = require('../utils/caliper-utils').getLogger('messenger-base'); + + +/** + * Interface of messenger. Messenger implementations must follow a naming convention that is -observer.js so + * that they may be dynamically loaded in the WorkerOrchestrator and WorkerAdaptor + */ +class MessengerInterface { + + /** + * Set configuration details + * @param {object} configuration configuration details for the messenger + */ + constructor(configuration) { + this.configuration = configuration; + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + this._throwNotImplementedError('initialize'); + } + + /** + * Configure the Messenger for use + * @async + */ + async configure() { + this._throwNotImplementedError('configure'); + } + + /** + * Send a message using the messenger + */ + send() { + this._throwNotImplementedError('send'); + } + + /** + * Get the UUID for the messenger + */ + getUUID() { + this._throwNotImplementedError('getUUID'); + } + + /** + * Logs and throws a "not implemented" error for the given function. + * @param {string} functionName The name of the function. + * @private + */ + _throwNotImplementedError(functionName) { + let msg = `The function "${functionName}" is not implemented for this messenger`; + Logger.error(msg); + throw new Error(msg); + } + +} + +module.exports = MessengerInterface; diff --git a/packages/caliper-core/lib/common/messaging/messenger.js b/packages/caliper-core/lib/common/messaging/messenger.js new file mode 100644 index 0000000000..d4cc8b1dfb --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/messenger.js @@ -0,0 +1,100 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +'use strict'; + +const CaliperUtils = require('..//utils/caliper-utils'); +const Logger = CaliperUtils.getLogger('messenger.js'); + +const builtInMessengers = new Map([ + ['mqtt-master', './mqtt-master.js'], + ['mqtt-worker', './mqtt-worker.js'], + ['process-master', './process-master.js'], + ['process-worker', './process-worker.js'] +]); + +const Messenger = class { + + /** + * Instantiates the proxy messenger and creates the configured messenger behind it. + * @param {object} configuration The messenger configuration object. + */ + constructor(configuration) { + this.configuration = configuration; + + Logger.info(`Creating messenger of type "${configuration.type}" ${configuration.sut ? ` for SUT ${configuration.sut}` : ''}`); + + // resolve the type to a module path + let modulePath = builtInMessengers.has(configuration.type) + ? builtInMessengers.get(configuration.type) : CaliperUtils.resolvePath(configuration.type); // TODO: what if it's an external module name? + + let factoryFunction = require(modulePath).createMessenger; + if (!factoryFunction) { + throw new Error(`${configuration.type} does not export the mandatory factory function 'createMessenger'`); + } + + this.messenger = factoryFunction(configuration); + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + await this.messenger.initialize(); + } + + /** + * Configure the Messenger for use + * @param {object} configurationObject configuration object + * @async + */ + async configure(configurationObject) { + await this.messenger.configure(configurationObject); + } + + /** + * Get the UUID for the messenger + * @returns {string} the UUID of the messenger + */ + getUUID() { + return this.messenger.getUUID(); + } + + /** + * Method used to publish message to worker clients + * @param {string[]} to string array of workers that the update is intended for + * @param {*} type the string type of the update + * @param {*} data data pertinent to the update type + */ + send(to, type, data) { + // Create Date object for timestamp + const date = new Date(); + + // Augment data object with type + data.type = type; + + // Create complete message + const message = { + to, + from: this.messenger.getUUID(), + timestamp: date.toISOString(), + data + }; + + this.messenger.send(message); + } + +}; + +module.exports = Messenger; diff --git a/packages/caliper-core/lib/common/messaging/mqtt-master.js b/packages/caliper-core/lib/common/messaging/mqtt-master.js new file mode 100644 index 0000000000..bef963a318 --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/mqtt-master.js @@ -0,0 +1,125 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const MessengerInterface = require('./messenger-interface'); +const Logger = require('../utils/caliper-utils').getLogger('mqtt-master-messenger'); +const ConfigUtil = require('../config/config-util'); + +const mqtt = require('mqtt'); + +/** + * Messenger that is based on an mqtt implementation + */ +class MqttMasterMessenger extends MessengerInterface { + + /** + * Constructor for MQTT Master + * @param {object} configuration configuration details + */ + constructor(configuration) { + super(configuration); + this.configuration.address = ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Address); + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + Logger.info('Initializing MQTT messenger ... '); + + const messengerConnectedPromise = new Promise((resolve, reject) => { + this.messengerConnectedPromise = { + resolve: resolve, + reject: reject + }; + }); + + this.mqttClient = mqtt.connect(this.configuration.address); + + this.mqttClient.on('connect', () => { + Logger.info(`Connected to mqtt broker with clientID: ${this.mqttClient.options.clientId}`); + + // Subscribe to the topic that workers publish to + this.mqttClient.subscribe('worker/update'); + + this.connected = true; + this.messengerConnectedPromise.resolve(); + }); + + this.mqttClient.on('error', (error) => { + if (this.connected) { + Logger.error('MQTT Message error: ', error); + } else { + this.messengerConnectedPromise.reject(error); + } + }); + + await messengerConnectedPromise; + } + + /** + * Configure the Messenger with the worker orchestrator + * @param {WorkerOrchestrator} orchestrator the worker orchestrator + */ + configure(orchestrator) { + this.mqttClient.on('message', (topic, message) => { + switch (topic) { + case 'worker/update': + Logger.debug(`Processing message from 'worker/update' topic and message: ${message}`); + orchestrator.processWorkerUpdate(JSON.parse(message)); + break; + default: + Logger.warn(`No conditional for topic: ${topic}`); + } + }); + } + + /** + * Get the client UUID + * @returns {string} the UUID of the MQTT client connection instance + */ + getUUID() { + if (this.mqttClient) { + return this.mqttClient.options.clientId; + } else { + return undefined; + } + } + + /** + * Send a message using the messenger + * @param {object} message the message to send + */ + send(message) { + // Convert to string and send + const msg = JSON.stringify(message); + this.mqttClient.publish('master/update', msg); + Logger.debug(`Published message: ${msg}`); + } + +} + +/** + * Creates a new MqttMasterMessenger instance. + * @param {object} messengerConfig the messenger configuration + * @return {MqttMessenger} The MqttMasterMessenger instance. + */ +function createMessenger(messengerConfig) { + return new MqttMasterMessenger(messengerConfig); +} + +module.exports.createMessenger = createMessenger; diff --git a/packages/caliper-core/lib/common/messaging/mqtt-worker.js b/packages/caliper-core/lib/common/messaging/mqtt-worker.js new file mode 100644 index 0000000000..d248d72cfa --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/mqtt-worker.js @@ -0,0 +1,134 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const MessengerInterface = require('./messenger-interface'); +const Logger = require('../utils/caliper-utils').getLogger('mqtt-worker-messenger'); +const ConfigUtil = require('../config/config-util'); +const MessageHandler = require('../../worker/client/message-handler'); + +const mqtt = require('mqtt'); + +/** + * Messenger that is based on an mqtt implementation + */ +class MqttWorkerMessenger extends MessengerInterface { + + /** + * Constructor for MQTT Worker + * @param {object} configuration configuration details + */ + constructor(configuration) { + super(configuration); + this.configuration.address = ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Address); + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + Logger.info('Initializing MQTT messenger ... '); + + const messengerConnectedPromise = new Promise((resolve, reject) => { + this.messengerConnectedPromise = { + resolve: resolve, + reject: reject + }; + }); + + this.mqttClient = mqtt.connect(this.configuration.address); + + this.mqttClient.on('connect', () => { + const clientId = this.mqttClient.options.clientId; + Logger.info(`${this.configuration.sut} worker connected with mqtt clientId ${clientId}`); + + // Subscribe to the update topic + Logger.info(`${this.configuration.sut} worker with mqtt clientId ${clientId} subscribing to topic "master/update"`); + this.mqttClient.subscribe('master/update'); + + // resolve promise so that connection can complete + this.connected = true; + this.messengerConnectedPromise.resolve(); + }); + + this.mqttClient.on('error', (error) => { + if (this.connected) { + Logger.error('MQTT Message error: ', error); + } else { + this.messengerConnectedPromise.reject(error); + } + }); + + await messengerConnectedPromise; + } + + /** + * Get the client UUID + * @returns {string} the UUID of the MQTT client connection instance + */ + getUUID() { + if (this.mqttClient) { + return this.mqttClient.options.clientId; + } else { + Logger.error('No client constructed'); + return undefined; + } + } + + /** + * Send a message using the messenger + * @param {object} message the message to send + */ + send(message) { + // Convert to string and send + const msg = JSON.stringify(message); + this.mqttClient.publish('worker/update', msg); + Logger.debug(`${this.configuration.sut} worker published message: ${msg}`); + } + + /** + * Configure the Messenger for use + * @param {MessageHandler} handlerContext a configured message handler + */ + configure(handlerContext) { + this.mqttClient.on('message', async (topic, message) => { + switch (topic) { + case 'master/update':{ + const msg = JSON.parse(message); + // Only action if intended for this client + if (msg.to.includes(this.getUUID()) || msg.to.includes('all')) { + await MessageHandler.handle(handlerContext, msg.data); + } + break; + } + default: + Logger.info(`No conditional for topic: ${topic}`); + } + }); + } + +} + +/** + * Creates a new MqttWorkerMessenger instance. + * @param {object} messengerConfig the messenger configuration + * @return {MqttMessenger} The MqttWorkerMessenger instance. + */ +function createMessenger(messengerConfig) { + return new MqttWorkerMessenger(messengerConfig); +} + +module.exports.createMessenger = createMessenger; diff --git a/packages/caliper-core/lib/common/messaging/process-master.js b/packages/caliper-core/lib/common/messaging/process-master.js new file mode 100644 index 0000000000..cdce5b1cde --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/process-master.js @@ -0,0 +1,88 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const MessengerInterface = require('./messenger-interface'); +const Logger = require('../utils/caliper-utils').getLogger('process-master-messenger'); + + +/** + * Messenger that is based on an process implementation + */ +class ProcessMasterMessenger extends MessengerInterface { + + /** + * Constructor for process Master + * @param {object} configuration configuration details + */ + constructor(configuration) { + super(configuration); + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + return Promise.resolve(); + } + + /** + * Configure the Messenger with the worker orchestrator + * @param {WorkerOrchestrator} orchestrator the worker orchestrator + */ + configure(orchestrator) { + this.workerProcesses = orchestrator.workerObjects; + for (const workerProcess of this.workerProcesses) { + workerProcess.on('message', function(message) { + Logger.debug(`Processing worker message: ${message}`); + orchestrator.processWorkerUpdate(JSON.parse(message)); + }); + } + } + + /** + * Get the client UUID + * @returns {string} the UUID of the process + */ + getUUID() { + return process.pid.toString(); + } + + /** + * Send a message using the messenger + * @param {object} message the message to send + */ + send(message) { + // Convert to string and send + const msg = JSON.stringify(message); + for (const workerProcess of this.workerProcesses) { + workerProcess.send(msg); + Logger.debug(`Sent message: ${msg}`); + } + } + +} + +/** + * Creates a new ProcessMasterMessenger instance. + * @param {object} messengerConfig the messenger configuration + * @return {ProcessMasterMessenger} The ProcessMasterMessenger instance. + */ +function createMessenger(messengerConfig) { + return new ProcessMasterMessenger(messengerConfig); +} + +module.exports.createMessenger = createMessenger; diff --git a/packages/caliper-core/lib/common/messaging/process-worker.js b/packages/caliper-core/lib/common/messaging/process-worker.js new file mode 100644 index 0000000000..de10dc1bb5 --- /dev/null +++ b/packages/caliper-core/lib/common/messaging/process-worker.js @@ -0,0 +1,90 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const MessengerInterface = require('./messenger-interface'); +const Logger = require('../utils/caliper-utils').getLogger('mqtt-worker-messenger'); +const MessageHandler = require('../../worker/client/message-handler'); + +/** + * Messenger that is based on a process + */ +class ProcessWorkerMessenger extends MessengerInterface { + + /** + * Constructor for process Worker + * @param {object} configuration configuration details + */ + constructor(configuration) { + super(configuration); + } + + /** + * Initialize the Messenger + * @async + */ + async initialize() { + return Promise.resolve(); + } + + /** + * Get the client UUID + * @returns {string} the UUID of the process + */ + getUUID() { + return process.pid.toString(); + } + + /** + * Send a message using the messenger + * @param {object} message the message to send + */ + send(message) { + // Convert to string and send + const msg = JSON.stringify(message); + process.send(msg); + Logger.debug(`${this.configuration.sut} worker sent message: ${msg}`); + } + + /** + * Configure the Messenger for use + * @param {MessageHandler} handlerContext a configured message handler + */ + configure(handlerContext) { + + const uuid = this.getUUID(); + const sut = this.configuration.sut; + process.on('message', async function(message) { + Logger.debug(`${sut} worker processing orchestrator message: ${message}`); + const msg = JSON.parse(message); + // Only action if intended for this client + if (msg.to.includes(uuid) || msg.to.includes('all')) { + await MessageHandler.handle(handlerContext, msg.data); + } + }); + } + +} + +/** + * Creates a new ProcessWorkerMessenger instance. + * @param {object} messengerConfig the messenger configuration + * @return {MqttMessenger} The ProcessWorkerMessenger instance. + */ +function createMessenger(messengerConfig) { + return new ProcessWorkerMessenger(messengerConfig); +} + +module.exports.createMessenger = createMessenger; diff --git a/packages/caliper-core/lib/common/prometheus/prometheus-query-client.js b/packages/caliper-core/lib/common/prometheus/prometheus-query-client.js index f2130eea70..21c2617a97 100644 --- a/packages/caliper-core/lib/common/prometheus/prometheus-query-client.js +++ b/packages/caliper-core/lib/common/prometheus/prometheus-query-client.js @@ -109,7 +109,7 @@ class PrometheusQueryClient { if (response.status.localeCompare('success') === 0 ) { return response; } else { - Logger.error(`Prometheus query to ${this.requestParams.href} failed`); + Logger.error(`Prometheus query to ${targetParams.requestParams.href} failed with: ${JSON.stringify(response)}`); return null; } } catch (error) { diff --git a/packages/caliper-core/lib/common/utils/caliper-utils.js b/packages/caliper-core/lib/common/utils/caliper-utils.js index 28d9db637d..996c0435a3 100644 --- a/packages/caliper-core/lib/common/utils/caliper-utils.js +++ b/packages/caliper-core/lib/common/utils/caliper-utils.js @@ -235,13 +235,16 @@ class CaliperUtils { /** * Executes the given command asynchronously. * @param {string} command The command to execute through a newly spawn shell. + * @param {boolean} logAction Boolean flag to inform the command being run, default true * @return {Promise} The return promise is resolved upon the successful execution of the command, or rejected with an Error instance. * @async */ - static execAsync(command) { + static execAsync(command, logAction = true) { const logger = CaliperUtils.getLogger('caliper-utils'); return new Promise((resolve, reject) => { - logger.info(`Executing command: ${command}`); + if (logAction) { + logger.info(`Executing command: ${command}`); + } let child = exec(command, (err, stdout, stderr) => { if (err) { logger.error(`Unsuccessful command execution. Error code: ${err.code}. Terminating signal: ${err.signal}`); @@ -347,6 +350,7 @@ class CaliperUtils { return flowOpts; } + } module.exports = CaliperUtils; diff --git a/packages/caliper-core/lib/master/caliper-engine.js b/packages/caliper-core/lib/master/caliper-engine.js index 3ad7089375..6351e7ed38 100644 --- a/packages/caliper-core/lib/master/caliper-engine.js +++ b/packages/caliper-core/lib/master/caliper-engine.js @@ -96,6 +96,7 @@ class CaliperEngine { const blockchainWrapper = new Blockchain(this.blockchainAdapter); try { + // Conditional running of 'start' commands if (!flowOpts.performStart) { logger.info('Skipping start commands due to benchmark flow conditioning'); @@ -143,9 +144,9 @@ class CaliperEngine { if (!flowOpts.performTest) { logger.info('Skipping benchmark test phase due to benchmark flow conditioning'); } else { - let numberSet = this.benchmarkConfig.test && this.benchmarkConfig.test.clients && this.benchmarkConfig.test.clients.number; - let numberOfClients = numberSet ? this.benchmarkConfig.test.clients.number : 1; - let workerArguments = await blockchainWrapper.prepareClients(numberOfClients); + let numberSet = this.benchmarkConfig.test && this.benchmarkConfig.test.workers && this.benchmarkConfig.test.workers.number; + let numberOfWorkers = numberSet ? this.benchmarkConfig.test.workers.number : 1; + let workerArguments = await blockchainWrapper.prepareWorkerArguments(numberOfWorkers); const roundOrchestrator = new RoundOrchestrator(this.benchmarkConfig, this.networkConfig, this.workerFactory, workerArguments); await roundOrchestrator.run(); diff --git a/packages/caliper-core/lib/master/client/client-orchestrator.js b/packages/caliper-core/lib/master/client/client-orchestrator.js deleted file mode 100644 index f7499cf9cb..0000000000 --- a/packages/caliper-core/lib/master/client/client-orchestrator.js +++ /dev/null @@ -1,324 +0,0 @@ -/* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - - -'use strict'; - -const util = require('../../common/utils/caliper-utils'); - -const logger = util.getLogger('client.js'); - -/** - * Class for Client Orchestration - */ -class ClientOrchestrator { - /** - * Constructor - * @param {object} benchmarkConfig The benchmark configuration object. - * @param {object} workerFactory The factory for the worker processes. - * @param {object[]} workerArguments Array of arbitrary arguments to pass to the worker processes. - */ - constructor(benchmarkConfig, workerFactory, workerArguments) { - this.config = benchmarkConfig.test.clients; - this.workerFactory = workerFactory; - this.workerArguments = workerArguments; - - if(this.config.hasOwnProperty('number')) { - this.number = this.config.number; - } else { - this.number = 1; - } - - logger.info(`Configured number of worker processes: ${this.number}`); - - this.updates = {id:0, data:[]}; // contains txUpdated messages - this.processes = {}; - } - - /** - * Start the test - * message = { - * type: 'test', - * label : label name, - * numb: total number of simulated txs, - * rateControl: rate controller to use - * trim: trim options - * args: user defined arguments, - * cb : path of the callback js file, - * config: path of the blockchain config file // TODO: how to deal with the local config file when transfer it to a remote client (via zookeeper), as well as any local materials like crypto keys?? - * }; - * @param {JSON} test test specification - * @returns {Object[]} the test results array - * @async - */ - async startTest(test) { - this.updates.data = []; - this.updates.id++; - - const results = []; - await this._startTest(this.number, test, this.updates.data, results); - const testOutput = this.formatResults(results); - return testOutput; - } - - /** - * Start a test - * @param {Number} number test clients' count - * @param {JSON} test test specification - * @param {Array} updates array to save txUpdate results - * @param {Array} results array to save the test results - * @async - */ - async _startTest(number, test, updates, results) { - - // Conditionally launch clients on the test round. Subsequent tests should re-use existing clients. - if (Object.keys(this.processes).length === 0) { - // launch clients - const readyPromises = []; - this.processes = {}; - for (let i = 0 ; i < number ; i++) { - this.launchClient(updates, results, readyPromises); - } - // wait for all clients to have initialized - logger.info(`Waiting for ${readyPromises.length} clients to be ready... `); - - await Promise.all(readyPromises); - - logger.info(`${readyPromises.length} clients ready, starting test phase`); - } else { - logger.info(`Existing ${Object.keys(this.processes).length} clients will be reused in next test round... `); - } - - let txPerClient; - let totalTx = test.numb; - if (test.numb) { - // Run specified number of transactions - txPerClient = Math.floor(test.numb / number); - - // trim should be based on client number if specified with txNumber - if (test.trim) { - test.trim = Math.floor(test.trim / number); - } - - if(txPerClient < 1) { - txPerClient = 1; - } - test.numb = txPerClient; - } else if (test.txDuration) { - // Run for time specified txDuration based on clients - // Do nothing, we run for the time specified within test.txDuration - } else { - throw new Error('Unconditioned transaction rate driving mode'); - } - - let promises = []; - let idx = 0; - for (let id in this.processes) { - let client = this.processes[id]; - let p = new Promise((resolve, reject) => { - client.obj.promise = { - resolve: resolve, - reject: reject - }; - }); - promises.push(p); - client.results = results; - client.updates = updates; - test.clientArgs = this.workerArguments[idx]; - test.clientIdx = idx; - test.totalClients = number; - - if(totalTx % number !== 0 && idx === number-1){ - test.numb = totalTx - txPerClient*(number - 1); - } - - // send test specification to client and update idx - client.obj.send(test); - idx++; - } - - await Promise.all(promises); - // clear promises - for (let client in this.processes) { - if (client.obj && client.ob.promise) { - delete client.obj.promise; - } - } - } - - /** - * Stop all test clients(child processes) - */ - stop() { - for (let pid in this.processes) { - this.processes[pid].obj.kill(); - } - this.processes = {}; - } - - /** - * Get the update array - * @return {Array} update array - */ - getUpdates() { - return this.updates; - } - - /** - * Call the Promise function for a process - * @param {String} pid pid of the client process - * @param {Boolean} isResolve indicates resolve(true) or reject(false) - * @param {Object} msg input for the Promise function - * @param {Boolean} isReady indicates promise type ready(true) promise(false) - */ - setPromise(pid, isResolve, msg, isReady) { - const client = this.processes[pid]; - if (client) { - const type = isReady ? 'ready' : 'promise'; - const clientObj = client.obj; - if(clientObj && clientObj[type] && typeof clientObj[type] !== 'undefined') { - if(isResolve) { - clientObj[type].resolve(msg); - } - else { - clientObj[type].reject(msg); - } - } else { - throw new Error('Unconditioned case within setPromise()'); - } - } - } - - /** - * Push test result from a child process into the global array - * @param {String} pid pid of the child process - * @param {Object} data test result - */ - pushResult(pid, data) { - let p = this.processes[pid]; - if (p && p.results && typeof p.results !== 'undefined') { - p.results.push(data); - } - } - - /** - * Push update value from a child process into the global array - * @param {String} pid pid of the child process - * @param {Object} data update value - */ - pushUpdate(pid, data) { - let p = this.processes[pid]; - if (p && p.updates && typeof p.updates !== 'undefined') { - p.updates.push(data); - } - } - - /** - * Launch a client process to do the test - * @param {Array} updates array to save txUpdate results - * @param {Array} results array to save the test results - * @param {Array} readyPromises array to hold ready promises - */ - launchClient(updates, results, readyPromises) { - let client = this.workerFactory.spawnWorker(); - let pid = client.pid.toString(); - - logger.info('Launching client with PID ', pid); - this.processes[pid] = {obj: client, results: results, updates: updates}; - - let p = new Promise((resolve, reject) => { - client.ready = { - resolve: resolve, - reject: reject - }; - }); - - readyPromises.push(p); - - const self = this; - client.on('message', function(msg) { - if (msg.type === 'ready') { - logger.info('Client ready message received'); - self.setPromise(pid, true, null, true); - } else if (msg.type === 'txUpdated') { - self.pushUpdate(pid, msg.data); - } else if (msg.type === 'txReset') { - self.pushUpdate(pid, msg.data); - } else if (msg.type === 'testResult') { - self.pushResult(pid, msg.data); - self.setPromise(pid, true, null); - } else if (msg.type === 'error') { - self.setPromise(pid, false, new Error('Client encountered error:' + msg.data)); - } else { - self.setPromise(pid, false, new Error('Client returned unexpected message type:' + msg.type)); - } - }); - - client.on('error', function() { - self.setPromise(pid, false, new Error('Client encountered unexpected error')); - }); - - client.on('exit', function(code, signal) { - logger.info(`Client exited with code ${code}`); - self.setPromise(pid, false, new Error('Client already exited')); - }); - } - - /** - * Send message to all child processes - * @param {JSON} message message - * @return {Number} number of child processes - */ - sendMessage(message) { - for (let pid in this.processes) { - this.processes[pid].obj.send(message); - } - return this.processes.length; - } - - /** - * Format the final test results for subsequent consumption from [ {result: [], start: val, end: val}, {result: [], start: val, end: val}, {result: [], start: val, end: val}] - * to {results: [val, val], start: val, end: val} - * @param {JSON[]} results an Array of JSON objects - * @return {JSON} an appropriately formatted result - */ - formatResults(results) { - - let resultArray = []; - let allStartedTime = null; - let allFinishedTime = null; - for (const clientResult of results){ - // Start building the array of all client results - resultArray = resultArray.concat(clientResult.results); - - // Track all started/complete times - if (!allStartedTime || clientResult.start > allStartedTime) { - allStartedTime = clientResult.start; - } - - if (!allFinishedTime || clientResult.end < allFinishedTime) { - allFinishedTime = clientResult.end; - } - } - - return { - results: resultArray, - start: allStartedTime, - end: allFinishedTime - }; - } - -} - -module.exports = ClientOrchestrator; diff --git a/packages/caliper-core/lib/master/monitor/monitor-docker.js b/packages/caliper-core/lib/master/monitors/monitor-docker.js similarity index 100% rename from packages/caliper-core/lib/master/monitor/monitor-docker.js rename to packages/caliper-core/lib/master/monitors/monitor-docker.js diff --git a/packages/caliper-core/lib/master/monitor/monitor-interface.js b/packages/caliper-core/lib/master/monitors/monitor-interface.js similarity index 100% rename from packages/caliper-core/lib/master/monitor/monitor-interface.js rename to packages/caliper-core/lib/master/monitors/monitor-interface.js diff --git a/packages/caliper-core/lib/master/monitor/monitor-orchestrator.js b/packages/caliper-core/lib/master/monitors/monitor-orchestrator.js similarity index 100% rename from packages/caliper-core/lib/master/monitor/monitor-orchestrator.js rename to packages/caliper-core/lib/master/monitors/monitor-orchestrator.js diff --git a/packages/caliper-core/lib/master/monitor/monitor-process.js b/packages/caliper-core/lib/master/monitors/monitor-process.js similarity index 100% rename from packages/caliper-core/lib/master/monitor/monitor-process.js rename to packages/caliper-core/lib/master/monitors/monitor-process.js diff --git a/packages/caliper-core/lib/master/monitor/monitor-prometheus.js b/packages/caliper-core/lib/master/monitors/monitor-prometheus.js similarity index 99% rename from packages/caliper-core/lib/master/monitor/monitor-prometheus.js rename to packages/caliper-core/lib/master/monitors/monitor-prometheus.js index 2b475fb9f2..19ec03143e 100644 --- a/packages/caliper-core/lib/master/monitor/monitor-prometheus.js +++ b/packages/caliper-core/lib/master/monitors/monitor-prometheus.js @@ -17,7 +17,7 @@ const Util = require('../../common/utils/caliper-utils.js'); const ConfigUtil = require('../../common/config/config-util'); const ChartBuilder = require('../charts/chart-builder'); -const Logger = Util.getLogger('monitor-prometheus.js'); +const Logger = Util.getLogger('monitor-prometheus'); const MonitorInterface = require('./monitor-interface'); const PrometheusPushClient = require('../../common/prometheus/prometheus-push-client'); const PrometheusQueryClient = require('../../common/prometheus/prometheus-query-client'); diff --git a/packages/caliper-core/lib/master/monitor/monitor-utilities.js b/packages/caliper-core/lib/master/monitors/monitor-utilities.js similarity index 100% rename from packages/caliper-core/lib/master/monitor/monitor-utilities.js rename to packages/caliper-core/lib/master/monitors/monitor-utilities.js diff --git a/packages/caliper-core/lib/master/orchestrators/worker-orchestrator.js b/packages/caliper-core/lib/master/orchestrators/worker-orchestrator.js new file mode 100644 index 0000000000..56f4ef4f10 --- /dev/null +++ b/packages/caliper-core/lib/master/orchestrators/worker-orchestrator.js @@ -0,0 +1,578 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + +'use strict'; + +const CaliperUtils = require('../../common/utils/caliper-utils'); +const ConfigUtils = require('../../common/config/config-util'); +const logger = CaliperUtils.getLogger('worker-orchestrator'); +const Messenger = require('../../common/messaging/messenger'); + + +// Orchestrator message typings +const TYPES = { + REGISTER: 'register', + ASSIGN: 'assignId', + INITIALIZE: 'initialize', + PREPARE: 'prepare', + TEST: 'test', +}; + +// Add in worker message typings +TYPES.CONNECTED = 'connected'; +TYPES.ASSIGNED = 'assigned'; +TYPES.READY = 'ready'; +TYPES.PREPARED = 'prepared'; +TYPES.TXUPDATE = 'txUpdate'; +TYPES.TESTRESULT = 'testResult'; +TYPES.TXRESET = 'txReset'; + +/** + * Class for Worker Orchestration + */ +class WorkerOrchestrator { + /** + * Constructor + * @param {object} benchmarkConfig The benchmark configuration object. + * @param {object} workerFactory The factory for the worker processes. + * @param {object[]} workerArguments List of adaptor specific arguments to pass for each worker processes. + */ + constructor(benchmarkConfig, workerFactory, workerArguments) { + this.config = benchmarkConfig.test.workers; + this.workerFactory = workerFactory; + this.workerArguments = workerArguments; + + this.workers = {}; + this.workerObjects = []; + this.updates = {id:0, data:[]}; // contains txUpdated messages + this.results = []; // cumulative results + + // Messenger information + const type = `${ConfigUtils.get(ConfigUtils.keys.Worker.Communication.Method)}-master`; + this.messenger = new Messenger({type}); + this.messengerConfigured = false; + this.workerPollingInterval = ConfigUtils.get(ConfigUtils.keys.Worker.PollInterval); + + // Worker information + this.workersRemote = ConfigUtils.get(ConfigUtils.keys.Worker.Remote); + this.workersConnected = false; + this.workersAssigned = false; + this.workersReady = false; + + if (this.config.hasOwnProperty('number')) { + this.number = this.config.number; + } else { + this.number = 1; + } + + // Global promises used for setup + this.brokerConnectedPromise = {}; + this.workersConnectedPromise = {}; + this.workersReadyPromise = {}; + } + + /** + * Configure the messaging service implementation + * @async + */ + async configureMessenger() { + await this.messenger.initialize(); + const self = this; + await this.messenger.configure(self); + } + + /** + * Send a global message to topic, indicating that orchestrator is pending worker registration + */ + pollForWorkers() { + this.messenger.send(['all'], TYPES.REGISTER, {}); + } + + /** + * Process a worker message + * @param {object} message the worker message to process + */ + processWorkerUpdate(message) { + switch (message.data.type) { + case 'connected': + logger.debug(`Dealing with connected message ${JSON.stringify(message)}`); + this.updateWorkerPhase(message.from, TYPES.CONNECTED, message.data); + break; + case 'assigned': + logger.debug(`Dealing with assigned message ${JSON.stringify(message)}`); + this.updateWorkerPhase(message.from, TYPES.ASSIGNED, message.data); + break; + case 'ready': + logger.debug(`Dealing with ready message ${JSON.stringify(message)}`); + this.updateWorkerPhase(message.from, TYPES.READY, message.data); + break; + case 'prepared': + logger.debug(`Dealing with prepared message ${JSON.stringify(message)}`); + this.updateWorkerPhase(message.from, TYPES.PREPARED, message.data); + break; + case 'txUpdate': + logger.debug('Dealing with txUpdate message'); + this.pushUpdate(message.from, message.data); + break; + case 'testResult': + logger.debug('Dealing with testResult message'); + this.pushResult(message.from, message.data); + this.updateWorkerPhase(message.from, TYPES.TESTRESULT, {}); + break; + case 'txReset': + logger.debug('Dealing with txReset message'); + this.pushUpdate(message.from, message.data); + break; + default:{ + const msg = `processWorkerUpdate passed unknown message type "${message.type}" by worker ${message.from}`; + logger.error(msg); + throw new Error(msg); + } + } + } + + /** + * Update a worker readiness phase + * @param {number} workerId the worker identifier + * @param {string} phase the phase the update relates to + * @param {object} data data object passed within the worker message + */ + updateWorkerPhase(workerId, phase, data) { + + logger.debug(`Handling ${phase} message from ${workerId} with data ${JSON.stringify(data)}`); + switch (phase) { + case TYPES.CONNECTED:{ + if (data.error) { + this.workersConnectedPromise.reject(new Error(data.error)); + } else { + const phases = {}; + phases[TYPES.ASSIGNED] = false; + phases[TYPES.READY] = false; + phases[TYPES.PREPARED] = {}; + phases[TYPES.TESTRESULT] = {}; + + // Add worker to this.workers + // - updates array to save txUpdate results + // - results array to save the test results + // - phases used to track worker object status + this.workers[workerId] = {results: this.results, updates: this.updates, phases}; + + // Only check if all workers are connected + if (this.allWorkersReachedPhase(phase)) { + clearInterval(this.pollingInterval); + this.workersConnectedPromise.resolve(); + } + } + break; + } + case TYPES.ASSIGNED: + if (!this.workers[workerId]) { + logger.warn(`Discarding ${phase} message from unregistered client ${workerId}`); + } else if (data.error) { + this.workersAssignedPromise.reject(new Error(data.error)); + } else { + this.workers[workerId].phases[phase] = true; + // Only check if all workers are connected + if (this.allWorkersReachedPhase(phase)) { + this.workersAssignedPromise.resolve(); + } + } + break; + case TYPES.READY: + if (!this.workers[workerId]) { + logger.warn(`Discarding ${phase} message from unregistered client ${workerId}`); + } else if (data.error) { + this.workersReadyPromise.reject(new Error(data.error)); + } else { + this.workers[workerId].phases[phase] = true; + // Only check if all workers are connected + if (this.allWorkersReachedPhase(phase)) { + this.workersReadyPromise.resolve(); + } + } + break; + case TYPES.PREPARED: + if (!this.workers[workerId]) { + logger.warn(`Discarding ${phase} message from unregistered client ${workerId}`); + } else if (data.error) { + this.workers[workerId].phases[phase].reject(new Error(data.error)); + } else { + this.workers[workerId].phases[phase].resolve(); + } + break; + case TYPES.TESTRESULT: + if (!this.workers[workerId]) { + logger.warn(`Discarding ${phase} message from unregistered client ${workerId}`); + } else if (data.error) { + this.workers[workerId].phases[phase].reject(new Error(data.error)); + } else { + this.workers[workerId].phases[phase].resolve(); + } + break; + default: + throw new Error(`updateWorkerPhase passed unknown phase ${phase} by worker ${workerId}`); + } + } + + /** + * Check if all workers have notified that they are ready + * @param {string} phase the worker phase to be checked + * @returns {boolean} boolean true if all worker processes have notified of being ready; otherwise false + */ + allWorkersReachedPhase(phase) { + switch (phase) { + case TYPES.CONNECTED: + return this.number === Object.keys(this.workers).length; + case TYPES.ASSIGNED: + case TYPES.READY: { + const pendingWorkers = []; + for (const workerId in this.workers) { + const worker = this.workers[workerId]; + if (!worker.phases[phase]) { + pendingWorkers.push(workerId); + } + } + // Debug logging of notifications + if (pendingWorkers.length === 0) { + logger.debug(`All workers completed phase ${phase}`); + } else { + logger.debug(`Pending ready messages from workers: [${pendingWorkers}]`); + } + return pendingWorkers.length === 0; + } + default: + throw new Error(`allWorkersReachedPhase passed unknown phase ${phase}`); + } + + } + + /** + * Prepare the test + * message = { + * type: 'test', + * label : label name, + * numb: total number of simulated txs, + * txDuration: time duration of test, + * rateControl: rate controller to use, + * trim: trim options, + * args: user defined arguments, + * cb : path of the callback js file, + * config: path of the blockchain config file + * }; + * @param {JSON} test test specification + * @async + */ + async prepareTestRound(test) { + + // conditionally launch workers - they might exist in containers and not launched as processes + if (!this.workersRemote && !this.workersLaunched) { + for (let number = 1 ; number <= this.number ; number++) { + this.launchWorker(number); + } + this.workersLaunched = true; + } + + // Conditionally configure the messenger + if (!this.messengerConfigured) { + logger.info('Messenger not configured, entering configure phase...'); + await this.configureMessenger(); + this.messengerConfigured = true; + } + + // Conditionally connect to workers + if (!this.workersConnected) { + logger.info('No existing workers detected, entering worker launch phase...'); + // Use promise array to await all worker connection operations + const workersConnectedPromise = new Promise((resolve, reject) => { + this.workersConnectedPromise = { + resolve: resolve, + reject: reject + }; + }); + + // Since we cannot always guarantee that workers are being spawned by the orchestrator; the orchestrator should poll for workers to register. Interval is cleared once all workers have registered + const self = this; + this.pollingInterval = setInterval( () => { self.pollForWorkers(); } , self.workerPollingInterval); + + // wait for all workers to have initialized + logger.info(`Waiting for ${this.number} workers to be connected...`); + + await workersConnectedPromise; + this.workersConnected = true; + + logger.info(`${this.number} workers connected, progressing to worker assignment phase.`); + } else { + logger.info(`Existing ${Object.keys(this.workers).length} connected workers detected, progressing to worker assignment phase.`); + } + + // Make sure the clients have been assigned an index + if(!this.workersAssigned) { + logger.info('Workers currently unassigned, awaiting index assignment...'); + // Use promise array to await all worker init operations + const workersAssignedPromise = new Promise((resolve, reject) => { + this.workersAssignedPromise = { + resolve: resolve, + reject: reject + }; + }); + + // Inform connected worker of their workerIndex (!== mqttClientId) + // worker index is integer based, and maps to worker arguments passed by a user + const workerArray = Object.keys(this.workers); + for (let clientId of workerArray) { + const worker = this.workers[clientId]; + worker.workerId = workerArray.indexOf(clientId); + this.messenger.send([clientId], TYPES.ASSIGN, {workerId: worker.workerId}); + } + + // wait for all workers to have initialized + logger.info(`Waiting for ${this.number} workers to be assigned...`); + + await workersAssignedPromise; + this.workersAssigned = true; + + logger.info(`${this.number} workers assigned, progressing to worker initialization phase.`); + } else { + logger.info(`Existing ${Object.keys(this.workers).length} connected workers detected are assigned, progressing to worker initialization phase.`); + } + + // Make sure the clients are ready + if (!this.workersReady) { + // Use promise array to await all worker init operations + const workersReadyPromise = new Promise((resolve, reject) => { + this.workersReadyPromise = { + resolve: resolve, + reject: reject + }; + }); + + // wait for all workers to have initialized + logger.info(`Waiting for ${this.number} workers to be ready...`); + this.messenger.send(['all'], TYPES.INITIALIZE, {}); + + await workersReadyPromise; + this.workersReady = true; + + logger.info(`${this.number} workers ready, progressing to test preparation phase.`); + } else { + logger.info(`Existing ${Object.keys(this.workers).length} prepared workers detected, progressing to test preparation phase.`); + } + + // Work with a cloned message as we need to transform the passed message + const prepSpec = JSON.parse(JSON.stringify(test)); + + // send test preparation specification to each worker + let preparePromises = []; + for (let index in this.workers) { + let worker = this.workers[index]; + let p = new Promise((resolve, reject) => { + worker.phases[TYPES.PREPARED] = { + resolve: resolve, + reject: reject + }; + }); + preparePromises.push(p); + prepSpec.clientArgs = this.workerArguments[worker.workerId]; + prepSpec.totalClients = this.number; + + // Send to worker + this.messenger.send([index], TYPES.PREPARE, prepSpec); + } + + await Promise.all(preparePromises); + logger.info(`${this.number} workers prepared, progressing to test phase.`); + + // clear worker prepare promises so they can be reused + for (let worker in this.workers) { + this.workers[worker].phases[TYPES.PREPARED] = {}; + } + } + + /** + * Start the test + * message = { + * label: label name, + * numb: total number of simulated txs, + * txDuration: time duration of test, + * rateControl: rate controller to use, + * trim: trim options, + * args: user defined arguments, + * cb : path of the callback js file, + * config: path of the blockchain config file + * }; + * @param {JSON} test test specification + * @returns {Object[]} the test results array + * @async + */ + async startTest(test) { + this.updates.data = []; + this.updates.id++; + + await this._startTest(test); + const testOutput = this.formatResults(this.results); + return testOutput; + } + + /** + * Start a test + * @param {JSON} testSpecification test specification + * @param {Array} updates array to save txUpdate results + * @param {Array} results array to save the test results + * @async + */ + async _startTest(testSpecification) { + + let txPerWorker; + if (testSpecification.numb) { + // Run specified number of transactions + txPerWorker = Math.floor(testSpecification.numb / this.number); + + // trim should be based on number of workers, if specified with txNumber + if (testSpecification.trim) { + testSpecification.trim = Math.floor(testSpecification.trim / this.number); + } + + if (txPerWorker < 1) { + txPerWorker = 1; + } + testSpecification.numb = txPerWorker; + } else if (testSpecification.txDuration) { + // Run for time specified txDuration based on workers + // Do nothing, we run for the time specified within testSpecification.txDuration + } else { + throw new Error('Unconditioned transaction rate driving mode'); + } + + // Ensure results are reset + this.results = []; + + let testPromises = []; + for (let index in this.workers) { + let worker = this.workers[index]; + let p = new Promise((resolve, reject) => { + worker.phases[TYPES.TESTRESULT] = { + resolve: resolve, + reject: reject + }; + }); + testPromises.push(p); + worker.results = this.results; + worker.updates = this.updates.data; + testSpecification.clientArgs = this.workerArguments[worker.workerId]; + testSpecification.totalClients = this.number; + + // Publish to worker + this.messenger.send([index], TYPES.TEST, testSpecification); + } + + await Promise.all(testPromises); + // clear worker test promises so they can be reused + for (let worker in this.workers) { + this.workers[worker].phases[TYPES.TESTRESULT] = {}; + } + } + + /** + * Stop all test workers (child processes) + */ + stop() { + for (let workerObject of this.workerObjects) { + workerObject.kill(); + } + this.workerObjects = []; + } + + /** + * Get the update array + * @return {Array} update array + */ + getUpdates() { + return this.updates; + } + + /** + * Push test result from a worker into the global array + * @param {String} uuid uuid of the worker + * @param {Object} data test result + */ + pushResult(uuid, data) { + let p = this.workers[uuid]; + if (p && p.results && typeof p.results !== 'undefined') { + p.results.push(data); + } + } + + /** + * Push update value from a worker into the global array + * @param {String} uuid uuid of the worker + * @param {Object} data update value + */ + pushUpdate(uuid, data) { + let p = this.workers[uuid]; + if (p && p.updates && typeof p.updates !== 'undefined') { + p.updates.push(data); + } + } + + /** + * Launch a worker process to do the test + * @param {number} index the worker index + */ + launchWorker(index) { + logger.info(`Launching worker ${index} of ${this.number}`); + + // Spawn the worker. The index is assigned upon connection + let worker = this.workerFactory.spawnWorker(); + + // Collect the launched process so it can be killed later + this.workerObjects.push(worker); + + } + + /** + * Format the final test results for subsequent consumption from [ {result: [], start: val, end: val}, {result: [], start: val, end: val}, {result: [], start: val, end: val}] + * to {results: [val, val], start: val, end: val} + * @param {JSON[]} results an Array of JSON objects + * @return {JSON} an appropriately formatted result + */ + formatResults(results) { + + let resultArray = []; + let allStartedTime = null; + let allFinishedTime = null; + for (const workerResult of results){ + // Start building the array of all worker results + resultArray = resultArray.concat(workerResult.results); + + // Track all started/complete times + if (!allStartedTime || workerResult.start > allStartedTime) { + allStartedTime = workerResult.start; + } + + if (!allFinishedTime || workerResult.end < allFinishedTime) { + allFinishedTime = workerResult.end; + } + } + + return { + results: resultArray, + start: allStartedTime, + end: allFinishedTime + }; + } + +} + +module.exports = WorkerOrchestrator; diff --git a/packages/caliper-core/lib/master/test-runners/round-orchestrator.js b/packages/caliper-core/lib/master/test-runners/round-orchestrator.js index 2573c091c5..59a7c59a3a 100644 --- a/packages/caliper-core/lib/master/test-runners/round-orchestrator.js +++ b/packages/caliper-core/lib/master/test-runners/round-orchestrator.js @@ -15,8 +15,8 @@ 'use strict'; -const ClientOrchestrator = require('../client/client-orchestrator'); -const MonitorOrchestrator = require('../monitor/monitor-orchestrator'); +const WorkerOrchestrator = require('../orchestrators/worker-orchestrator'); +const MonitorOrchestrator = require('../monitors/monitor-orchestrator'); const Report = require('../report/report'); const TestObserver = require('../test-observers/test-observer'); const CaliperUtils = require('../../common/utils/caliper-utils'); @@ -32,13 +32,13 @@ class RoundOrchestrator { * @param {object} benchmarkConfig The benchmark configuration object. * @param {object} networkConfig The network configuration object. * @param {object} workerFactory The factory for worker processes. - * @param {object[]} workerArguments List of arbitrary arguments to pass for each worker processes. + * @param {object[]} workerArguments List of adaptor specific arguments to pass for each worker processes. */ constructor(benchmarkConfig, networkConfig, workerFactory, workerArguments) { this.networkConfig = networkConfig; this.benchmarkConfig = benchmarkConfig; - this.clientOrchestrator = new ClientOrchestrator(this.benchmarkConfig, workerFactory, workerArguments); + this.workerOrchestrator = new WorkerOrchestrator(this.benchmarkConfig, workerFactory, workerArguments); this.monitorOrchestrator = new MonitorOrchestrator(this.benchmarkConfig); this.report = new Report(this.monitorOrchestrator, this.benchmarkConfig, this.networkConfig); this.testObserver = new TestObserver(this.benchmarkConfig); @@ -139,10 +139,9 @@ class RoundOrchestrator { // validate each round before starting any rounds.forEach((round, index) => RoundOrchestrator._validateRoundConfig(round, index)); - // create messages for clients from each round config + // create messages for workers from each round config let roundConfigs = rounds.map((round, index) => { let config = { - type: 'test', label: round.label, rateControl: round.rateControl, trim: round.trim || 0, @@ -182,8 +181,12 @@ class RoundOrchestrator { this.testObserver.setRound(index); try { - this.testObserver.startWatch(this.clientOrchestrator); - const {results, start, end} = await this.clientOrchestrator.startTest(roundConfig); + // Run test preparation (cb.init() function if specified) + await this.workerOrchestrator.prepareTestRound(roundConfig); + + // Run main test round + this.testObserver.startWatch(this.workerOrchestrator); + const {results, start, end} = await this.workerOrchestrator.startTest(roundConfig); await this.testObserver.stopWatch(); // Build the report @@ -229,9 +232,9 @@ class RoundOrchestrator { } try { - this.clientOrchestrator.stop(); + this.workerOrchestrator.stop(); } catch (err) { - logger.error(`Error while stopping clients: ${err.stack || err}`); + logger.error(`Error while stopping workers: ${err.stack || err}`); } let benchEndTime = Date.now(); diff --git a/packages/caliper-core/lib/worker/client/caliper-local-client.js b/packages/caliper-core/lib/worker/client/caliper-local-client.js index 50b8a501cc..d6f319cee4 100644 --- a/packages/caliper-core/lib/worker/client/caliper-local-client.js +++ b/packages/caliper-core/lib/worker/client/caliper-local-client.js @@ -20,7 +20,7 @@ const bc = require('../../common/core/blockchain.js'); const RateControl = require('../rate-control/rateControl.js'); const PrometheusClient = require('../../common/prometheus/prometheus-push-client'); -const Logger = CaliperUtils.getLogger('local-client.js'); +const Logger = CaliperUtils.getLogger('caliper-local-client.js'); /** * Class for Client Interaction */ @@ -29,9 +29,16 @@ class CaliperLocalClient { /** * Create the test client * @param {Object} bcClient blockchain client + * @param {number} clientIndex the client index + * @param {Messenger} messenger a configured Messenger instance used to communicate with the orchestrator */ - constructor(bcClient) { + constructor(bcClient, clientIndex, messenger) { this.blockchain = new bc(bcClient); + this.clientIndex = clientIndex; + this.messenger = messenger; + this.context = undefined; + + // Internal stats this.results = []; this.txNum = 0; this.txLastNum = 0; @@ -46,6 +53,13 @@ class CaliperLocalClient { this.totalTxDelay = 0; } + /** + * Initialization update + */ + initUpdate() { + Logger.info('Initialization ongoing...'); + } + /** * Calculate real-time transaction statistics and send the txUpdated message */ @@ -88,7 +102,8 @@ class CaliperLocalClient { this.prometheusClient.push('caliper_txn_pending', (this.txNum - (this.totalTxnSuccess + this.totalTxnFailure))); } else { // client-orchestrator based update - process.send({type: 'txUpdated', data: {type: 'txUpdate', submitted: newNum, committed: newStats}}); + // send(to, type, data) + this.messenger.send(['orchestrator'],'txUpdate', {submitted: newNum, committed: newStats}); } if (this.resultStats.length === 0) { @@ -119,11 +134,13 @@ class CaliperLocalClient { } /** - * Method to reset values between `init` and `test` phase + * Method to reset values */ txReset(){ // Reset txn counters + this.results = []; + this.resultStats = []; this.txNum = 0; this.txLastNum = 0; @@ -136,7 +153,8 @@ class CaliperLocalClient { this.prometheusClient.push('caliper_txn_pending', 0); } else { // Reset Local - process.send({type: 'txReset', data: {type: 'txReset'}}); + // send(to, type, data) + this.messenger.send(['orchestrator'],'txReset', {type: 'txReset'}); } } @@ -154,16 +172,12 @@ class CaliperLocalClient { } } - /** * Call before starting a new test * @param {JSON} msg start test message */ beforeTest(msg) { - this.results = []; - this.resultStats = []; - this.txNum = 0; - this.txLastNum = 0; + this.txReset(); // TODO: once prometheus is enabled, trim occurs as part of the retrieval query // conditionally trim beginning and end results for this test run @@ -189,7 +203,7 @@ class CaliperLocalClient { this.prometheusClient.setGateway(msg.pushUrl); } // - set target for this round test/round/client - this.prometheusClient.configureTarget(msg.label, msg.testRound, msg.clientIdx); + this.prometheusClient.configureTarget(msg.label, msg.testRound, this.clientIndex); } } @@ -223,7 +237,7 @@ class CaliperLocalClient { * @async */ async runFixedNumber(cb, number, rateController) { - Logger.info('Info: client ' + process.pid + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : '')); + Logger.info('Info: client ' + this.clientIndex + ' start test runFixedNumber()' + (cb.info ? (':' + cb.info) : '')); this.startTime = Date.now(); let promises = []; @@ -252,7 +266,7 @@ class CaliperLocalClient { * @async */ async runDuration(cb, duration, rateController) { - Logger.info('Info: client ' + process.pid + ' start test runDuration()' + (cb.info ? (':' + cb.info) : '')); + Logger.info('Info: client ' + this.clientIndex + ' start test runDuration()' + (cb.info ? (':' + cb.info) : '')); this.startTime = Date.now(); let promises = []; @@ -286,11 +300,63 @@ class CaliperLocalClient { } } + /** + * Perform test init within Benchmark + * @param {JSON} test the test details + * message = { + * label : label name, + * numb: total number of simulated txs, + * rateControl: rate controller to use + * trim: trim options + * args: user defined arguments, + * cb : path of the callback js file, + * config: path of the blockchain config file + * totalClients = total number of clients, + * pushUrl = the url for the push gateway + * }; + * @async + */ + async prepareTest(test) { + Logger.debug('prepareTest() with:', test); + let cb = require(CaliperUtils.resolvePath(test.cb)); + + this.txUpdateTime = Config.get(Config.keys.TxUpdateTime, 1000); + const self = this; + let initUpdateInter = setInterval( () => { self.initUpdate(); } , self.txUpdateTime); + + try { + // Retrieve context for this round + this.context = await this.blockchain.getContext(test.label, test.clientArgs); + if (typeof this.context === 'undefined') { + this.context = { + engine : { + submitCallback : (count) => { self.submitCallback(count); } + } + }; + } else { + this.context.engine = { + submitCallback : (count) => { self.submitCallback(count); } + }; + } + + // Run init phase of callback + Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info) : 'phase starting...')}`); + await cb.init(this.blockchain, this.context, test.args); + Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info) : 'phase complete')}`); + await CaliperUtils.sleep(this.txUpdateTime); + } catch (err) { + Logger.info(`Client[${this.clientIndex}] encountered an error during prepare test phase: ${(err.stack ? err.stack : err)}`); + throw err; + } finally { + clearInterval(initUpdateInter); + Logger.info(`Info: client ${this.clientIndex} prepare test ${(cb.info ? (':' + cb.info) : 'phase complete')}`); + } + } + /** * Perform the test * @param {JSON} test start test message * message = { - * type: 'test', * label : label name, * numb: total number of simulated txs, * rateControl: rate controller to use @@ -298,8 +364,6 @@ class CaliperLocalClient { * args: user defined arguments, * cb : path of the callback js file, * config: path of the blockchain config file - * clientIdx = this client index, - * clientArgs = clientArgs[clientIdx], * totalClients = total number of clients, * pushUrl = the url for the push gateway * }; @@ -317,30 +381,11 @@ class CaliperLocalClient { let txUpdateInter = setInterval( () => { self.txUpdate(); } , self.txUpdateTime); try { - let context = await this.blockchain.getContext(test.label, test.clientArgs, test.clientIdx, test.txFile); - if (typeof context === 'undefined') { - context = { - engine : { - submitCallback : (count) => { self.submitCallback(count); } - } - }; - } else { - context.engine = { - submitCallback : (count) => { self.submitCallback(count); } - }; - } // Configure - let rateController = new RateControl(test.rateControl, test.clientIdx, test.testRound); + let rateController = new RateControl(test.rateControl, this.clientIndex, test.testRound); await rateController.init(test); - // Run init phase of callback - Logger.info(`Info: client ${process.pid} init test ${(cb.info ? (':' + cb.info) : 'phase')}`); - await cb.init(this.blockchain, context, test.args); - // Reset and sleep the configured update duration to flush any results that are resulting from the 'init' stage - this.txReset(); - await CaliperUtils.sleep(this.txUpdateTime); - // Run the test loop if (test.txDuration) { const duration = test.txDuration; // duration in seconds @@ -352,7 +397,7 @@ class CaliperLocalClient { // Clean up await rateController.end(); - await this.blockchain.releaseContext(context); + await this.blockchain.releaseContext(this.context); this.clearUpdateInter(txUpdateInter); await cb.end(); @@ -372,8 +417,10 @@ class CaliperLocalClient { } } catch (err) { this.clearUpdateInter(); - Logger.info(`Client[${process.pid}] encountered an error: ${(err.stack ? err.stack : err)}`); + Logger.info(`Client[${this.clientIndex}] encountered an error: ${(err.stack ? err.stack : err)}`); throw err; + } finally { + this.txReset(); } } } diff --git a/packages/caliper-core/lib/worker/client/message-handler.js b/packages/caliper-core/lib/worker/client/message-handler.js index 52b7c20b09..1e6871021f 100644 --- a/packages/caliper-core/lib/worker/client/message-handler.js +++ b/packages/caliper-core/lib/worker/client/message-handler.js @@ -27,18 +27,31 @@ class MessageHandler { /** * Initializes the BaseMessageHandler instance. * @param {object} handlers Object of message handler functions. + * @param {Messenger} messenger the Messenger to use for communication with the orchestrator */ - constructor(handlers) { + constructor(handlers, messenger) { if (!handlers.init) { let msg = 'Handler for "init" is not specified'; logger.error(msg); throw new Error(msg); } + if (!messenger) { + let msg = '"messenger" is not specified'; + logger.error(msg); + throw new Error(msg); + } + + this.messenger = messenger; + this.beforeInitHandler = handlers.beforeInit || MessageHandler.beforeInit; this.afterInitHandler = handlers.afterInit || MessageHandler.afterInit; this.initHandler = handlers.init; + this.beforePrepareHandler = handlers.beforePrepare || MessageHandler.beforePrepare; + this.afterPrepareHandler = handlers.afterPrepare || MessageHandler.afterPrepare; + this.prepareHandler = handlers.prepare || MessageHandler.prepare; + this.beforeTestHandler = handlers.beforeTest || MessageHandler.beforeTest; this.afterTestHandler = handlers.afterTest || MessageHandler.afterTest; this.testHandler = handlers.test || MessageHandler.test; @@ -47,9 +60,11 @@ class MessageHandler { this.workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); this.networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); this.networkConfigPath = CaliperUtils.resolvePath(this.networkConfigPath); - this.adapter = undefined; - this.workerClient = undefined; - this.testResult = undefined; + this.adapter = undefined; // The adaptor to use when creating a CaliperLocalClient + this.workerClient = undefined; // An instantiated CaliperLocalClient + this.workerId = undefined; // The Caliper client index (zero based) + this.testResult = undefined; // The result of running a test + } /** @@ -66,11 +81,48 @@ class MessageHandler { * Called after processing the "init" message. * @param {object} context The context/state of the message handler. * @param {object} message The message object. + * @param {object} error Possible error object + */ + static async afterInit(context, message, error) { + const type = 'ready'; + if (error) { + // send(to, type, data) + context.messenger.send(['orchestrator'], type, {error: error.toString()}); + logger.error(`Handled unsuccessful "init" message for worker ${context.workerId} with error ${error.toString()}`); + } else { + context.workerClient = new CaliperLocalClient(context.adapter, context.workerId, context.messenger); + context.messenger.send(['orchestrator'], type, {}); + logger.info(`Handled successful "init" message for worker ${context.workerId}`); + } + } + + /** + * Called before processing the "prepare" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. */ - static async afterInit(context, message) { - context.workerClient = new CaliperLocalClient(context.adapter); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); + static async beforePrepare(context, message) { + logger.info('Handling "prepare" message'); + logger.debug('Message content', message); + } + + /** + * Called after processing the "prepare" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + * @param {object} error An error conditioning message + */ + static async afterPrepare(context, message, error) { + const type = 'prepared'; + if (error) { + // send(to, type, data) + context.messenger.send(['orchestrator'], type, {error: error.toString()}); + logger.error(`Handled unsuccessful "prepare" message for worker ${context.workerId} and test round ${message.testRound} with error ${error.toString()}`); + } else { + context.messenger.send(['orchestrator'], type, {}); + logger.info(`Handled successful "prepare" message for worker ${context.workerId} and test round ${message.testRound}`); + } + } /** @@ -90,8 +142,19 @@ class MessageHandler { */ static async afterTest(context, message) { await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: context.testResult}); - logger.info('Handled "test" message'); + const type = 'testResult'; + context.messenger.send(['orchestrator'], type, context.testResult); + logger.info(`Handled "test" message for worker ${context.workerId}`); + } + + /** + * Called for processing the "prepare" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + * @return {Promise} The result object. + */ + static async prepare(context, message) { + await context.workerClient.prepareTest(message); } /** @@ -113,16 +176,42 @@ class MessageHandler { if (!message.hasOwnProperty('type')) { let msg = 'Message type is missing'; logger.error(msg, message); - process.send({type: 'error', data: msg}); + context.messenger.send(['orchestrator'], 'error', {error: msg}); return; } try { switch (message.type) { - case 'init': { - await context.beforeInitHandler(context, message); - context.adapter = await context.initHandler(context, message); - await context.afterInitHandler(context, message); + case 'register': + if (!context.registered) { + // Register availability with orchestrator + context.messenger.send(['orchestrator'], 'connected', {}); + context.registered = true; + } + break; + case 'assignId': + context.workerId = message.workerId; + context.messenger.send(['orchestrator'], 'assigned', {}); + break; + case 'initialize': { + try { + await context.beforeInitHandler(context, message); + context.adapter = await context.initHandler(context, message); + await context.afterInitHandler(context, message, undefined); + } catch (error) { + await context.afterInitHandler(context, message, error); + } + + break; + } + case 'prepare': { + try { + await context.beforePrepareHandler(context, message); + await context.prepareHandler(context, message); + await context.afterPrepareHandler(context, message, undefined); + } catch (error) { + await context.afterPrepareHandler(context, message, error); + } break; } @@ -136,13 +225,13 @@ class MessageHandler { default: { let msg = `Unknown message type "${message.type}"`; logger.error(msg, message); - process.send({type: 'error', data: msg}); + context.messenger.send(['orchestrator'], 'error', {error: msg}); } } } catch (err) { logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); + context.messenger.send(['orchestrator'], 'error', {error: err.toString()}); } } } diff --git a/packages/caliper-core/package.json b/packages/caliper-core/package.json index 9f2be6288f..67cedf2a1d 100644 --- a/packages/caliper-core/package.json +++ b/packages/caliper-core/package.json @@ -28,9 +28,9 @@ "fs-extra": "^8.0.1", "js-yaml": "^3.12.0", "mustache": "^2.3.0", + "mqtt": "3.0.0", "nconf": "^0.10.0", "nconf-yaml": "^1.0.2", - "node-zookeeper-client": "^0.2.2", "pidusage": "^1.1.6", "ps-node": "^0.1.6", "request": "^2.81.0", diff --git a/packages/caliper-core/test/master/monitor/monitor-utilities.js b/packages/caliper-core/test/master/monitor/monitor-utilities.js index 7fdb28fcd9..d0b7a24d14 100644 --- a/packages/caliper-core/test/master/monitor/monitor-utilities.js +++ b/packages/caliper-core/test/master/monitor/monitor-utilities.js @@ -14,7 +14,7 @@ 'use strict'; -const Utilities = require('../../../lib/master/monitor/monitor-utilities'); +const Utilities = require('../../../lib/master/monitors/monitor-utilities'); const mocha = require('mocha'); const fail = mocha.fail; diff --git a/packages/caliper-core/test/master/monitor/monitor-prometheus.js b/packages/caliper-core/test/master/monitors/monitor-prometheus.js similarity index 99% rename from packages/caliper-core/test/master/monitor/monitor-prometheus.js rename to packages/caliper-core/test/master/monitors/monitor-prometheus.js index 0c23901179..2327768d8a 100644 --- a/packages/caliper-core/test/master/monitor/monitor-prometheus.js +++ b/packages/caliper-core/test/master/monitors/monitor-prometheus.js @@ -15,7 +15,7 @@ 'use strict'; const rewire = require('rewire'); -const PrometheusMonitorRewire = rewire('../../../lib/master/monitor/monitor-prometheus'); +const PrometheusMonitorRewire = rewire('../../../lib/master/monitors/monitor-prometheus'); const chai = require('chai'); const should = chai.should(); diff --git a/packages/caliper-core/test/master/client/client-orchestrator.js b/packages/caliper-core/test/master/orchestrators/worker-orchestrator.js similarity index 80% rename from packages/caliper-core/test/master/client/client-orchestrator.js rename to packages/caliper-core/test/master/orchestrators/worker-orchestrator.js index 6b13d1472c..4d0ee365c8 100644 --- a/packages/caliper-core/test/master/client/client-orchestrator.js +++ b/packages/caliper-core/test/master/orchestrators/worker-orchestrator.js @@ -15,40 +15,40 @@ 'use strict'; const rewire = require('rewire'); -const ClientOrchestratorRewire = rewire('../../../lib/master/client/client-orchestrator'); +const WorkerOrchestratorRewire = rewire('../../../lib/master/orchestrators/worker-orchestrator'); const chai = require('chai'); chai.should(); const sinon = require('sinon'); -describe('client orchestrator implementation', () => { +describe('worker orchestrator implementation', () => { const benchmarkConfig = { test: { - clients: { + workers: { number: 7 } } }; - const workerArguments = [1,2,3]; + const workerFactory = {}; describe('#constructor', () => { - it('should read the number of test clients if present in the config file', () => { - const myOrchestrator = new ClientOrchestratorRewire(benchmarkConfig, workerFactory, workerArguments); + it('should read the number of test workers if present in the config file', () => { + const myOrchestrator = new WorkerOrchestratorRewire(benchmarkConfig, workerFactory); myOrchestrator.number.should.equal(7); }); - it('should default to one client in the test if not specified in the config file ', () => { - const myOrchestrator = new ClientOrchestratorRewire({ test: { clients: {notNumber: 2}}}, workerFactory, workerArguments); + it('should default to one worker in the test if not specified in the config file ', () => { + const myOrchestrator = new WorkerOrchestratorRewire({ test: { workers: {notNumber: 2}}}, workerFactory); myOrchestrator.number.should.equal(1); }); }); describe('#startTest', () => { - const myOrchestrator = new ClientOrchestratorRewire(benchmarkConfig, workerFactory, workerArguments); + const myOrchestrator = new WorkerOrchestratorRewire(benchmarkConfig, workerFactory); let _startTestStub; let formatResultsStub; @@ -73,7 +73,7 @@ describe('client orchestrator implementation', () => { await myOrchestrator.startTest(testMsg); sinon.assert.calledOnce(_startTestStub); - sinon.assert.calledWith(_startTestStub, 7, testMsg, [], []); + sinon.assert.calledWith(_startTestStub, testMsg); }); it('should call formatResults', async() => { @@ -96,7 +96,7 @@ describe('client orchestrator implementation', () => { describe('#getUpdates', () => { - const myOrchestrator = new ClientOrchestratorRewire(benchmarkConfig, workerFactory, workerArguments); + const myOrchestrator = new WorkerOrchestratorRewire(benchmarkConfig, workerFactory); it('should return the updates', () => { const checkVal = 'this is my update'; @@ -109,9 +109,9 @@ describe('client orchestrator implementation', () => { }); describe('#formatResults', () => { - const myOrchestrator = new ClientOrchestratorRewire(benchmarkConfig, workerFactory, workerArguments); + const myOrchestrator = new WorkerOrchestratorRewire(benchmarkConfig, workerFactory); - it('should group all client results into an array under a results label', () => { + it('should group all worker results into an array under a results label', () => { const result0 = {results: [1] , start: new Date(2018, 11, 24, 10, 33), end: new Date(2018, 11, 24, 11, 33)}; const result1 = {results: [2] , start: new Date(2018, 11, 24, 10, 34), end: new Date(2018, 11, 24, 11, 23)}; const result2 = {results: [3] , start: new Date(2018, 11, 24, 10, 35), end: new Date(2018, 11, 24, 11, 13)}; @@ -121,7 +121,7 @@ describe('client orchestrator implementation', () => { output.results.should.deep.equal([1,2,3]); }); - it('should determine and persist the time when all clients have started', () => { + it('should determine and persist the time when all workers have started', () => { const compareStart = new Date(2018, 11, 24, 10, 35); const result0 = {results: [1] , start: new Date(2018, 11, 24, 10, 33), end: new Date(2018, 11, 24, 11, 33)}; const result1 = {results: [2] , start: new Date(2018, 11, 24, 10, 34), end: new Date(2018, 11, 24, 11, 13)}; @@ -132,7 +132,7 @@ describe('client orchestrator implementation', () => { output.start.should.equal(compareStart); }); - it('should determine and persist the last time when all clients were running', () => { + it('should determine and persist the last time when all workers were running', () => { const compareEnd = new Date(2018, 11, 24, 11, 13); const result0 = {results: [1] , start: new Date(2018, 11, 24, 10, 33), end: new Date(2018, 11, 24, 11, 33)}; const result1 = {results: [2] , start: new Date(2018, 11, 24, 10, 34), end: compareEnd}; diff --git a/packages/caliper-ethereum/lib/ethereum.js b/packages/caliper-ethereum/lib/ethereum.js index 0a9c6a43ea..dca3b91ff2 100644 --- a/packages/caliper-ethereum/lib/ethereum.js +++ b/packages/caliper-ethereum/lib/ethereum.js @@ -36,14 +36,16 @@ class Ethereum extends BlockchainInterface { * Create a new instance of the {Ethereum} class. * @param {string} config_path The path of the network configuration file. * @param {string} workspace_root The absolute path to the root location for the application configuration files. + * @param {number} clientIndex The client index */ - constructor(config_path, workspace_root) { + constructor(config_path, workspace_root, clientIndex) { super(config_path); this.bcType = 'ethereum'; this.workspaceRoot = workspace_root; this.ethereumConfig = require(config_path).ethereum; this.web3 = new Web3(this.ethereumConfig.url); this.web3.transactionConfirmationBlocks = this.ethereumConfig.transactionConfirmationBlocks; + this.clientIndex = clientIndex; } /** @@ -87,17 +89,17 @@ class Ethereum extends BlockchainInterface { * Return the Ethereum context associated with the given callback module name. * @param {string} name The name of the callback module as defined in the configuration files. * @param {object} args Unused. - * @param {integer} clientIdx the client index * @return {object} The assembled Ethereum context. * @async */ - async getContext(name, args, clientIdx) { + async getContext(name, args) { let context = { - clientIdx: clientIdx, + clientIndex: this.clientIndex, contracts: {}, nonces: {}, web3: this.web3 }; + for (const key of Object.keys(args.contracts)) { context.contracts[key] = { contract: new this.web3.eth.Contract(args.contracts[key].abi, args.contracts[key].address), @@ -110,7 +112,7 @@ class Ethereum extends BlockchainInterface { } if (this.ethereumConfig.fromAddressSeed) { let hdwallet = EthereumHDKey.fromMasterSeed(this.ethereumConfig.fromAddressSeed); - let wallet = hdwallet.derivePath('m/44\'/60\'/' + clientIdx + '\'/0/0').getWallet(); + let wallet = hdwallet.derivePath('m/44\'/60\'/' + this.clientIndex + '\'/0/0').getWallet(); context.fromAddress = wallet.getChecksumAddressString(); context.nonces[context.fromAddress] = await this.web3.eth.getTransactionCount(context.fromAddress); this.web3.eth.accounts.wallet.add(wallet.getPrivateKeyString()); @@ -273,13 +275,14 @@ class Ethereum extends BlockchainInterface { } /** - * It passes deployed contracts addresses to all clients + * It passes deployed contracts addresses to all clients (only known after deploy contract) * @param {Number} number of clients to prepare * @returns {Array} client args + * @async */ - async prepareClients(number) { + async prepareWorkerArguments(number) { let result = []; - for (let i = 0 ; i< number ; i++) { + for (let i = 0 ; i<= number ; i++) { result[i] = {contracts: this.ethereumConfig.contracts}; } return result; diff --git a/packages/caliper-ethereum/lib/ethereumClientFactory.js b/packages/caliper-ethereum/lib/ethereumClientFactory.js index ac7a5a8487..09f4d652b7 100644 --- a/packages/caliper-ethereum/lib/ethereumClientFactory.js +++ b/packages/caliper-ethereum/lib/ethereumClientFactory.js @@ -28,12 +28,6 @@ class EthereumClientFactory { */ spawnWorker() { const child = childProcess.fork(path.join(__dirname, './ethereumClientWorker.js'), process.argv.slice(2), { env: process.env}); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-ethereum/lib/ethereumClientWorker.js b/packages/caliper-ethereum/lib/ethereumClientWorker.js index 07e882e025..145b70c492 100644 --- a/packages/caliper-ethereum/lib/ethereumClientWorker.js +++ b/packages/caliper-ethereum/lib/ethereumClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const EthereumClient = require('./ethereum'); /** @@ -25,16 +25,27 @@ const EthereumClient = require('./ethereum'); * @async */ async function initHandler(context, message) { - return new EthereumClient(context.networkConfigPath, context.workspacePath); + return new EthereumClient(context.networkConfigPath, context.workspacePath, context.workerId); } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'ethereum'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-fabric/lib/fabric.js b/packages/caliper-fabric/lib/fabric.js index ac77a52361..6451412437 100644 --- a/packages/caliper-fabric/lib/fabric.js +++ b/packages/caliper-fabric/lib/fabric.js @@ -101,7 +101,7 @@ const fs = require('fs'); * profiles for each defined registrar. Maps the custom organization names to the Client instances * (since only one registrar per org is supported). * @property {EventSource[]} eventSources Collection of potential event sources to listen to for transaction confirmation events. - * @property {number} clientIndex The index of the client process using the adapter that is set when calling @link{getContext}. + * @property {number} clientIndex The index of the client process using the adapter that is set in the constructor * @property {number} txIndex A counter for keeping track of the index of the currently submitted transaction. * @property {FabricNetwork} networkUtil Utility object containing easy-to-query information about the topology * and settings of the network. @@ -132,8 +132,9 @@ class Fabric extends BlockchainInterface { * Initializes the Fabric adapter. * @param {string|object} networkConfig The relative or absolute file path, or the object itself of the Common Connection Profile settings. * @param {string} workspace_root The absolute path to the root location for the application configuration files. + * @param {number} clientIndex the client index */ - constructor(networkConfig, workspace_root) { + constructor(networkConfig, workspace_root, clientIndex) { super(networkConfig); this.bcType = 'fabric'; this.workspaceRoot = workspace_root; @@ -154,7 +155,7 @@ class Fabric extends BlockchainInterface { this.adminProfiles = new Map(); this.registrarProfiles = new Map(); this.eventSources = []; - this.clientIndex = 0; + this.clientIndex = clientIndex; this.txIndex = -1; this.randomTargetPeerCache = new Map(); this.channelEventSourcesCache = new Map(); @@ -2195,13 +2196,11 @@ class Fabric extends BlockchainInterface { * * @param {string} name Unused. * @param {Array} args Unused. - * @param {number} clientIdx The client index. * @return {Promise<{networkInfo : FabricNetwork, eventSources: EventSource[]}>} Returns the network utility object. * @async */ - async getContext(name, args, clientIdx) { - // Set client index and reset counter for new test round - this.clientIndex = clientIdx; + async getContext(name, args) { + // Reset counter for new test round this.txIndex = -1; // Branch on use of a Gateway or standard Caliper client @@ -2212,8 +2211,7 @@ class Fabric extends BlockchainInterface { // We are done - return the networkUtil object return { - networkInfo: this.networkUtil, - clientIdx + networkInfo: this.networkUtil }; } else { // Configure the adaptor @@ -2291,8 +2289,7 @@ class Fabric extends BlockchainInterface { } return { - networkInfo: this.networkUtil, - clientIdx + networkInfo: this.networkUtil }; } } diff --git a/packages/caliper-fabric/lib/fabricClientFactory.js b/packages/caliper-fabric/lib/fabricClientFactory.js index eb00aa4089..893ef00c18 100644 --- a/packages/caliper-fabric/lib/fabricClientFactory.js +++ b/packages/caliper-fabric/lib/fabricClientFactory.js @@ -28,12 +28,6 @@ class FabricClientFactory { */ spawnWorker() { const child = childProcess.fork(path.join(__dirname, './fabricClientWorker.js'), process.argv.slice(2), { env: process.env}); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-fabric/lib/fabricClientWorker.js b/packages/caliper-fabric/lib/fabricClientWorker.js index 26388fa2b3..5d3d1d313b 100644 --- a/packages/caliper-fabric/lib/fabricClientWorker.js +++ b/packages/caliper-fabric/lib/fabricClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const FabricClient = require('./fabric'); /** @@ -25,23 +25,32 @@ const FabricClient = require('./fabric'); * @async */ async function initHandler(context, message) { - const blockchain = new FabricClient(context.networkConfigPath, context.workspacePath); + const worker = new FabricClient(context.networkConfigPath, context.workspacePath, context.workerId); + await worker._initializeRegistrars(false); + await worker._initializeAdmins(false); + await worker._initializeUsers(false); - // reload the profiles silently - await blockchain._initializeRegistrars(false); - await blockchain._initializeAdmins(false); - await blockchain._initializeUsers(false); - - return blockchain; + return worker; } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'fabric'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-fisco-bcos/lib/fiscoBcos.js b/packages/caliper-fisco-bcos/lib/fiscoBcos.js index 7885c73200..3299297f83 100644 --- a/packages/caliper-fisco-bcos/lib/fiscoBcos.js +++ b/packages/caliper-fisco-bcos/lib/fiscoBcos.js @@ -32,8 +32,9 @@ class FiscoBcos extends BlockchainInterface { * Create a new instance of the {FISCO BCOS} class. * @param {string} config_path The absolute path of the FISCO BCOS network configuration file. * @param {string} workspace_root The absolute path to the root location for the application configuration files. + * @param {number} clientIdx The client index */ - constructor(config_path, workspace_root) { + constructor(config_path, workspace_root, clientIdx) { super(config_path); this.bcType = 'fisco-bcos'; this.workspaceRoot = workspace_root; @@ -44,6 +45,7 @@ class FiscoBcos extends BlockchainInterface { this.fiscoBcosSettings.network.authentication[k] = CaliperUtils.resolvePath(this.fiscoBcosSettings.network.authentication[k], workspace_root); } } + this.clientIdx = clientIdx; } /** @@ -77,10 +79,9 @@ class FiscoBcos extends BlockchainInterface { * } * @param {String} name name of the context * @param {Object} args adapter specific arguments - * @param {Integer} clientIdx the client index * @return {Promise} The promise for the result of the execution. */ - async getContext(name, args, clientIdx) { + async getContext(name, args) { return Promise.resolve(); } diff --git a/packages/caliper-fisco-bcos/lib/fiscoBcosClientFactory.js b/packages/caliper-fisco-bcos/lib/fiscoBcosClientFactory.js index f50d72a2da..fe665eafb8 100644 --- a/packages/caliper-fisco-bcos/lib/fiscoBcosClientFactory.js +++ b/packages/caliper-fisco-bcos/lib/fiscoBcosClientFactory.js @@ -28,12 +28,6 @@ class FiscoBcosClientFactory { */ spawnWorker() { const child = childProcess.fork(path.join(__dirname, './fiscoBcosClientWorker.js'), process.argv.slice(2), { env: process.env }); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js b/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js index d84531e0b6..4292f78d23 100644 --- a/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js +++ b/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const FiscoBcosClient = require('./fiscoBcos'); /** @@ -25,16 +25,27 @@ const FiscoBcosClient = require('./fiscoBcos'); * @async */ async function initHandler(context, message) { - return new FiscoBcosClient(context.networkConfigPath, context.workspacePath); + return new FiscoBcosClient(context.networkConfigPath, context.workspacePath, context.workerId); } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'fiscoBCOS'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-generator/generator-caliper/generators/benchmark/index.js b/packages/caliper-generator/generator-caliper/generators/benchmark/index.js index 229a4d6f7e..019dc8e81c 100644 --- a/packages/caliper-generator/generator-caliper/generators/benchmark/index.js +++ b/packages/caliper-generator/generator-caliper/generators/benchmark/index.js @@ -2,9 +2,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -78,10 +78,10 @@ module.exports = class extends Generator { }], clientQuestions: [{ type: 'number', - name: 'clients', - message: 'How many clients would you like to have?', + name: 'workers', + message: 'How many workers would you like to have?', default: defaultClientValue, - when: () => !this.options.clients + when: () => !this.options.workers }], roundQuestions: [{ type: 'input', @@ -128,11 +128,11 @@ module.exports = class extends Generator { inititalAnswers = await this.prompt(configQuestions.initialQuestions); clientAnswer = await this.prompt(configQuestions.clientQuestions); - if (isNaN(parseFloat(this.options.clients)) && isNaN(parseFloat(clientAnswer.clients))) { + if (isNaN(parseFloat(this.options.workers)) && isNaN(parseFloat(clientAnswer.workers))) { this.log(`Error: Not a valid input. Using default client value of ${defaultClientValue}.`) } - if (this.options.clients < 0 || clientAnswer.clients < 0) { - this.log(`Error: Negative values not accepted. Defaulting to ${Math.abs(clientAnswer.clients)}.`) + if (this.options.workers < 0 || clientAnswer.workers < 0) { + this.log(`Error: Negative values not accepted. Defaulting to ${Math.abs(clientAnswer.workers)}.`) } roundAnswers = await this.prompt(configQuestions.roundQuestions); @@ -176,7 +176,7 @@ module.exports = class extends Generator { // Successfully parsed, now set it answersObject.chaincodeArguments = argsString; } catch (error) { - answersObject.chaincodeArguments = '[]'; + answersObject.chaincodeArguments = '[]'; } } @@ -189,18 +189,18 @@ module.exports = class extends Generator { _configWrite() { answersObject.benchmarkName = promptAnswers.benchmarkName; answersObject.benchmarkDescription = promptAnswers.benchmarkDescription; - answersObject.clients = promptAnswers.clients + answersObject.workers = promptAnswers.workers answersObject.label = promptAnswers.label; answersObject.txType = promptAnswers.txType; answersObject.chaincodeId = promptAnswers.chaincodeId; - if (isNaN(promptAnswers.clients)) { - answersObject.clients = defaultClientValue; - } else if (promptAnswers.clients < 0) { - answersObject.clients = Math.abs(promptAnswers.clients); + if (isNaN(promptAnswers.workers)) { + answersObject.workers = defaultClientValue; + } else if (promptAnswers.workers < 0) { + answersObject.workers = Math.abs(promptAnswers.workers); } else { - answersObject.clients = promptAnswers.clients; + answersObject.workers = promptAnswers.workers; } if (promptAnswers.txType === 'txDuration') { @@ -212,8 +212,8 @@ module.exports = class extends Generator { answersObject.txValue = promptAnswers.txDuration; } }; - - + + if (promptAnswers.txType === 'txNumber') { if (isNaN(promptAnswers.txNumber)) { answersObject.txValue = defaultTxValue; @@ -235,7 +235,7 @@ module.exports = class extends Generator { console.log('Generating benchmark files...'); this._callbackWrite(); answersObject.rateController = promptAnswers.rateController; - + switch(promptAnswers.rateController) { case 'fixed-rate': answersObject.opts = `tps: 10`; diff --git a/packages/caliper-generator/generator-caliper/generators/benchmark/templates/config.yaml b/packages/caliper-generator/generator-caliper/generators/benchmark/templates/config.yaml index 8425bd50ad..2aa4c19871 100644 --- a/packages/caliper-generator/generator-caliper/generators/benchmark/templates/config.yaml +++ b/packages/caliper-generator/generator-caliper/generators/benchmark/templates/config.yaml @@ -16,9 +16,9 @@ test: name: <%= benchmarkName %> description: <%= benchmarkDescription %> - clients: + workers: type: local - number: <%= clients %> + number: <%= workers %> rounds: - label: <%= label %> chaincodeId: <%= chaincodeId %> @@ -26,5 +26,5 @@ test: rateControl: { type: <%= rateController %>, opts: { <%= opts %> } } callback: benchmarks/callbacks/<%= callback %> monitor: - type: + type: - none diff --git a/packages/caliper-generator/generator-caliper/test/benchmark/callback.js b/packages/caliper-generator/generator-caliper/test/benchmark/callback.js index a344d2ade4..193b851aae 100644 --- a/packages/caliper-generator/generator-caliper/test/benchmark/callback.js +++ b/packages/caliper-generator/generator-caliper/test/benchmark/callback.js @@ -2,9 +2,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,7 +28,7 @@ describe('', () => { chaincodeArguments: `["args1", "args2", "args3"]`, name: 'x contract benchmark', description: 'benchmark for contract x', - clients: 5, + workers: 5, label: 'function test', chaincodeId: 'xContract', version: '1.0.0', @@ -61,7 +61,7 @@ describe('', () => { it('should populate the file based on answers to user prompts', async () => { await runGenerator(); - let callbackFileContent = `/*\n` + + let callbackFileContent = `/*\n` + `* Licensed under the Apache License, Version 2.0 (the "License");\n` + `* you may not use this file except in compliance with the License.\n` + `* You may obtain a copy of the License at\n` + @@ -70,7 +70,7 @@ describe('', () => { `* \n` + `* Unless required by applicable law or agreed to in writing, software\n` + `* distributed under the License is distributed on an "AS IS" BASIS,\n` + - `* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n` + + `* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n` + `* See the License for the specific language governing permissions and\n` + `* limitations under the License.\n` + `*/\n` + diff --git a/packages/caliper-generator/generator-caliper/test/benchmark/config.js b/packages/caliper-generator/generator-caliper/test/benchmark/config.js index e3af1109de..c36767d69d 100644 --- a/packages/caliper-generator/generator-caliper/test/benchmark/config.js +++ b/packages/caliper-generator/generator-caliper/test/benchmark/config.js @@ -2,9 +2,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,7 +31,7 @@ describe ('benchmark configuration generator', () => { chaincodeArguments: '["args1", "args2", "args3"]', benchmarkName: 'x contract benchmark', benchmarkDescription: 'benchmark for contract x', - clients: 10, + workers: 10, label: 'function test', chaincodeId: 'xContract', txType: 'txDuration', @@ -42,7 +42,7 @@ describe ('benchmark configuration generator', () => { test: { name: 'x contract benchmark', description: 'benchmark for contract x', - clients: { + workers: { type: 'local', number: 10 }, @@ -76,13 +76,13 @@ describe ('benchmark configuration generator', () => { .withPrompts(options); tmpConfigPath = path.join(dir, configPath); }; - + it('should create a workspace directory with a name defined by the user', async () => { options.txDuration = 30; await runGenerator(); assert.file(`${options.workspace}/`); }); - + it('should create a folder called benchmarks inside the workspace', async () => { options.txDuration = 30; await runGenerator(); @@ -142,7 +142,7 @@ describe ('benchmark configuration generator', () => { options.txDuration = 30; options.rateController = 'linear-rate'; await runGenerator(); - + const config = yaml.safeLoad(fs.readFileSync(tmpConfigPath),'utf8'); const configStr = JSON.stringify(config); const fileContains = configStr.includes('"type":"linear-rate"'); @@ -154,7 +154,7 @@ describe ('benchmark configuration generator', () => { options.txType = 'txDuration'; options.txDuration = 30; await runGenerator(); - + const config = yaml.safeLoad(fs.readFileSync(tmpConfigPath),'utf8'); const configStr = JSON.stringify(config); const fileContains = configStr.includes('"txDuration"'); @@ -166,7 +166,7 @@ describe ('benchmark configuration generator', () => { options.txType = 'txNumber'; options.txNumber = 30; await runGenerator(); - + const config = yaml.safeLoad(fs.readFileSync(tmpConfigPath),'utf8'); const configStr = JSON.stringify(config); const fileContains = configStr.includes('"txNumber"'); @@ -174,24 +174,24 @@ describe ('benchmark configuration generator', () => { fileContains.should.equal(true); }); - it('should provide a default client value if user answered prompt with a string for clients', async () => { - options.clients = "penguin"; + it('should provide a default client value if user answered prompt with a string for workers', async () => { + options.workers = "penguin"; await runGenerator(); const config = yaml.safeLoad(fs.readFileSync(tmpConfigPath),'utf8'); const configStr = JSON.stringify(config); - const fileContains = configStr.includes('"clients":{"type":"local","number":5'); + const fileContains = configStr.includes('"workers":{"type":"local","number":5'); fileContains.should.equal(true); }); - it('should provide an absolute value for client if user answered prompt with a negative number for clients', async () => { - options.clients = -10; + it('should provide an absolute value for client if user answered prompt with a negative number for workers', async () => { + options.workers = -10; await runGenerator(); const config = yaml.safeLoad(fs.readFileSync(tmpConfigPath),'utf8'); const configStr = JSON.stringify(config); - const fileContains = configStr.includes('"clients":{"type":"local","number":10'); + const fileContains = configStr.includes('"workers":{"type":"local","number":10'); fileContains.should.equal(true); }); diff --git a/packages/caliper-gui-server/src/gui-caliper-flow.js b/packages/caliper-gui-server/src/gui-caliper-flow.js index 04d1790e39..6fe69b2fe0 100644 --- a/packages/caliper-gui-server/src/gui-caliper-flow.js +++ b/packages/caliper-gui-server/src/gui-caliper-flow.js @@ -109,8 +109,7 @@ module.exports.run = async function(absConfigFile, absNetworkFile, admin, client await adminClient.init(); // bug is in here! #1434 of fabric.js await adminClient.installSmartContract(); let numberOfClients = await clientOrchestrator.init(); - let clientArgs = await adminClient.prepareClients(numberOfClients); - + let clientArgs = await adminClient.prepareWorkerArguments(numberOfClients); try { await monitorOrchestrator.startAllMonitors(); @@ -168,4 +167,4 @@ ${'#'.repeat(testSummary.length)} } return errorStatus; -}; \ No newline at end of file +}; diff --git a/packages/caliper-iroha/lib/iroha.js b/packages/caliper-iroha/lib/iroha.js index f81a523109..e993f1f905 100644 --- a/packages/caliper-iroha/lib/iroha.js +++ b/packages/caliper-iroha/lib/iroha.js @@ -195,10 +195,11 @@ function irohaQuery(queryOptions, commands) { * Implements {BlockchainInterface} for a Iroha backend. */ class Iroha extends BlockchainInterface { - constructor(config_path, workspace_root) { + constructor(config_path, workspace_root, clientIndex) { super(config_path); this.bcType = 'iroha'; this.workspaceRoot = workspace_root; + this.clientIndex = clientIndex; } /** @@ -223,11 +224,11 @@ class Iroha extends BlockchainInterface { } /** - * Perform required preparation for test clients + * Perform required information for test clients * @param {Number} number count of test clients * @return {Promise} obtained material for test clients */ - async prepareClients(number) { + async prepareWorkerArguments(number) { try{ // get admin info let config = require(this.configPath); @@ -239,13 +240,13 @@ class Iroha extends BlockchainInterface { let adminPriv = fs.readFileSync(privPath).toString(); let adminPub = fs.readFileSync(pubPath).toString(); // test - logger.info(`Admin's private key: ${adminPriv}`); - logger.info(`Admin's public key: ${adminPub}`); + logger.debug(`Admin's private key: ${adminPriv}`); + logger.debug(`Admin's public key: ${adminPub}`); // create account for each client let result = []; let node = this._findNode(); - logger.info('node: ' + node.torii); + logger.debug('node: ' + node.torii); let commandService = new CommandService_v1Client( node.torii, @@ -325,20 +326,18 @@ class Iroha extends BlockchainInterface { } catch(err){ logger.error(err); - return Promise.reject(new Error('Failed when prepareClients')); + return Promise.reject(new Error('Failed in prepareWorkerArguments')); } } /** * Return the Iroha context associated with the given callback module name. * @param {string} name The name of the callback module as defined in the configuration files, for example open or query. - * @param {object} args Unused, the client material returned by function prepareClient. - * @param {Integer} clientIdx The client index. - * @param {Object} txFile the file information for reading or writing. + * @param {object} args The client material returned by function prepareWorkerArguments. * @return {object} The assembled Iroha context. * @async */ - async getContext(name, args, clientIdx, txFile) { + async getContext(name, args) { try { if(!args.hasOwnProperty('name') || !args.hasOwnProperty('domain') || !args.hasOwnProperty('id') || !args.hasOwnProperty('pubKey') || !args.hasOwnProperty('privKey')) { throw new Error('Invalid Iroha::getContext arguments'); diff --git a/packages/caliper-iroha/lib/irohaClientFactory.js b/packages/caliper-iroha/lib/irohaClientFactory.js index 516d9fd3dc..1f469ef611 100644 --- a/packages/caliper-iroha/lib/irohaClientFactory.js +++ b/packages/caliper-iroha/lib/irohaClientFactory.js @@ -28,12 +28,6 @@ class IrohaClientFactory { */ spawnWorker() { const child = childProcess.fork(path.join(__dirname, './irohaClientWorker.js'), process.argv.slice(2), { env: process.env}); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-iroha/lib/irohaClientWorker.js b/packages/caliper-iroha/lib/irohaClientWorker.js index 0d7860c0e6..125fe1f781 100644 --- a/packages/caliper-iroha/lib/irohaClientWorker.js +++ b/packages/caliper-iroha/lib/irohaClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const IrohaClient = require('./iroha'); /** @@ -25,16 +25,27 @@ const IrohaClient = require('./iroha'); * @async */ async function initHandler(context, message) { - return new IrohaClient(context.networkConfigPath, context.workspacePath); + return new IrohaClient(context.networkConfigPath, context.workspacePath, context.workerId); } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'iroha'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-sawtooth/lib/sawtooth.js b/packages/caliper-sawtooth/lib/sawtooth.js index 96e44cbf99..07dbb128b1 100644 --- a/packages/caliper-sawtooth/lib/sawtooth.js +++ b/packages/caliper-sawtooth/lib/sawtooth.js @@ -319,12 +319,14 @@ class Sawtooth extends BlockchainInterface { * Constructor * @param {String} config_path path of the Sawtooth configuration file * @param {string} workspace_root The absolute path to the root location for the application configuration files. + * @param {number} clientIndex The client index */ - constructor(config_path, workspace_root) { + constructor(config_path, workspace_root, clientIndex) { super(config_path); configPath = config_path; this.bcType = 'sawtooth'; this.workspaceRoot = workspace_root; + this.clientIndex = clientIndex; } /** diff --git a/packages/caliper-sawtooth/lib/sawtoothClientFactory.js b/packages/caliper-sawtooth/lib/sawtoothClientFactory.js index 00c8d7db13..b70bc05382 100644 --- a/packages/caliper-sawtooth/lib/sawtoothClientFactory.js +++ b/packages/caliper-sawtooth/lib/sawtoothClientFactory.js @@ -28,12 +28,6 @@ class SawtoothClientFactory { */ spawnWorker() { const child = childProcess.fork(path.join(__dirname, './sawtoothClientWorker.js'), process.argv.slice(2), { env: process.env}); - - const msg = { - type: 'init' - }; - child.send(msg); - return child; } } diff --git a/packages/caliper-sawtooth/lib/sawtoothClientWorker.js b/packages/caliper-sawtooth/lib/sawtoothClientWorker.js index 22b4b3f1fa..821c552258 100644 --- a/packages/caliper-sawtooth/lib/sawtoothClientWorker.js +++ b/packages/caliper-sawtooth/lib/sawtoothClientWorker.js @@ -14,7 +14,7 @@ 'use strict'; -const { MessageHandler } = require('@hyperledger/caliper-core'); +const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core'); const SawtoothClient = require('./sawtooth'); /** @@ -25,16 +25,27 @@ const SawtoothClient = require('./sawtooth'); * @async */ async function initHandler(context, message) { - return new SawtoothClient(context.networkConfigPath, context.workspacePath); + return new SawtoothClient(context.networkConfigPath, context.workspacePath, context.workerId); } -const handlerContext = new MessageHandler({ - init: initHandler -}); - /** - * Message handler + * Main process */ -process.on('message', async (message) => { - await MessageHandler.handle(handlerContext, message); -}); +async function main (){ + + // Create the message client using the specified type + const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`; + const messenger = new Messenger({type, sut: 'sawtooth'}); + await messenger.initialize(); + + // Create a handler context for this worker + const handlerContext = new MessageHandler({ + init: initHandler + }, messenger); + + // Pass to the messenger to configure + messenger.configure(handlerContext); + +} + +main(); diff --git a/packages/caliper-tests-integration/besu_tests/phase1/benchconfig.yaml b/packages/caliper-tests-integration/besu_tests/phase1/benchconfig.yaml index 527115e316..5d74c188d3 100644 --- a/packages/caliper-tests-integration/besu_tests/phase1/benchconfig.yaml +++ b/packages/caliper-tests-integration/besu_tests/phase1/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 1 observer: diff --git a/packages/caliper-tests-integration/besu_tests/phase2/benchconfig.yaml b/packages/caliper-tests-integration/besu_tests/phase2/benchconfig.yaml index 95640097f9..caec02db4a 100644 --- a/packages/caliper-tests-integration/besu_tests/phase2/benchconfig.yaml +++ b/packages/caliper-tests-integration/besu_tests/phase2/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 1 rounds: diff --git a/packages/caliper-tests-integration/besu_tests/phase3/benchconfig.yaml b/packages/caliper-tests-integration/besu_tests/phase3/benchconfig.yaml index 6207d75b2b..fd691b4b96 100644 --- a/packages/caliper-tests-integration/besu_tests/phase3/benchconfig.yaml +++ b/packages/caliper-tests-integration/besu_tests/phase3/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/ethereum_tests/benchconfig.yaml b/packages/caliper-tests-integration/ethereum_tests/benchconfig.yaml index f59586e34e..910bc5e0a9 100644 --- a/packages/caliper-tests-integration/ethereum_tests/benchconfig.yaml +++ b/packages/caliper-tests-integration/ethereum_tests/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fabric_tests/caliper.yaml b/packages/caliper-tests-integration/fabric_tests/caliper.yaml index bcd0e9228b..3585171f76 100644 --- a/packages/caliper-tests-integration/fabric_tests/caliper.yaml +++ b/packages/caliper-tests-integration/fabric_tests/caliper.yaml @@ -41,7 +41,11 @@ caliper: target: console enabled: true options: - level: debug + level: info file: target: file enabled: false + worker: + communication: + method: mqtt + address: mqtt://localhost:1883 diff --git a/packages/caliper-tests-integration/fabric_tests/docker-compose.yaml b/packages/caliper-tests-integration/fabric_tests/docker-compose.yaml index 7526419ead..7d57c19804 100644 --- a/packages/caliper-tests-integration/fabric_tests/docker-compose.yaml +++ b/packages/caliper-tests-integration/fabric_tests/docker-compose.yaml @@ -298,3 +298,19 @@ services: ports: - 8080:8080 restart: always + +############### +# MQTT BROKER # +############### + + mosquitto: + image: eclipse-mosquitto + hostname: mosquitto + container_name: mosquitto + restart: always + ports: + - "1883:1883" + - "9001:9001" + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + diff --git a/packages/caliper-tests-integration/fabric_tests/mosquitto/mosquitto.conf b/packages/caliper-tests-integration/fabric_tests/mosquitto/mosquitto.conf new file mode 100644 index 0000000000..4f9950610c --- /dev/null +++ b/packages/caliper-tests-integration/fabric_tests/mosquitto/mosquitto.conf @@ -0,0 +1 @@ +persistence false diff --git a/packages/caliper-tests-integration/fabric_tests/phase1/benchconfig.yaml b/packages/caliper-tests-integration/fabric_tests/phase1/benchconfig.yaml index 1ab6e5719a..a9555e37cd 100644 --- a/packages/caliper-tests-integration/fabric_tests/phase1/benchconfig.yaml +++ b/packages/caliper-tests-integration/fabric_tests/phase1/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fabric_tests/phase2/benchconfig.yaml b/packages/caliper-tests-integration/fabric_tests/phase2/benchconfig.yaml index c4db4ca1ad..028b27009b 100644 --- a/packages/caliper-tests-integration/fabric_tests/phase2/benchconfig.yaml +++ b/packages/caliper-tests-integration/fabric_tests/phase2/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fabric_tests/phase3/benchconfig.yaml b/packages/caliper-tests-integration/fabric_tests/phase3/benchconfig.yaml index fd895c902d..30398a2c02 100644 --- a/packages/caliper-tests-integration/fabric_tests/phase3/benchconfig.yaml +++ b/packages/caliper-tests-integration/fabric_tests/phase3/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fabric_tests/phase4/benchconfig.yaml b/packages/caliper-tests-integration/fabric_tests/phase4/benchconfig.yaml index b75227f79e..2f5136a86e 100644 --- a/packages/caliper-tests-integration/fabric_tests/phase4/benchconfig.yaml +++ b/packages/caliper-tests-integration/fabric_tests/phase4/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fabric_tests/phase5/benchconfig.yaml b/packages/caliper-tests-integration/fabric_tests/phase5/benchconfig.yaml index c4db4ca1ad..028b27009b 100644 --- a/packages/caliper-tests-integration/fabric_tests/phase5/benchconfig.yaml +++ b/packages/caliper-tests-integration/fabric_tests/phase5/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/fisco-bcos_tests/benchconfig.yaml b/packages/caliper-tests-integration/fisco-bcos_tests/benchconfig.yaml index a41599e2ed..a427317574 100644 --- a/packages/caliper-tests-integration/fisco-bcos_tests/benchconfig.yaml +++ b/packages/caliper-tests-integration/fisco-bcos_tests/benchconfig.yaml @@ -14,7 +14,7 @@ --- test: - clients: + workers: type: local number: 2 rounds: diff --git a/packages/caliper-tests-integration/package.json b/packages/caliper-tests-integration/package.json index 4a999b6ae5..af12af3c74 100644 --- a/packages/caliper-tests-integration/package.json +++ b/packages/caliper-tests-integration/package.json @@ -60,6 +60,7 @@ "fabric_tests/config/bin", "fabric_tests/config/config", "fabric_tests/config/crypto-config", + "fabric_tests/mosquitto", "ethereum_tests/.gitignore", "ethereum_tests/config/data", "ethereum_tests/config/keys", diff --git a/packages/caliper-tests-integration/sawtooth_tests/benchconfig.yaml b/packages/caliper-tests-integration/sawtooth_tests/benchconfig.yaml index b824ee4449..28580f078a 100644 --- a/packages/caliper-tests-integration/sawtooth_tests/benchconfig.yaml +++ b/packages/caliper-tests-integration/sawtooth_tests/benchconfig.yaml @@ -16,7 +16,7 @@ test: name: smallbank description: This is smallbank benchmark for caliper - clients: + workers: type: local number: 2 rounds: