Skip to content

Commit

Permalink
Simplify message handling of worker processes
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Klenik <[email protected]>
  • Loading branch information
aklenik committed Nov 6, 2019
1 parent 23c045e commit 92e19bc
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 352 deletions.
65 changes: 17 additions & 48 deletions packages/caliper-burrow/lib/burrowClientWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,27 @@

'use strict';

const { CaliperLocalClient, CaliperUtils, ConfigUtil } = require('@hyperledger/caliper-core');
const logger = CaliperUtils.getLogger('burrow-client');
const { MessageHandler } = require('@hyperledger/caliper-core');
const BurrowClient = require('./burrow');

let caliperClient;
/**
* Message handler
* Handles the init message. Constructs the Burrow adapter.
* @param {object} context The context of the message handler object.
* @param {object} message The message object.
* @return {Promise<BurrowClient>} The initialized adapter instance.
* @async
*/
process.on('message', async (message) => {

if (!message.hasOwnProperty('type')) {
let msg = 'Message type is missing';
logger.error(msg, message);
process.send({type: 'error', data: msg});
return;
}

try {
switch (message.type) {
case 'init': {
logger.info('Handling "init" message');
logger.debug('Message content', message);

const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace);
let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig);
networkConfigPath = CaliperUtils.resolvePath(networkConfigPath);
async function initHandler(context, message) {
return new BurrowClient(context.networkConfigPath, context.workspacePath);
}

const blockchain = new BurrowClient(networkConfigPath, workspacePath);
caliperClient = new CaliperLocalClient(blockchain);
process.send({ type: 'ready', data: { pid: process.pid, complete: true } });
logger.info('Handled "init" message');
break;
}
case 'test': {
logger.info('Handling "test" message');
logger.debug('Message content', message);
let result = await caliperClient.doTest(message);
const handlerContext = new MessageHandler({
init: initHandler
});

await CaliperUtils.sleep(200);
process.send({ type: 'testResult', data: result });
logger.info('Handled "test" message');
break;
}
default: {
let msg = `Unknown message type "${message.type}"`;
logger.error(msg, message);
process.send({type: 'error', data: msg});
}
}
}
catch (err) {
logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`);
process.send({type: 'error', data: err.toString()});
}
/**
* Message handler
*/
process.on('message', async (message) => {
await MessageHandler.handle(handlerContext, message);
});
65 changes: 17 additions & 48 deletions packages/caliper-composer/lib/composerClientWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,27 @@

'use strict';

const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core');
const logger = CaliperUtils.getLogger('composer-client');
const { MessageHandler } = require('@hyperledger/caliper-core');
const ComposerClient = require('./composer');

let caliperClient;
/**
* Message handler
* Handles the init message. Constructs the Composer adapter.
* @param {object} context The context of the message handler object.
* @param {object} message The message object.
* @return {Promise<ComposerClient>} The initialized adapter instance.
* @async
*/
process.on('message', async (message) => {

if (!message.hasOwnProperty('type')) {
let msg = 'Message type is missing';
logger.error(msg, message);
process.send({type: 'error', data: msg});
return;
}

try {
switch (message.type) {
case 'init': {
logger.info('Handling "init" message');
logger.debug('Message content', message);

const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace);
let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig);
networkConfigPath = CaliperUtils.resolvePath(networkConfigPath);
async function initHandler(context, message) {
return new ComposerClient(context.networkConfigPath, context.workspacePath);
}

const blockchain = new ComposerClient(networkConfigPath, workspacePath);
caliperClient = new CaliperLocalClient(blockchain);
process.send({type: 'ready', data: {pid: process.pid, complete: true}});
logger.info('Handled "init" message');
break;
}
case 'test': {
logger.info('Handling "test" message');
logger.debug('Message content', message);
let result = await caliperClient.doTest(message);
const handlerContext = new MessageHandler({
init: initHandler
});

await CaliperUtils.sleep(200);
process.send({type: 'testResult', data: result});
logger.info('Handled "test" message');
break;
}
default: {
let msg = `Unknown message type "${message.type}"`;
logger.error(msg, message);
process.send({type: 'error', data: msg});
}
}
}
catch (err) {
logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`);
process.send({type: 'error', data: err.toString()});
}
/**
* Message handler
*/
process.on('message', async (message) => {
await MessageHandler.handle(handlerContext, message);
});
1 change: 1 addition & 0 deletions packages/caliper-core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ module.exports.TxStatus = require('./lib/common/core/transaction-status');
module.exports.CaliperUtils = require('./lib/common/utils/caliper-utils');
module.exports.Version = require('./lib/common/utils/version');
module.exports.ConfigUtil = require('./lib/common/config/config-util');
module.exports.MessageHandler = require('./lib/worker/client/message-handler');
module.exports.CaliperEngine = require('./lib/master/caliper-engine');
150 changes: 150 additions & 0 deletions packages/caliper-core/lib/worker/client/message-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const ConfigUtil = require('../../common/config/config-util.js');
const CaliperUtils = require('../../common/utils/caliper-utils.js');
const CaliperLocalClient = require('../../worker/client/caliper-local-client');

const logger = CaliperUtils.getLogger('message-handler');

/**
* Base class for handling IPC messages in worker processes.
*/
class MessageHandler {
/**
* Initializes the BaseMessageHandler instance.
* @param {object} handlers Object of message handler functions.
*/
constructor(handlers) {
if (!handlers.init) {
let msg = 'Handler for "init" is not specified';
logger.error(msg);
throw new Error(msg);
}

this.beforeInitHandler = handlers.beforeInit || MessageHandler.beforeInit;
this.afterInitHandler = handlers.afterInit || MessageHandler.afterInit;
this.initHandler = handlers.init;

this.beforeTestHandler = handlers.beforeTest || MessageHandler.beforeTest;
this.afterTestHandler = handlers.afterTest || MessageHandler.afterTest;
this.testHandler = handlers.test || MessageHandler.test;

// context/this fields
this.workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace);
this.networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig);
this.networkConfigPath = CaliperUtils.resolvePath(this.networkConfigPath);
this.adapter = undefined;
this.workerClient = undefined;
this.testResult = undefined;
}

/**
* Called before processing the "init" message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
*/
static async beforeInit(context, message) {
logger.info('Handling "init" message');
logger.debug('Message content', message);
}

/**
* Called after processing the "init" message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
*/
static async afterInit(context, message) {
context.workerClient = new CaliperLocalClient(context.adapter);
process.send({type: 'ready', data: {pid: process.pid, complete: true}});
logger.info('Handled "init" message');
}

/**
* Called before processing the "test" message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
*/
static async beforeTest(context, message) {
logger.info('Handling "test" message');
logger.debug('Message content', message);
}

/**
* Called after processing the "test" message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
*/
static async afterTest(context, message) {
await CaliperUtils.sleep(200);
process.send({type: 'testResult', data: context.testResult});
logger.info('Handled "test" message');
}

/**
* Called for processing the "test" message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
* @return {Promise<object>} The result object.
*/
static async test(context, message) {
return context.workerClient.doTest(message);
}

/**
* Handles the IPC message.
* @param {object} context The context/state of the message handler.
* @param {object} message The message object.
*/
static async handle(context, message) {
if (!message.hasOwnProperty('type')) {
let msg = 'Message type is missing';
logger.error(msg, message);
process.send({type: 'error', data: msg});
return;
}

try {
switch (message.type) {
case 'init': {
await context.beforeInitHandler(context, message);
context.adapter = await context.initHandler(context, message);
await context.afterInitHandler(context, message);

break;
}
case 'test': {
await context.beforeTestHandler(context, message);
context.testResult = await context.testHandler(context, message);
await context.afterTestHandler(context, message);

break;
}
default: {
let msg = `Unknown message type "${message.type}"`;
logger.error(msg, message);
process.send({type: 'error', data: msg});
}
}
}
catch (err) {
logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`);
process.send({type: 'error', data: err.toString()});
}
}
}

module.exports = MessageHandler;
65 changes: 17 additions & 48 deletions packages/caliper-ethereum/lib/ethereumClientWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,27 @@

'use strict';

const {CaliperLocalClient, CaliperUtils, ConfigUtil} = require('@hyperledger/caliper-core');
const logger = CaliperUtils.getLogger('ethereum-client');
const { MessageHandler } = require('@hyperledger/caliper-core');
const EthereumClient = require('./ethereum');

let caliperClient;
/**
* Message handler
* Handles the init message. Constructs the Ethereum adapter.
* @param {object} context The context of the message handler object.
* @param {object} message The message object.
* @return {Promise<EthereumClient>} The initialized adapter instance.
* @async
*/
process.on('message', async (message) => {

if (!message.hasOwnProperty('type')) {
let msg = 'Message type is missing';
logger.error(msg, message);
process.send({type: 'error', data: msg});
return;
}

try {
switch (message.type) {
case 'init': {
logger.info('Handling "init" message');
logger.debug('Message content', message);

const workspacePath = ConfigUtil.get(ConfigUtil.keys.Workspace);
let networkConfigPath = ConfigUtil.get(ConfigUtil.keys.NetworkConfig);
networkConfigPath = CaliperUtils.resolvePath(networkConfigPath);
async function initHandler(context, message) {
return new EthereumClient(context.networkConfigPath, context.workspacePath);
}

const blockchain = new EthereumClient(networkConfigPath, workspacePath);
caliperClient = new CaliperLocalClient(blockchain);
process.send({type: 'ready', data: {pid: process.pid, complete: true}});
logger.info('Handled "init" message');
break;
}
case 'test': {
logger.info('Handling "test" message');
logger.debug('Message content', message);
let result = await caliperClient.doTest(message);
const handlerContext = new MessageHandler({
init: initHandler
});

await CaliperUtils.sleep(200);
process.send({type: 'testResult', data: result});
logger.info('Handled "test" message');
break;
}
default: {
let msg = `Unknown message type "${message.type}"`;
logger.error(msg, message);
process.send({type: 'error', data: msg});
}
}
}
catch (err) {
logger.error(`Error while handling "${message.type}" message: ${err.stack || err}`);
process.send({type: 'error', data: err.toString()});
}
/**
* Message handler
*/
process.on('message', async (message) => {
await MessageHandler.handle(handlerContext, message);
});
Loading

0 comments on commit 92e19bc

Please sign in to comment.