Skip to content

Commit

Permalink
Merge pull request #682 from nklincoln/client-mods-mqtt
Browse files Browse the repository at this point in the history
Enable alternative messaging between master and worker
  • Loading branch information
nklincoln authored Jan 15, 2020
2 parents 04b01c0 + d6ef2cf commit 32c0e77
Show file tree
Hide file tree
Showing 67 changed files with 1,708 additions and 628 deletions.
6 changes: 0 additions & 6 deletions packages/caliper-burrow/lib/burrowClientFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ class BurrowClientFactory {
*/
async spawnWorker() {
const child = childProcess.fork(path.join(__dirname, './burrowClientWorker.js'), process.argv.slice(2), { env: process.env});

const msg = {
type: 'init'
};
child.send(msg);

return child;
}
}
Expand Down
31 changes: 21 additions & 10 deletions packages/caliper-burrow/lib/burrowClientWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

'use strict';

const { MessageHandler } = require('@hyperledger/caliper-core');
const { ConfigUtil, Messenger, MessageHandler } = require('@hyperledger/caliper-core');
const BurrowClient = require('./burrow');

/**
Expand All @@ -25,16 +25,27 @@ const BurrowClient = require('./burrow');
* @async
*/
async function initHandler(context, message) {
return new BurrowClient(context.networkConfigPath, context.workspacePath);
return new BurrowClient(context.networkConfigPath, context.workspacePath, context.workerId);
}

const handlerContext = new MessageHandler({
init: initHandler
});

/**
* Message handler
* Main process
*/
process.on('message', async (message) => {
await MessageHandler.handle(handlerContext, message);
});
async function main (){

// Create the message client using the specified type
const type = `${ConfigUtil.get(ConfigUtil.keys.Worker.Communication.Method)}-worker`;
const messenger = new Messenger({type, sut: 'burrow'});
await messenger.initialize();

// Create a handler context for this worker
const handlerContext = new MessageHandler({
init: initHandler
}, messenger);

// Pass to the messenger to configure
messenger.configure(handlerContext);

}

main();
1 change: 1 addition & 0 deletions packages/caliper-core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ module.exports.CaliperUtils = require('./lib/common/utils/caliper-utils');
module.exports.Version = require('./lib/common/utils/version');
module.exports.ConfigUtil = require('./lib/common/config/config-util');
module.exports.MessageHandler = require('./lib/worker/client/message-handler');
module.exports.Messenger = require('./lib/common/messaging/messenger');
module.exports.CaliperEngine = require('./lib/master/caliper-engine');
10 changes: 8 additions & 2 deletions packages/caliper-core/lib/common/config/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ const keys = {
MachineConfig: 'caliper-machineconfig',
BenchConfig: 'caliper-benchconfig',
NetworkConfig: 'caliper-networkconfig',
ZooAddress: 'caliper-zooaddress',
ZooConfig: 'caliper-zooconfig',
TxUpdateTime: 'caliper-txupdatetime',
LoggingRoot: 'caliper-logging',
Logging: {
Expand Down Expand Up @@ -87,6 +85,14 @@ const keys = {
},
Targets: 'caliper-logging-targets'
},
Worker: {
Remote: 'caliper-worker-remote',
PollInterval: 'caliper-worker-pollinterval',
Communication: {
Method: 'caliper-worker-communication-method',
Address: 'caliper-worker-communication-address',
}
},
Flow: {
Skip: {
Start : 'caliper-flow-skip-start',
Expand Down
13 changes: 12 additions & 1 deletion packages/caliper-core/lib/common/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,18 @@ caliper:
options:
flags: a
mode: 0666

# Worker options
worker:
# Indicate if workers are in distributed mode
remote: false
# Polling interval to use once created, in milliseconds
pollinterval: 5000
# Worker communication details
communication:
# Method used (process | mqtt)
method: process
# Address used for mqtt communications
address: mqtt://localhost:1883
# Caliper flow options
flow:
# Skip options
Expand Down
13 changes: 7 additions & 6 deletions packages/caliper-core/lib/common/core/blockchain-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ class BlockchainInterface {
}

/**
* Perform required preparation for test clients
* @param {Number} number count of test clients
* @return {Promise} obtained material for test clients
* Retrieve required arguments for test workers, e.g. retrieve information from the adaptor that is generated during an admin phase such as contract installation.
* Information returned here is passed to the worker through the messaging protocol on test.
* @param {Number} number total count of test workers
* @return {Promise} array of obtained material for each test worker
* @async
*/
async prepareClients (number) {
async prepareWorkerArguments(number) {
let result = [];
for(let i = 0 ; i< number ; i++) {
result[i] = {}; // as default, return an empty object for each client
Expand All @@ -61,9 +63,8 @@ class BlockchainInterface {
* }
* @param {String} name name of the context
* @param {Object} args adapter specific arguments
* @param {Integer} clientIdx the client index
*/
async getContext(name, args, clientIdx) {
async getContext(name, args) {
throw new Error('getContext is not implemented for this blockchain system');
}

Expand Down
18 changes: 8 additions & 10 deletions packages/caliper-core/lib/common/core/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class Blockchain {
}

/**
* Perform required preparation for test clients, e.g. enroll clients and obtain key pairs
* @param {Number} number count of test clients
* @return {Promise} array of obtained material for test clients
* @async
* Retrieve required arguments for test workers, e.g. retrieve information from the adaptor that is generated during an admin phase such as contract installation.
* Information returned here is passed to the worker through the messaging protocol on test.
* @param {Number} number total count of test workers
* @return {Promise} array of obtained material for each test worker
*/
async prepareClients (number) {
return await this.bcObj.prepareClients(number);
async prepareWorkerArguments(number) {
return await this.bcObj.prepareWorkerArguments(number);
}

/**
Expand All @@ -67,13 +67,11 @@ class Blockchain {
* Get a context for subsequent operations, e.g. invoke smart contract or query state
* @param {String} name name of the context
* @param {Object} args adapter specific arguments
* @param {Integer} clientIdx the client index
* @param {Object} txFile the file information for reading or writing.
* @return {Promise} obtained context object
* @async
*/
async getContext(name, args, clientIdx, txFile) {
return await this.bcObj.getContext(name, args, clientIdx, txFile);
async getContext(name, args) {
return await this.bcObj.getContext(name, args);
}

/**
Expand Down
77 changes: 77 additions & 0 deletions packages/caliper-core/lib/common/messaging/messenger-interface.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const Logger = require('../utils/caliper-utils').getLogger('messenger-base');


/**
* Interface of messenger. Messenger implementations must follow a naming convention that is <type>-observer.js so
* that they may be dynamically loaded in the WorkerOrchestrator and WorkerAdaptor
*/
class MessengerInterface {

/**
* Set configuration details
* @param {object} configuration configuration details for the messenger
*/
constructor(configuration) {
this.configuration = configuration;
}

/**
* Initialize the Messenger
* @async
*/
async initialize() {
this._throwNotImplementedError('initialize');
}

/**
* Configure the Messenger for use
* @async
*/
async configure() {
this._throwNotImplementedError('configure');
}

/**
* Send a message using the messenger
*/
send() {
this._throwNotImplementedError('send');
}

/**
* Get the UUID for the messenger
*/
getUUID() {
this._throwNotImplementedError('getUUID');
}

/**
* Logs and throws a "not implemented" error for the given function.
* @param {string} functionName The name of the function.
* @private
*/
_throwNotImplementedError(functionName) {
let msg = `The function "${functionName}" is not implemented for this messenger`;
Logger.error(msg);
throw new Error(msg);
}

}

module.exports = MessengerInterface;
100 changes: 100 additions & 0 deletions packages/caliper-core/lib/common/messaging/messenger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';

const CaliperUtils = require('..//utils/caliper-utils');
const Logger = CaliperUtils.getLogger('messenger.js');

const builtInMessengers = new Map([
['mqtt-master', './mqtt-master.js'],
['mqtt-worker', './mqtt-worker.js'],
['process-master', './process-master.js'],
['process-worker', './process-worker.js']
]);

const Messenger = class {

/**
* Instantiates the proxy messenger and creates the configured messenger behind it.
* @param {object} configuration The messenger configuration object.
*/
constructor(configuration) {
this.configuration = configuration;

Logger.info(`Creating messenger of type "${configuration.type}" ${configuration.sut ? ` for SUT ${configuration.sut}` : ''}`);

// resolve the type to a module path
let modulePath = builtInMessengers.has(configuration.type)
? builtInMessengers.get(configuration.type) : CaliperUtils.resolvePath(configuration.type); // TODO: what if it's an external module name?

let factoryFunction = require(modulePath).createMessenger;
if (!factoryFunction) {
throw new Error(`${configuration.type} does not export the mandatory factory function 'createMessenger'`);
}

this.messenger = factoryFunction(configuration);
}

/**
* Initialize the Messenger
* @async
*/
async initialize() {
await this.messenger.initialize();
}

/**
* Configure the Messenger for use
* @param {object} configurationObject configuration object
* @async
*/
async configure(configurationObject) {
await this.messenger.configure(configurationObject);
}

/**
* Get the UUID for the messenger
* @returns {string} the UUID of the messenger
*/
getUUID() {
return this.messenger.getUUID();
}

/**
* Method used to publish message to worker clients
* @param {string[]} to string array of workers that the update is intended for
* @param {*} type the string type of the update
* @param {*} data data pertinent to the update type
*/
send(to, type, data) {
// Create Date object for timestamp
const date = new Date();

// Augment data object with type
data.type = type;

// Create complete message
const message = {
to,
from: this.messenger.getUUID(),
timestamp: date.toISOString(),
data
};

this.messenger.send(message);
}

};

module.exports = Messenger;
Loading

0 comments on commit 32c0e77

Please sign in to comment.