Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
* Rename prometheusUpdateMessage to workerMetricsMessage
* Fix doc comment in test
* Add collate and periodic update methods
* Add logs for config property errors
* Fix histogram calculation

Signed-off-by: CaptainIRS <[email protected]>
  • Loading branch information
CaptainIRS committed Oct 10, 2022
1 parent 96c71c9 commit b6506f6
Show file tree
Hide file tree
Showing 5 changed files with 570 additions and 70 deletions.
5 changes: 5 additions & 0 deletions packages/caliper-core/lib/common/config/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const keys = {
},
PrometheusPush: {
Interval: 'caliper-observer-prometheuspush-interval'
},
PrometheusManager: {
Method: 'caliper-observer-prometheusmanager-method',
Interval: 'caliper-observer-prometheusmanager-interval',
CollationCount: 'caliper-observer-prometheusmanager-collationcount'
}
},
Workspace: 'caliper-workspace',
Expand Down
7 changes: 7 additions & 0 deletions packages/caliper-core/lib/common/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ caliper:
prometheus:
# Default scrape port for prometheus tx observer
scrapeport: 3000
prometheusmanager:
# Update method
method: periodic
# Default update interval for the periodic update method
interval: 1000
# Collation count for the collate update method
collationcount: 10
# Configurations related to the logging mechanism
logging:
# Specifies the message structure through placeholders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@
'use strict';

const Message = require('./message');
const MessageTypes = require('./../utils/constants').Messages.Types;
const MessageTypes = require('../utils/constants').Messages.Types;

/**
* Class for the "prometheusUpdate" message.
* Class for the "workerMetricsMessage" message.
*/
class PrometheusUpdateMessage extends Message {
class WorkerMetricsMessage extends Message {
/**
* Constructor for a "prometheusUpdate" message instance.
* Constructor for a "workerMetricsMessage" message instance.
* @param {string} sender The sender of the message.
* @param {string[]} recipients The recipients of the message.
* @param {object} content The content of the message.
* @param {string} date The date string of the message.
* @param {string} error The potential error associated with the message.
*/
constructor(sender, recipients, content, date = undefined, error = undefined) {
super(sender, recipients, MessageTypes.PrometheusUpdate, content, date, error);
super(sender, recipients, MessageTypes.WorkerMetricsMessage, content, date, error);
}
}

module.exports = PrometheusUpdateMessage;
module.exports = WorkerMetricsMessage;
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
'use strict';

const TxObserverInterface = require('./tx-observer-interface');
const WorkerMetricsMessage = require('../../common/messages/workerMetricsMessage');
const CaliperUtils = require('../../common/utils/caliper-utils');
const ConfigUtil = require('../../common/config/config-util');

const PrometheusUpdateMessage = require('../../common/messages/prometheusUpdateMessage');
const Logger = CaliperUtils.getLogger('prometheus-tx-observer');

/**
* Prometheus Manager TX observer used to send updates to the Prometheus scrape server in the manager.
Expand All @@ -32,21 +35,45 @@ class PrometheusManagerTxObserver extends TxObserverInterface {
constructor(options, messenger, workerIndex, managerUuid) {
super(messenger, workerIndex);

this.method = (options && options.method) ? options.method : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Method);
if (this.method === 'periodic') {
this.updateInterval = (options && options.interval) ? options.interval : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval);
this.intervalObject = undefined;
if (this.updateInterval <= 0) {
Logger.error('Invalid update interval specified, using default value');
this.updateInterval = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval);
}
if (options && options.collationCount) {
Logger.warn('Collation count is ignored when using periodic method');
}
} else if (this.method === 'collate') {
this.collationCount = (options && options.collationCount) ? options.collationCount : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount);
if (this.collationCount <= 0) {
Logger.error('Invalid collation count specified, using default value');
this.collationCount = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount);
}
if (options && options.interval) {
Logger.warn('Update interval is ignored when using collate method');
}
}

this.pendingMessages = [];

this.managerUuid = managerUuid;
}
/**
* Called when TXs are submitted.
* @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs.
*/
txSubmitted(count) {
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txSubmitted',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
count: count
});
this.messenger.send(message);
this._appendMessage(message);
}

/**
Expand All @@ -57,28 +84,78 @@ class PrometheusManagerTxObserver extends TxObserverInterface {
if (Array.isArray(results)) {
for (const result of results) {
// pass/fail status from result.GetStatus()
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: result.GetStatus(),
latency: result.GetTimeFinal() - result.GetTimeCreate()
latency: (result.GetTimeFinal() - result.GetTimeCreate()) / 1000
});
this.messenger.send(message);
this._appendMessage(message);
}
} else {
// pass/fail status from result.GetStatus()
const message = new PrometheusUpdateMessage(this.messenger.getUUID(), [this.managerUuid], {
const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: results.GetStatus(),
latency: (results.GetTimeFinal() - results.GetTimeCreate())/1000
latency: (results.GetTimeFinal() - results.GetTimeCreate()) / 1000
});
this._appendMessage(message);
}
}

/**
* Adds message to the pending message queue
* @param {object} message Pending message
* @private
*/
async _appendMessage(message) {
this.pendingMessages.push(message);

if (this.method === 'collate' && this.pendingMessages.length === this.collationCount) {
await this._sendUpdate();
}
}

/**
* Sends the current aggregated statistics to the manager node when triggered by "setInterval".
* @private
*/
async _sendUpdate() {
for (const message of this.pendingMessages) {
this.messenger.send(message);
}
this.pendingMessages = [];
}

/**
* Activates the TX observer instance and starts the regular update scheduling.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
*/
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);

if (this.method === 'periodic') {
this.intervalObject = setInterval(async () => { await this._sendUpdate(); }, this.updateInterval);
}
}

/**
* Deactivates the TX observer interface, and stops the regular update scheduling.
*/
async deactivate() {
await super.deactivate();

if (this.intervalObject) {
clearInterval(this.intervalObject);
this.intervalObject = undefined;
}
await this._sendUpdate();
}
}

Expand Down
Loading

0 comments on commit b6506f6

Please sign in to comment.