Skip to content

Commit

Permalink
Merge branch 'feature/BB-309-add-oplog-populator-metrics' into q/8.4
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Jan 6, 2023
2 parents 6f95321 + 2621a81 commit 5290012
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 94 deletions.
51 changes: 48 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ const { constructConnectionString } = require('../utils/MongoUtils');
const ChangeStream = require('../../lib/wrappers/ChangeStream');
const Allocator = require('./modules/Allocator');
const ConnectorsManager = require('./modules/ConnectorsManager');
const { ZenkoMetrics } = require('arsenal').metrics;
const OplogPopulatorMetrics = require('./OplogPopulatorMetrics');

// Validation schema for the params object handed to the OplogPopulator
// constructor (the constructor runs joi.attempt() against this schema).
// All fields are mandatory except `enableMetrics`, which joi fills in
// with `true` when the caller omits it.
const paramsJoi = joi.object({
config: joi.object().required(),
mongoConfig: joi.object().required(),
activeExtensions: joi.array().required(),
logger: joi.object().required(),
// when true, the constructor registers the metrics
enableMetrics: joi.boolean().default(true),
}).required();

/**
Expand All @@ -37,7 +40,8 @@ class OplogPopulator {
* @param {Object} params.logger - logger
*/
constructor(params) {
joi.attempt(params, paramsJoi);
const validatedParams = joi.attempt(params, paramsJoi);
Object.assign(params, validatedParams);
this._config = params.config;
this._mongoConfig = params.mongoConfig;
this._activeExtensions = params.activeExtensions;
Expand All @@ -54,6 +58,11 @@ class OplogPopulator {
this._mongoUrl = constructConnectionString(this._mongoConfig);
this._replicaSet = this._mongoConfig.replicaSet;
this._database = this._mongoConfig.database;
// initialize metrics
this._metricsHandler = new OplogPopulatorMetrics(this._logger);
if (params.enableMetrics) {
this._metricsHandler.registerMetrics();
}
}

/**
Expand All @@ -67,6 +76,7 @@ class OplogPopulator {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
Expand Down Expand Up @@ -181,6 +191,8 @@ class OplogPopulator {
});
break;
}
const delta = (Date.now() - new Date(change.clusterTime).getTime()) / 1000;
this._metricsHandler.onOplogEventProcessed(change.operationType, delta);
this._logger.info('Change stream event processed', {
method: 'OplogPopulator._handleChangeStreamChange',
type: change.operationType,
Expand All @@ -200,7 +212,16 @@ class OplogPopulator {
'_id': 1,
'operationType': 1,
'documentKey._id': 1,
'fullDocument.value': 1
'fullDocument.value': 1,
// transforming the BSON timestamp
// into a usable date
'clusterTime': {
$toDate: {
$dateToString: {
date: '$clusterTime'
}
}
},
},
},
];
Expand Down Expand Up @@ -232,11 +253,13 @@ class OplogPopulator {
prefix: this._config.prefix,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
// initialize mongo client
Expand All @@ -248,7 +271,14 @@ class OplogPopulator {
// establish change stream
this._setMetastoreChangeStream();
// remove no longer valid buckets from old connectors
await this._connectorsManager.removeInvalidBuckets(validBuckets);
const oldConnectorBuckets = this._connectorsManager.oldConnectors
.map(connector => connector.buckets)
.flat();
const invalidBuckets = oldConnectorBuckets.filter(bucket => !validBuckets.includes(bucket));
await Promise.all(invalidBuckets.map(bucket => this._allocator.stopListeningToBucket(bucket)));
this._logger.info('Successfully removed invalid buckets from old connectors', {
method: 'ConnectorsManager.removeInvalidBuckets',
});
// start scheduler for updating connectors
this._connectorsManager.scheduleConnectorUpdates();
this._logger.info('OplogPopulator setup complete', {
Expand Down Expand Up @@ -282,6 +312,21 @@ class OplogPopulator {
}
return allReady;
}

/**
* Handle ProbeServer metrics
*
* @param {http.HTTPServerResponse} res - HTTP Response to respond with
* @param {Logger} log - Logger
* @returns {undefined}
*/
handleMetrics(res, log) {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': ZenkoMetrics.asPrometheusContentType(),
});
res.end(ZenkoMetrics.asPrometheus());
}
}

module.exports = OplogPopulator;
165 changes: 165 additions & 0 deletions extensions/oplogPopulator/OplogPopulatorMetrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
const { ZenkoMetrics } = require('arsenal').metrics;

const { getStringSizeInBytes } = require('../../lib/util/buffer');

/**
 * Holds the metrics of the OplogPopulator and exposes one `on*`
 * hook per event that should update them.
 *
 * Metrics only exist after `registerMetrics()` has been called; until
 * then every metric attribute is `null` and the `on*` hooks are no-ops,
 * so an instance can be passed around when metrics are disabled.
 */
class OplogPopulatorMetrics {
    /**
     * @param {Logger} logger logger instance
     */
    constructor(logger) {
        // every metric created by registerMetrics() is declared here so
        // that "not registered" is always an explicit null
        this.acknowledgementLag = null;
        this.connectorConfiguration = null;
        this.buckets = null;
        this.requestSize = null;
        this.mongoPipelineSize = null;
        this.connectors = null;
        this.reconfigurationLag = null;
        this.connectorConfigurationApplied = null;
        this._logger = logger;
    }

    /**
     * Creates all the metrics through ZenkoMetrics and stores them
     * on the instance.
     * @returns {undefined}
     */
    registerMetrics() {
        this.acknowledgementLag = ZenkoMetrics.createHistogram({
            name: 'oplog_populator_acknowledgement_lag_sec',
            help: 'Delay between a config change in mongo and the start of processing by the oplogPopulator in seconds',
            labelNames: ['opType'],
            buckets: [0.001, 0.01, 1, 10, 100, 1000, 10000],
        });
        this.connectorConfiguration = ZenkoMetrics.createCounter({
            name: 'oplog_populator_connector_configuration',
            help: 'Number of times we update the configuration of a connector',
            labelNames: ['connector', 'opType'],
        });
        this.buckets = ZenkoMetrics.createGauge({
            name: 'oplog_populator_connector_buckets',
            help: 'Number of buckets per connector',
            labelNames: ['connector'],
        });
        this.requestSize = ZenkoMetrics.createCounter({
            name: 'oplog_populator_connector_request_bytes',
            help: 'Size of kafka connect request in bytes',
            labelNames: ['connector'],
        });
        this.mongoPipelineSize = ZenkoMetrics.createGauge({
            name: 'oplog_populator_connector_pipeline_bytes',
            help: 'Size of mongo pipeline in bytes',
            labelNames: ['connector'],
        });
        this.connectors = ZenkoMetrics.createGauge({
            name: 'oplog_populator_connectors',
            help: 'Total number of configured connectors',
            labelNames: ['isOld'],
        });
        this.reconfigurationLag = ZenkoMetrics.createHistogram({
            name: 'oplog_populator_reconfiguration_lag_sec',
            help: 'Time it takes kafka-connect to respond to a connector configuration request',
            labelNames: ['connector'],
            buckets: [0.001, 0.01, 1, 10, 100, 1000, 10000],
        });
        this.connectorConfigurationApplied = ZenkoMetrics.createCounter({
            name: 'oplog_populator_connector_configuration_applied',
            help: 'Number of times we submit the configuration of a connector to kafka-connect',
            labelNames: ['connector', 'success'],
        });
    }

    /**
     * updates oplog_populator_acknowledgement_lag_sec metric
     * @param {string} opType oplog operation type
     * @param {number} delta delay between a config change
     * in mongo and it getting processed by the oplogPopulator
     * @returns {undefined}
     */
    onOplogEventProcessed(opType, delta) {
        // metrics not registered (disabled): nothing to update
        if (!this.acknowledgementLag) {
            return;
        }
        try {
            this.acknowledgementLag.observe({
                opType,
            }, delta);
        } catch (error) {
            this._logger.error('An error occured while pushing metric', {
                method: 'OplogPopulatorMetrics.onOplogEventProcessed',
                error: error.message,
            });
        }
    }

    /**
     * updates oplog_populator_connector_configuration,
     * oplog_populator_connector_request_bytes &
     * oplog_populator_connector_pipeline_bytes metrics
     * @param {Connector} connector connector instance
     * @param {string} opType operation type, could be one of
     * "add" and "delete"
     * @param {number} buckets number of buckets updated
     * @returns {undefined}
     */
    onConnectorConfigured(connector, opType, buckets = 1) {
        // metrics not registered (disabled): nothing to update
        if (!this.connectorConfiguration || !this.requestSize
            || !this.mongoPipelineSize) {
            return;
        }
        try {
            this.connectorConfiguration.inc({
                connector: connector.name,
                opType,
            }, buckets);
            const reqSize = getStringSizeInBytes(JSON.stringify(connector.config));
            this.requestSize.inc({
                connector: connector.name,
            }, reqSize);
            const pipelineSize = getStringSizeInBytes(JSON.stringify(connector.config.pipeline));
            this.mongoPipelineSize.set({
                connector: connector.name,
            }, pipelineSize);
        } catch (error) {
            this._logger.error('An error occured while pushing metrics', {
                method: 'OplogPopulatorMetrics.onConnectorConfigured',
                error: error.message,
            });
        }
    }

    /**
     * updates oplog_populator_connectors metric
     * @param {boolean} isOld true if connectors were not
     * created by this OplogPopulator instance
     * @param {number} count number of connectors added
     * @returns {undefined}
     */
    onConnectorsInstantiated(isOld, count = 1) {
        // metrics not registered (disabled): nothing to update
        if (!this.connectors) {
            return;
        }
        try {
            this.connectors.inc({
                isOld,
            }, count);
        } catch (error) {
            this._logger.error('An error occured while pushing metrics', {
                method: 'OplogPopulatorMetrics.onConnectorsInstantiated',
                error: error.message,
            });
        }
    }

    /**
     * updates oplog_populator_connector_configuration_applied and,
     * on success, oplog_populator_reconfiguration_lag_sec &
     * oplog_populator_connector_buckets metrics
     * @param {Connector} connector connector instance
     * @param {Boolean} success true if reconfiguration was successful
     * @param {number} delta time it takes to reconfigure a connector,
     * the lag is only recorded when this is a finite number
     * @returns {undefined}
     */
    onConnectorReconfiguration(connector, success, delta = null) {
        // metrics not registered (disabled): nothing to update
        if (!this.connectorConfigurationApplied || !this.reconfigurationLag
            || !this.buckets) {
            return;
        }
        try {
            this.connectorConfigurationApplied.inc({
                connector: connector.name,
                success,
            });
            if (success) {
                // delta defaults to null: skip the lag observation in
                // that case so the bucket gauge below is still updated
                if (Number.isFinite(delta)) {
                    this.reconfigurationLag.observe({
                        connector: connector.name,
                    }, delta);
                }
                this.buckets.set({
                    connector: connector.name,
                }, connector.bucketCount);
            }
        } catch (error) {
            this._logger.error('An error occured while pushing metrics', {
                method: 'OplogPopulatorMetrics.onConnectorReconfiguration',
                error: error.message,
            });
        }
    }
}

module.exports = OplogPopulatorMetrics;
6 changes: 5 additions & 1 deletion extensions/oplogPopulator/OplogPopulatorTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const OplogPopulator = require('./OplogPopulator');
const {
DEFAULT_LIVE_ROUTE,
DEFAULT_READY_ROUTE,
DEFAULT_METRICS_ROUTE,
} = require('arsenal').network.probe.ProbeServer;
const { sendSuccess, sendError } = require('arsenal').network.probe.Utils;
const { startProbeServerPromise } = require('../../lib/util/probe');
Expand Down Expand Up @@ -40,7 +41,7 @@ const oplogPopulator = new OplogPopulator({
if (oplogPopulator.isReady()) {
sendSuccess(res, log);
} else {
log.error('Notification Queue Processor is not ready');
log.error('OplogPopulator is not ready');
sendError(res, log, errors.ServiceUnavailable, 'unhealthy');
}
}
Expand All @@ -53,6 +54,9 @@ const oplogPopulator = new OplogPopulator({
// following the same pattern as other extensions, where liveness
// and readiness are handled by the same handler
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE], handleLiveness);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => oplogPopulator.handleMetrics(res, log)
);
}
} catch (error) {
logger.error('Error when starting up the oplog populator', {
Expand Down
12 changes: 11 additions & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
const joi = require('joi');
const { errors } = require('arsenal');

const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const LeastFullConnector = require('../allocationStrategy/LeastFullConnector');

// Validation schema for a params object; presumably the one passed to
// the Allocator constructor (the validation call is outside this view;
// TODO confirm). All three fields are mandatory.
const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
// must be an actual OplogPopulatorMetrics instance, not a plain object
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
}).required();

Expand All @@ -28,6 +32,7 @@ class Allocator {
this._allocationStrategy = new LeastFullConnector({
logger: params.logger,
});
this._metricsHandler = params.metricsHandler;
// Stores connector assigned for each bucket
this._bucketsToConnectors = new Map();
this._initConnectorToBucketMap();
Expand All @@ -41,7 +46,10 @@ class Allocator {
const connectors = this._connectorsManager.connectors;
connectors.forEach(connector => {
connector.buckets
.forEach(bucket => this._bucketsToConnectors.set(bucket, connector));
.forEach(bucket => {
this._bucketsToConnectors.set(bucket, connector);
this._metricsHandler.onConnectorConfigured(connector, 'add');
});
});
}

Expand Down Expand Up @@ -69,6 +77,7 @@ class Allocator {
const connector = this._allocationStrategy.getConnector(connectors);
await connector.addBucket(bucket);
this._bucketsToConnectors.set(bucket, connector);
this._metricsHandler.onConnectorConfigured(connector, 'add');
this._logger.info('Started listening to bucket', {
method: 'Allocator.listenToBucket',
bucket,
Expand Down Expand Up @@ -99,6 +108,7 @@ class Allocator {
if (connector) {
await connector.removeBucket(bucket);
this._bucketsToConnectors.delete(bucket);
this._metricsHandler.onConnectorConfigured(connector, 'delete');
this._logger.info('Stopped listening to bucket', {
method: 'Allocator.listenToBucket',
bucket,
Expand Down
25 changes: 25 additions & 0 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ class Connector {
*/
get bucketCount() { return this._buckets.size; }

/**
* Get connector config
* @returns {Object} connector config
*/
get config() { return this._config; }

/**
* Calculate config size in bytes
* @returns {number} config size
*/
getConfigSizeInBytes() {
try {
const configSize = Buffer.byteLength(JSON.stringify(this._config));
return configSize;
} catch (err) {
this._logger.error('Error while calculating config size', {
method: 'Connector.getConfigSizeInBytes',
connector: this._name,
config: this._config,
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}

/**
* Creates the Kafka-connect mongo connector
* @returns {Promise|undefined} undefined
Expand Down
Loading

0 comments on commit 5290012

Please sign in to comment.