-
Notifications
You must be signed in to change notification settings - Fork 403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable alternative messaging between master and worker #682
Enable alternative messaging between master and worker #682
Conversation
d192a39
to
82e6d8c
Compare
effe9e5
to
4367e33
Compare
- Added messaging capability with default mqtt comms - client replaced with 'worker' and rename of relevant files - clientId assigned by client factory spawn process - clientId held internal to adaptor - blockchain prepareClients method changed to prepareWorkerArguments - prepare phase added to orchestrator to run the callback `init` function as a distinct and unmonitored step - phasing of test made more obvious, with new promise resolution method 'updateClientPhase' and explicit READY, PREPARED, TEST phases Signed-off-by: [email protected] <[email protected]>
4367e33
to
d6ef2cf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Superb PR 🎆 🥇
Almost all of the remarks are general insights, and up for debate. The only "bug" is in the Ethereum adapter's prepareWorkerArguments
function, but even that is harmless, can be fixed later, so I'm approving the PR as is. This way the depending Fabric adapter PR doesn't need to be rebased.
let result = []; | ||
for (let i = 0 ; i< number ; i++) { | ||
for (let i = 0 ; i<= number ; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The i <= number
condition results in number + 1
array items. Which conflicts with the default BlockchainInterface
implementation.
@@ -2212,8 +2211,7 @@ class Fabric extends BlockchainInterface { | |||
|
|||
// We are done - return the networkUtil object | |||
return { | |||
networkInfo: this.networkUtil, | |||
clientIdx | |||
networkInfo: this.networkUtil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we also return the worker index in the context? The Ethereum adapter does that.
@@ -2291,8 +2289,7 @@ class Fabric extends BlockchainInterface { | |||
} | |||
|
|||
return { | |||
networkInfo: this.networkUtil, | |||
clientIdx | |||
networkInfo: this.networkUtil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return the worker index here too.
* @return {Promise<object>} The promise for the result of the execution. | ||
*/ | ||
async getContext(name, args, clientIdx) { | ||
async getContext(name, args) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should return the worker index for consistency since that's a SUT-independent data.
(Might be worth implementing this at the interface level, so it's always included.)
* @return {object} The assembled Iroha context. | ||
* @async | ||
*/ | ||
async getContext(name, args, clientIdx, txFile) { | ||
async getContext(name, args) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should include worker index in returned context (easier at the interface level, this is just another reminder here).
super(config_path); | ||
configPath = config_path; | ||
this.bcType = 'sawtooth'; | ||
this.workspaceRoot = workspace_root; | ||
this.clientIndex = clientIndex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another reminder to include the worker index in the context :)
* Configure the Messenger for use | ||
* @async | ||
*/ | ||
async configure() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The configurationObject
parameter used in derived classes is missing from the interface.
const message = { | ||
to, | ||
from: this.messenger.getUUID(), | ||
timestamp: date.toISOString(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The epoch ms returned by Date.now()
might be more general and easier to handle across different systems.
/** | ||
* Messenger that is based on an mqtt implementation | ||
*/ | ||
class MqttWorkerMessenger extends MessengerInterface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of similarities with the master mqtt messenger. Might worth investigating later how to unify things.
* Configure the Messenger for use | ||
* @param {MessageHandler} handlerContext a configured message handler | ||
*/ | ||
configure(handlerContext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comparing to the master's configure
method, they behave quite differently.
I'm not sure whether the logical type of the message (worker update, or some phase-related message) should matter at this point. Maybe both should just forward the decoded messages to a message handler instance, and the underlying instances would differentiate between the valid message types (like intended for workers or for master). This would make unifying the two mqtt handlers simpler.
Rather large PR this one 😂
This PR modularises the messaging between the master orchestrator and workers that interact with a SUT:
Additionally:
prepare
phase that is responsible for running the callbackinit
functionIntegration tests modified to test Fabric using mqtt communications, other SUT tests default to the process based communications. (fabric log level reduced to info, as log size was being exceeded in builds)
Signed-off-by: [email protected] [email protected]