From 92e19bcabeac753f044d0d5d949b85252acf6696 Mon Sep 17 00:00:00 2001 From: Attila Klenik Date: Wed, 6 Nov 2019 21:55:33 -0100 Subject: [PATCH] Simplify message handling of worker processes Signed-off-by: Attila Klenik --- .../caliper-burrow/lib/burrowClientWorker.js | 65 ++------ .../lib/composerClientWorker.js | 65 ++------ packages/caliper-core/index.js | 1 + .../lib/worker/client/message-handler.js | 150 ++++++++++++++++++ .../lib/ethereumClientWorker.js | 65 ++------ .../caliper-fabric/lib/fabricClientWorker.js | 73 +++------ .../lib/fiscoBcosClientWorker.js | 79 +++------ .../caliper-iroha/lib/irohaClientWorker.js | 65 ++------ .../lib/sawtoothClientWorker.js | 65 ++------ 9 files changed, 276 insertions(+), 352 deletions(-) create mode 100644 packages/caliper-core/lib/worker/client/message-handler.js diff --git a/packages/caliper-burrow/lib/burrowClientWorker.js b/packages/caliper-burrow/lib/burrowClientWorker.js index 41bd8646fb..048c6fd2c6 100644 --- a/packages/caliper-burrow/lib/burrowClientWorker.js +++ b/packages/caliper-burrow/lib/burrowClientWorker.js @@ -14,58 +14,27 @@ 'use strict'; -const { CaliperLocalClient, CaliperUtils, ConfigUtil } = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('burrow-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const BurrowClient = require('./burrow'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs the Burrow adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new BurrowClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new BurrowClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({ type: 'ready', data: { pid: process.pid, complete: true } }); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({ type: 'testResult', data: result }); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-composer/lib/composerClientWorker.js b/packages/caliper-composer/lib/composerClientWorker.js index 0bb895b762..22d5522abe 100644 --- a/packages/caliper-composer/lib/composerClientWorker.js +++ b/packages/caliper-composer/lib/composerClientWorker.js @@ -14,58 +14,27 @@ 'use strict'; -const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('composer-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const ComposerClient = require('./composer'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs the Composer adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new ComposerClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new ComposerClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: result}); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-core/index.js b/packages/caliper-core/index.js index 81dba35503..7d4cfbc0e4 100644 --- a/packages/caliper-core/index.js +++ b/packages/caliper-core/index.js @@ -20,4 +20,5 @@ module.exports.TxStatus = require('./lib/common/core/transaction-status'); module.exports.CaliperUtils = require('./lib/common/utils/caliper-utils'); module.exports.Version = require('./lib/common/utils/version'); module.exports.ConfigUtil = require('./lib/common/config/config-util'); +module.exports.MessageHandler = require('./lib/worker/client/message-handler'); module.exports.CaliperEngine = require('./lib/master/caliper-engine'); diff --git a/packages/caliper-core/lib/worker/client/message-handler.js b/packages/caliper-core/lib/worker/client/message-handler.js new file mode 100644 index 0000000000..52b7c20b09 --- /dev/null +++ b/packages/caliper-core/lib/worker/client/message-handler.js @@ -0,0 +1,150 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const ConfigUtil = require('../../common/config/config-util.js'); +const CaliperUtils = require('../../common/utils/caliper-utils.js'); +const CaliperLocalClient = require('../../worker/client/caliper-local-client'); + +const logger = CaliperUtils.getLogger('message-handler'); + +/** + * Base class for handling IPC messages in worker processes. + */ +class MessageHandler { + /** + * Initializes the BaseMessageHandler instance. + * @param {object} handlers Object of message handler functions. + */ + constructor(handlers) { + if (!handlers.init) { + let msg = 'Handler for "init" is not specified'; + logger.error(msg); + throw new Error(msg); + } + + this.beforeInitHandler = handlers.beforeInit || MessageHandler.beforeInit; + this.afterInitHandler = handlers.afterInit || MessageHandler.afterInit; + this.initHandler = handlers.init; + + this.beforeTestHandler = handlers.beforeTest || MessageHandler.beforeTest; + this.afterTestHandler = handlers.afterTest || MessageHandler.afterTest; + this.testHandler = handlers.test || MessageHandler.test; + + // context/this fields + this.workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); + this.networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); + this.networkConfigPath = CaliperUtils.resolvePath(this.networkConfigPath); + this.adapter = undefined; + this.workerClient = undefined; + this.testResult = undefined; + } + + /** + * Called before processing the "init" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + */ + static async beforeInit(context, message) { + logger.info('Handling "init" message'); + logger.debug('Message content', message); + } + + /** + * Called after processing the "init" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + */ + static async afterInit(context, message) { + context.workerClient = new CaliperLocalClient(context.adapter); + process.send({type: 'ready', data: {pid: process.pid, complete: true}}); + logger.info('Handled "init" message'); + } + + /** + * Called before processing the "test" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + */ + static async beforeTest(context, message) { + logger.info('Handling "test" message'); + logger.debug('Message content', message); + } + + /** + * Called after processing the "test" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + */ + static async afterTest(context, message) { + await CaliperUtils.sleep(200); + process.send({type: 'testResult', data: context.testResult}); + logger.info('Handled "test" message'); + } + + /** + * Called for processing the "test" message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + * @return {Promise} The result object. + */ + static async test(context, message) { + return context.workerClient.doTest(message); + } + + /** + * Handles the IPC message. + * @param {object} context The context/state of the message handler. + * @param {object} message The message object. + */ + static async handle(context, message) { + if (!message.hasOwnProperty('type')) { + let msg = 'Message type is missing'; + logger.error(msg, message); + process.send({type: 'error', data: msg}); + return; + } + + try { + switch (message.type) { + case 'init': { + await context.beforeInitHandler(context, message); + context.adapter = await context.initHandler(context, message); + await context.afterInitHandler(context, message); + + break; + } + case 'test': { + await context.beforeTestHandler(context, message); + context.testResult = await context.testHandler(context, message); + await context.afterTestHandler(context, message); + + break; + } + default: { + let msg = `Unknown message type "${message.type}"`; + logger.error(msg, message); + process.send({type: 'error', data: msg}); + } + } + } + catch (err) { + logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); + process.send({type: 'error', data: err.toString()}); + } + } +} + +module.exports = MessageHandler; diff --git a/packages/caliper-ethereum/lib/ethereumClientWorker.js b/packages/caliper-ethereum/lib/ethereumClientWorker.js index e4d431dc97..07e882e025 100644 --- a/packages/caliper-ethereum/lib/ethereumClientWorker.js +++ b/packages/caliper-ethereum/lib/ethereumClientWorker.js @@ -14,58 +14,27 @@ 'use strict'; -const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('ethereum-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const EthereumClient = require('./ethereum'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs the Ethereum adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new EthereumClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new EthereumClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: result}); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-fabric/lib/fabricClientWorker.js b/packages/caliper-fabric/lib/fabricClientWorker.js index 7d12092796..26388fa2b3 100644 --- a/packages/caliper-fabric/lib/fabricClientWorker.js +++ b/packages/caliper-fabric/lib/fabricClientWorker.js @@ -14,63 +14,34 @@ 'use strict'; -const {CaliperLocalClient, CaliperUtils, ConfigUtil } = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('fabric-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const FabricClient = require('./fabric'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs and initializes the Fabric adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } +async function initHandler(context, message) { + const blockchain = new FabricClient(context.networkConfigPath, context.workspacePath); - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); + // reload the profiles silently + await blockchain._initializeRegistrars(false); + await blockchain._initializeAdmins(false); + await blockchain._initializeUsers(false); - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); + return blockchain; +} - const blockchain = new FabricClient(networkConfigPath, workspacePath); - // reload the profiles silently - await blockchain._initializeRegistrars(false); - await blockchain._initializeAdmins(false); - await blockchain._initializeUsers(false); - - caliperClient = new CaliperLocalClient(blockchain); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: result}); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js b/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js index 38526aef4b..d84531e0b6 100644 --- a/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js +++ b/packages/caliper-fisco-bcos/lib/fiscoBcosClientWorker.js @@ -14,70 +14,27 @@ 'use strict'; -const { - CaliperLocalClient, - CaliperUtils, - ConfigUtil -} = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('fisco-bcos-client'); -const fiscoBcosClient = require('./fiscoBcos'); - -let caliperClient; +const { MessageHandler } = require('@hyperledger/caliper-core'); +const FiscoBcosClient = require('./fiscoBcos'); /** - * Messages handler + * Handles the init message. Constructs the FISCO-BCOS adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new FiscoBcosClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new fiscoBcosClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({ - type: 'ready', - data: { - pid: process.pid, - complete: true - } - }); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({ - type: 'testResult', - data: result - }); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-iroha/lib/irohaClientWorker.js b/packages/caliper-iroha/lib/irohaClientWorker.js index 1f53419d7c..0d7860c0e6 100644 --- a/packages/caliper-iroha/lib/irohaClientWorker.js +++ b/packages/caliper-iroha/lib/irohaClientWorker.js @@ -14,58 +14,27 @@ 'use strict'; -const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('iroha-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const IrohaClient = require('./iroha'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs the Iroha adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new IrohaClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new IrohaClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: result}); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); }); diff --git a/packages/caliper-sawtooth/lib/sawtoothClientWorker.js b/packages/caliper-sawtooth/lib/sawtoothClientWorker.js index 022cb5bc3b..22b4b3f1fa 100644 --- a/packages/caliper-sawtooth/lib/sawtoothClientWorker.js +++ b/packages/caliper-sawtooth/lib/sawtoothClientWorker.js @@ -14,58 +14,27 @@ 'use strict'; -const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core'); -const logger = CaliperUtils.getLogger('sawtooth-client'); +const { MessageHandler } = require('@hyperledger/caliper-core'); const SawtoothClient = require('./sawtooth'); -let caliperClient; /** - * Message handler + * Handles the init message. Constructs the Sawtooth adapter. + * @param {object} context The context of the message handler object. + * @param {object} message The message object. + * @return {Promise} The initialized adapter instance. + * @async */ -process.on('message', async (message) => { - - if (!message.hasOwnProperty('type')) { - let msg = 'Message type is missing'; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - return; - } - - try { - switch (message.type) { - case 'init': { - logger.info('Handling "init" message'); - logger.debug('Message content', message); - - const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace); - let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig); - networkConfigPath = CaliperUtils.resolvePath(networkConfigPath); +async function initHandler(context, message) { + return new SawtoothClient(context.networkConfigPath, context.workspacePath); +} - const blockchain = new SawtoothClient(networkConfigPath, workspacePath); - caliperClient = new CaliperLocalClient(blockchain); - process.send({type: 'ready', data: {pid: process.pid, complete: true}}); - logger.info('Handled "init" message'); - break; - } - case 'test': { - logger.info('Handling "test" message'); - logger.debug('Message content', message); - let result = await caliperClient.doTest(message); +const handlerContext = new MessageHandler({ + init: initHandler +}); - await CaliperUtils.sleep(200); - process.send({type: 'testResult', data: result}); - logger.info('Handled "test" message'); - break; - } - default: { - let msg = `Unknown message type "${message.type}"`; - logger.error(msg, message); - process.send({type: 'error', data: msg}); - } - } - } - catch (err) { - logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`); - process.send({type: 'error', data: err.toString()}); - } +/** + * Message handler + */ +process.on('message', async (message) => { + await MessageHandler.handle(handlerContext, message); });