Skip to content

Commit

Permalink
Add TxObserver for Prometheus manager
Browse files Browse the repository at this point in the history
Signed-off-by: CaptainIRS <[email protected]>
  • Loading branch information
CaptainIRS committed Sep 21, 2022
1 parent 64f3868 commit 96c71c9
Show file tree
Hide file tree
Showing 3 changed files with 348 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const Message = require('./message');
const MessageTypes = require('./../utils/constants').Messages.Types;

/**
* Class for the "prometheusUpdate" message.
*/
class PrometheusUpdateMessage extends Message {
/**
* Constructor for a "prometheusUpdate" message instance.
* @param {string} sender The sender of the message.
* @param {string[]} recipients The recipients of the message.
* @param {object} content The content of the message.
* @param {string} date The date string of the message.
* @param {string} error The potential error associated with the message.
*/
constructor(sender, recipients, content, date = undefined, error = undefined) {
super(sender, recipients, MessageTypes.PrometheusUpdate, content, date, error);
}
}

module.exports = PrometheusUpdateMessage;
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const TxObserverInterface = require('./tx-observer-interface');

const PrometheusUpdateMessage = require('../../common/messages/prometheusUpdateMessage');

/**
* Prometheus Manager TX observer used to send updates to the Prometheus scrape server in the manager.
*/
class PrometheusManagerTxObserver extends TxObserverInterface {
/**
* Initializes the instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance. Not used.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {string} managerUuid The UUID of the manager messenger.
*/
constructor(options, messenger, workerIndex, managerUuid) {
super(messenger, workerIndex);

this.managerUuid = managerUuid;
}
/**
* Called when TXs are submitted.
* @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs.
*/
txSubmitted(count) {
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txSubmitted',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
count: count
});
this.messenger.send(message);
}

/**
* Called when TXs are finished.
* @param {TxStatus | TxStatus[]} results The result information of the finished TXs. Can be a collection of results for a batch of TXs.
*/
txFinished(results) {
if (Array.isArray(results)) {
for (const result of results) {
// pass/fail status from result.GetStatus()
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: result.GetStatus(),
latency: result.GetTimeFinal() - result.GetTimeCreate()
});
this.messenger.send(message);
}
} else {
// pass/fail status from result.GetStatus()
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: results.GetStatus(),
latency: (results.GetTimeFinal() - results.GetTimeCreate())/1000
});
this.messenger.send(message);
}
}
}

/**
* Factory function for creating a PrometheusManagerTxObserver instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {string} managerUuid The UUID of the manager messenger.
* @return {TxObserverInterface} The observer instance.
*/
function createTxObserver(options, messenger, workerIndex, managerUuid) {
return new PrometheusManagerTxObserver(options, messenger, workerIndex, managerUuid);
}

module.exports.createTxObserver = createTxObserver;
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const mockery = require('mockery');
const sinon = require('sinon');

/**
* simulate Util
*/
class Utils {
/**
*
* @param {*} path path
* @return {string} the fake path
*/
static resolvePath(path) {
return 'fake/path';
}

/**
*
* @return {boolean} the fake path
*/
static isForkedProcess() {
return false;
}

/**
*
* @param {*} yaml res
* @return {string} the fake yaml
*/
static parseYaml(yaml) {
return 'yaml';
}

/**
* @returns {*} logger stub
*/
static getLogger() {
return {
debug: sinon.stub(),
error: sinon.stub()
};
}

/**
* @param {*} url url
* @returns {*} url
*/
static augmentUrlWithBasicAuth(url) {
return url;
}
}

mockery.enable({
warnOnReplace: false,
warnOnUnregistered: false
});
mockery.registerMock('../../common/utils/caliper-utils', Utils);


describe('When using a PrometheusManagerTxObserver', () => {

// Require here to enable mocks to be established
const PrometheusManagerTxObserver = require('../../../lib/worker/tx-observers/prometheus-manager-tx-observer');

after(()=> {
mockery.deregisterAll();
mockery.disable();
});

it('should set managerUuid passed through constructor', () => {
const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, undefined, undefined, 'fakeUuid');
observer.managerUuid.should.equal('fakeUuid');
});


it('should send update message when TXs are submitted', () => {
const senderUuid = 'senderUuid';
const messenger = {
getUUID: sinon.stub().returns(senderUuid),
send: sinon.spy()
};
const workerIndex = 0;
const roundIndex = 1;
const roundLabel = 'roundLabel';
const managerUuid = 'managerUuid';
const txCount = 1;

const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid);
observer.messenger = messenger;
observer.currentRound = roundIndex;
observer.roundLabel = roundLabel;

observer.txSubmitted(txCount);

sinon.assert.calledWith(messenger.send, sinon.match({
content: sinon.match({
event: 'txSubmitted',
roundIndex: roundIndex,
roundLabel: roundLabel,
count: txCount,
}),
sender: senderUuid,
recipients: sinon.match.array.contains([managerUuid])
}));
});

it('should send update message when single TX is finished', () => {
const senderUuid = 'senderUuid';
const messenger = {
getUUID: sinon.stub().returns(senderUuid),
send: sinon.spy()
};
const workerIndex = 0;
const roundIndex = 1;
const roundLabel = 'roundLabel';
const managerUuid = 'managerUuid';
const timeFinal = 1000;
const timeCreate = 0;

const result = {
GetStatus: sinon.stub().returns('success'),
GetTimeFinal: sinon.stub().returns(timeFinal),
GetTimeCreate: sinon.stub().returns(timeCreate),
};

const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid);
observer.messenger = messenger;
observer.currentRound = roundIndex;
observer.roundLabel = roundLabel;

observer.txFinished(result);

sinon.assert.calledWith(messenger.send, sinon.match({
content: sinon.match({
event: 'txFinished',
roundIndex: roundIndex,
roundLabel: roundLabel,
status: 'success',
latency: (timeFinal - timeCreate) / 1000,
}),
sender: senderUuid,
recipients: sinon.match.array.contains([managerUuid])
}));
});

it('should send update message when multiple TXs are finished', () => {
const senderUuid = 'senderUuid';
const messenger = {
getUUID: sinon.stub().returns(senderUuid),
send: sinon.spy()
};
const workerIndex = 0;
const roundIndex = 1;
const roundLabel = 'roundLabel';
const managerUuid = 'managerUuid';
const timeFinal = 1000;
const timeCreate = 0;

const result = {
GetStatus: sinon.stub().returns('success'),
GetTimeFinal: sinon.stub().returns(timeFinal),
GetTimeCreate: sinon.stub().returns(timeCreate),
};

const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid);
observer.messenger = messenger;
observer.currentRound = roundIndex;
observer.roundLabel = roundLabel;

observer.txFinished([result, result]);

sinon.assert.calledWith(messenger.send, sinon.match({
content: sinon.match({
event: 'txFinished',
roundIndex: roundIndex,
roundLabel: roundLabel,
status: 'success',
latency: (timeFinal - timeCreate),
}),
sender: senderUuid,
recipients: sinon.match.array.contains([managerUuid])
}));
sinon.assert.calledWith(messenger.send, sinon.match({
content: sinon.match({
event: 'txFinished',
roundIndex: roundIndex,
roundLabel: roundLabel,
status: 'success',
latency: (timeFinal - timeCreate),
}),
sender: senderUuid,
recipients: sinon.match.array.contains([managerUuid])
}));
});
});

0 comments on commit 96c71c9

Please sign in to comment.