From 96c71c9cb86f58f8c7c7e66917349eb2dd606400 Mon Sep 17 00:00:00 2001 From: CaptainIRS <36656347+CaptainIRS@users.noreply.github.com> Date: Wed, 21 Sep 2022 11:06:29 +0530 Subject: [PATCH] Add TxObserver for Prometheus manager Signed-off-by: CaptainIRS <36656347+CaptainIRS@users.noreply.github.com> --- .../messages/prometheusUpdateMessage.js | 37 +++ .../prometheus-manager-tx-observer.js | 97 ++++++++ .../prometheus-manager-tx-observer.js | 214 ++++++++++++++++++ 3 files changed, 348 insertions(+) create mode 100644 packages/caliper-core/lib/common/messages/prometheusUpdateMessage.js create mode 100644 packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js create mode 100644 packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js diff --git a/packages/caliper-core/lib/common/messages/prometheusUpdateMessage.js b/packages/caliper-core/lib/common/messages/prometheusUpdateMessage.js new file mode 100644 index 000000000..b11513370 --- /dev/null +++ b/packages/caliper-core/lib/common/messages/prometheusUpdateMessage.js @@ -0,0 +1,37 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const Message = require('./message'); +const MessageTypes = require('./../utils/constants').Messages.Types; + +/** + * Class for the "prometheusUpdate" message. + */ +class PrometheusUpdateMessage extends Message { + /** + * Constructor for a "prometheusUpdate" message instance. + * @param {string} sender The sender of the message. + * @param {string[]} recipients The recipients of the message. + * @param {object} content The content of the message. + * @param {string} date The date string of the message. + * @param {string} error The potential error associated with the message. + */ + constructor(sender, recipients, content, date = undefined, error = undefined) { + super(sender, recipients, MessageTypes.PrometheusUpdate, content, date, error); + } +} + +module.exports = PrometheusUpdateMessage; diff --git a/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js b/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js new file mode 100644 index 000000000..46ba85b4e --- /dev/null +++ b/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js @@ -0,0 +1,97 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const TxObserverInterface = require('./tx-observer-interface'); + +const PrometheusUpdateMessage = require('../../common/messages/prometheusUpdateMessage'); + +/** + * Prometheus Manager TX observer used to send updates to the Prometheus scrape server in the manager. + */ +class PrometheusManagerTxObserver extends TxObserverInterface { + /** + * Initializes the instance. + * @param {object} options The observer configuration object. + * @param {MessengerInterface} messenger The worker messenger instance. Not used. + * @param {number} workerIndex The 0-based index of the worker node. + * @param {string} managerUuid The UUID of the manager messenger. + */ + constructor(options, messenger, workerIndex, managerUuid) { + super(messenger, workerIndex); + + this.managerUuid = managerUuid; + } + /** + * Called when TXs are submitted. + * @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs. + */ + txSubmitted(count) { + const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txSubmitted', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + count: count + }); + this.messenger.send(message); + } + + /** + * Called when TXs are finished. + * @param {TxStatus | TxStatus[]} results The result information of the finished TXs. Can be a collection of results for a batch of TXs. + */ + txFinished(results) { + if (Array.isArray(results)) { + for (const result of results) { + // pass/fail status from result.GetStatus() + const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txFinished', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + status: result.GetStatus(), + latency: result.GetTimeFinal() - result.GetTimeCreate() + }); + this.messenger.send(message); + } + } else { + // pass/fail status from result.GetStatus() + const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txFinished', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + status: results.GetStatus(), + latency: (results.GetTimeFinal() - results.GetTimeCreate())/1000 + }); + this.messenger.send(message); + } + } +} + +/** + * Factory function for creating a PrometheusManagerTxObserver instance. + * @param {object} options The observer configuration object. + * @param {MessengerInterface} messenger The worker messenger instance. + * @param {number} workerIndex The 0-based index of the worker node. + * @param {string} managerUuid The UUID of the manager messenger. + * @return {TxObserverInterface} The observer instance. + */ +function createTxObserver(options, messenger, workerIndex, managerUuid) { + return new PrometheusManagerTxObserver(options, messenger, workerIndex, managerUuid); +} + +module.exports.createTxObserver = createTxObserver; diff --git a/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js b/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js new file mode 100644 index 000000000..962ac4bda --- /dev/null +++ b/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js @@ -0,0 +1,214 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const chai = require('chai'); +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); +const mockery = require('mockery'); +const sinon = require('sinon'); + +/** + * simulate Util + */ +class Utils { + /** + * + * @param {*} path path + * @return {string} the fake path + */ + static resolvePath(path) { + return 'fake/path'; + } + + /** + * + * @return {boolean} the fake path + */ + static isForkedProcess() { + return false; + } + + /** + * + * @param {*} yaml res + * @return {string} the fake yaml + */ + static parseYaml(yaml) { + return 'yaml'; + } + + /** + * @returns {*} logger stub + */ + static getLogger() { + return { + debug: sinon.stub(), + error: sinon.stub() + }; + } + + /** + * @param {*} url url + * @returns {*} url + */ + static augmentUrlWithBasicAuth(url) { + return url; + } +} + +mockery.enable({ + warnOnReplace: false, + warnOnUnregistered: false +}); +mockery.registerMock('../../common/utils/caliper-utils', Utils); + + +describe('When using a PrometheusManagerTxObserver', () => { + + // Require here to enable mocks to be established + const PrometheusManagerTxObserver = require('../../../lib/worker/tx-observers/prometheus-manager-tx-observer'); + + after(()=> { + mockery.deregisterAll(); + mockery.disable(); + }); + + it('should set managerUuid passed through constructor', () => { + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, undefined, undefined, 'fakeUuid'); + observer.managerUuid.should.equal('fakeUuid'); + }); + + + it('should send update message when TXs are submitted', () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const txCount = 1; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer.txSubmitted(txCount); + + sinon.assert.calledWith(messenger.send, sinon.match({ + content: sinon.match({ + event: 'txSubmitted', + roundIndex: roundIndex, + roundLabel: roundLabel, + count: txCount, + }), + sender: senderUuid, + recipients: sinon.match.array.contains([managerUuid]) + })); + }); + + it('should send update message when single TX is finished', () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer.txFinished(result); + + sinon.assert.calledWith(messenger.send, sinon.match({ + content: sinon.match({ + event: 'txFinished', + roundIndex: roundIndex, + roundLabel: roundLabel, + status: 'success', + latency: (timeFinal - timeCreate) / 1000, + }), + sender: senderUuid, + recipients: sinon.match.array.contains([managerUuid]) + })); + }); + + it('should send update message when multiple TXs are finished', () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer.txFinished([result, result]); + + sinon.assert.calledWith(messenger.send, sinon.match({ + content: sinon.match({ + event: 'txFinished', + roundIndex: roundIndex, + roundLabel: roundLabel, + status: 'success', + latency: (timeFinal - timeCreate), + }), + sender: senderUuid, + recipients: sinon.match.array.contains([managerUuid]) + })); + sinon.assert.calledWith(messenger.send, sinon.match({ + content: sinon.match({ + event: 'txFinished', + roundIndex: roundIndex, + roundLabel: roundLabel, + status: 'success', + latency: (timeFinal - timeCreate), + }), + sender: senderUuid, + recipients: sinon.match.array.contains([managerUuid]) + })); + }); +});