Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TxObserver for Prometheus manager #1448

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/caliper-core/lib/common/config/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const keys = {
},
PrometheusPush: {
Interval: 'caliper-observer-prometheuspush-interval'
},
PrometheusManager: {
Method: 'caliper-observer-prometheusmanager-method',
Interval: 'caliper-observer-prometheusmanager-interval',
CollationCount: 'caliper-observer-prometheusmanager-collationcount'
}
},
Workspace: 'caliper-workspace',
Expand Down
7 changes: 7 additions & 0 deletions packages/caliper-core/lib/common/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ caliper:
prometheus:
# Default scrape port for prometheus tx observer
scrapeport: 3000
prometheusmanager:
# Update method
method: periodic
# Default update interval for the periodic update method
interval: 1000
# Collation count for the collate update method
collationcount: 10
# Configurations related to the logging mechanism
logging:
# Specifies the message structure through placeholders
Expand Down
37 changes: 37 additions & 0 deletions packages/caliper-core/lib/common/messages/workerMetricsMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const Message = require('./message');
const MessageTypes = require('../utils/constants').Messages.Types;

/**
* Class for the "workerMetricsMessage" message.
*/
class WorkerMetricsMessage extends Message {
/**
* Constructor for a "workerMetricsMessage" message instance.
* @param {string} sender The sender of the message.
* @param {string[]} recipients The recipients of the message.
* @param {object} content The content of the message.
* @param {string} date The date string of the message.
* @param {string} error The potential error associated with the message.
*/
constructor(sender, recipients, content, date = undefined, error = undefined) {
super(sender, recipients, MessageTypes.WorkerMetricsMessage, content, date, error);
}
}

module.exports = WorkerMetricsMessage;
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't prometheus specific at this point so I think we could just call this ManagerTxObserver and the file manager-tx-observer,js

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have named it this way because it would be inline with the transaction monitor module in the config file. In the existing codebase module: prometheus-push is bound to a PrometheusPushTxObserver and module: prometheus is bound to a PrometheusTxObserver, etc., so PrometheusManagerTxObserver would be the equivalent for the planned module: prometheus-manager config property: https://github.com/hyperledger/caliper/blob/4aa36ea91e21966d4aac65d04a36720ee93ca36d/packages/caliper-core/lib/worker/tx-observers/tx-observer-dispatch.js#L21-L25

Should I perform the refactor to ManagerTxObserver?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed we, lets leave the names as they currently are

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const TxObserverInterface = require('./tx-observer-interface');
const WorkerMetricsMessage = require('../../common/messages/workerMetricsMessage');
const CaliperUtils = require('../../common/utils/caliper-utils');
const ConfigUtil = require('../../common/config/config-util');

const Logger = CaliperUtils.getLogger('prometheus-tx-observer');

/**
* Prometheus Manager TX observer used to send updates to the Prometheus scrape server in the manager.
*/
class PrometheusManagerTxObserver extends TxObserverInterface {
/**
* Initializes the instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance. Not used.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {string} managerUuid The UUID of the manager messenger.
*/
constructor(options, messenger, workerIndex, managerUuid) {
super(messenger, workerIndex);

this.method = (options && options.method) ? options.method : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Method);

switch (this.method) {
case 'periodic': {
this.updateInterval = (options && options.interval) ? options.interval : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval);
this.intervalObject = undefined;
if (this.updateInterval <= 0) {
this.updateInterval = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval);
Logger.warn(`Invalid update interval specified, using default value of ${this.updateInterval}`);
}
if (options && options.collationCount) {
Logger.warn('Collation count is ignored when using periodic method');
}
break;
}

case 'collate' : {
this.collationCount = (options && options.collationCount) ? options.collationCount : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount);
if (this.collationCount <= 0) {
this.collationCount = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount);
Logger.warn(`Invalid collation count specified, using default value of ${this.collationCount}`);
}
if (options && options.interval) {
Logger.warn('Update interval is ignored when using collate method');
}
break;
}

default: {
const msg = `Unrecognised method '${this.method}' specified for prometheus manager, must be either 'collate' or 'periodic' `;
Logger.error(msg);
throw new Error(msg);
}

}

this.pendingMessages = [];
this.managerUuid = managerUuid;
}
/**
* Called when TXs are submitted.
* @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs.
*/
txSubmitted(count) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we send messages for every submit and finished txn, this could result in a large number of messages being sent on high tps runs. This may be fine but it's also not tuneable if there is an issue. It would be good to allow for periodic sending as well with the workers collating the information so that we can do bulk sending.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few doubts regarding how this could be implemented.

I'm thinking of the following approach:

  1. For the periodic-sending feature: A timer is started during the instantiation of the observer, which would regularly send the messages present in an array that keeps track of the pending messages.
  2. For the collation feature: The max collation size is obtained in the config and an array is maintained with pending messages. When a new message arrives and the collation size exceeds on appending the message, the pending messages are sent in bulk.

However I'm not sure about how we could handle the sending of messages during termination of Caliper after the benchmark is successful. In both these features, there might be some loss of data as:

  1. In case of periodic sending, the timer for the last batch of messages might not be sent as the timer might trigger after the scrape server is shut down during the termination of Caliper.
  2. In case of collation, the last batch of messages might have a count less than the size defined in the config, in which case, it would not be sent when Caliper terminates.

I would like to know if the approaches would be feasible and if there is any way we could handle these cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed the deactivate method should help in handling the termination of a worker case

const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txSubmitted',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
count: count
});
this._appendMessage(message);
}

/**
* Called when TXs are finished.
* @param {TxStatus | TxStatus[]} results The result information of the finished TXs. Can be a collection of results for a batch of TXs.
*/
txFinished(results) {
if (Array.isArray(results)) {
for (const result of results) {
// pass/fail status from result.GetStatus()
const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: result.GetStatus(),
latency: (result.GetTimeFinal() - result.GetTimeCreate()) / 1000
});
this._appendMessage(message);
}
} else {
// pass/fail status from result.GetStatus()
const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], {
event: 'txFinished',
workerIndex: this.workerIndex,
roundIndex: this.currentRound,
roundLabel: this.roundLabel,
status: results.GetStatus(),
latency: (results.GetTimeFinal() - results.GetTimeCreate()) / 1000
});
this._appendMessage(message);
}
}

/**
* Adds message to the pending message queue
* @param {object} message Pending message
* @private
*/
async _appendMessage(message) {
this.pendingMessages.push(message);

if (this.method === 'collate' && this.pendingMessages.length === this.collationCount) {
this._sendUpdate();
}
}

/**
* Sends the current aggregated statistics to the manager node when triggered by "setInterval".
* @private
*/
_sendUpdate() {
for (const message of this.pendingMessages) {
this.messenger.send(message);
}
this.pendingMessages = [];
}

/**
* Activates the TX observer instance and starts the regular update scheduling.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
*/
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);

if (this.method === 'periodic') {
this.intervalObject = setInterval(async () => { this._sendUpdate(); }, this.updateInterval);
}
}

/**
* Deactivates the TX observer interface, and stops the regular update scheduling.
*/
async deactivate() {
await super.deactivate();

if (this.intervalObject) {
clearInterval(this.intervalObject);
this.intervalObject = undefined;
}
await this._sendUpdate();
}
}

/**
* Factory function for creating a PrometheusManagerTxObserver instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {string} managerUuid The UUID of the manager messenger.
* @return {TxObserverInterface} The observer instance.
*/
function createTxObserver(options, messenger, workerIndex, managerUuid) {
return new PrometheusManagerTxObserver(options, messenger, workerIndex, managerUuid);
}

module.exports.createTxObserver = createTxObserver;
Loading