Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support limiting bucket number in connectors and mongodb 6 #2539

Merged
merged 15 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ jobs:
namespace=zenko
oplog_populator_job=artesca-data-backbeat-oplog-populator-headless
oplogPopulatorChangeStreamLagThreshold=10
oplogPopulatorChangeStreamLimit=10
github_token: ${{ secrets.GIT_ACCESS_TOKEN }}
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ pids

# Coverage directory used by tools like istanbul
coverage
.nyc_output

# Dependencies
node_modules

# Redis
*.rdb

# Python cache, created when refreshing the monitoring files
__pycache__
107 changes: 84 additions & 23 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const { ZenkoMetrics } = require('arsenal').metrics;
const OplogPopulatorMetrics = require('./OplogPopulatorMetrics');
const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidator');
const { mongoJoi } = require('../../lib/config/configItems.joi');
const ImmutableConnector = require('./allocationStrategy/ImmutableConnector');
const RetainBucketsDecorator = require('./allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('./allocationStrategy/LeastFullConnector');

const paramsJoi = joi.object({
config: OplogPopulatorConfigJoiSchema.required(),
Expand Down Expand Up @@ -51,7 +54,7 @@ class OplogPopulator {
this._logger = params.logger;
this._changeStreamWrapper = null;
this._allocator = null;
this._connectorsManager = null;
this._connectorsManager = null;
// contains OplogPopulatorUtils class of each supported extension
this._extHelpers = {};
// MongoDB related
Expand All @@ -78,9 +81,9 @@ class OplogPopulator {
async _setupMongoClient() {
try {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
Expand Down Expand Up @@ -242,35 +245,58 @@ class OplogPopulator {
this._changeStreamWrapper.start();
}

_arePipelinesImmutable() {
return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines);
}

async _initializeConnectorsManager() {
return this._connectorsManager.initializeConnectors();
}

/**
* Sets the OplogPopulator
* @returns {Promise|undefined} undefined
* @throws {InternalError}
*/
async setup() {
try {
this._loadOplogHelperClasses();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
try {
this._loadOplogHelperClasses();
// initialize mongo client
await this._setupMongoClient();
this._allocationStrategy = this.initStrategy();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
allocationStrategy: this._allocationStrategy,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
await this._initializeConnectorsManager();
this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
allocationStrategy: this._allocationStrategy,
logger: this._logger,
});
// initialize mongo client
await this._setupMongoClient();
// For now, we always use the RetainBucketsDecorator
// so, we map the events from the classes
this._connectorsManager.on('connector-updated', connector =>
this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector));
this._allocator.on('bucket-removed', (bucket, connector) =>
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
this._allocationStrategy.onBucketRemoved(bucket, connector));
this._connectorsManager.on('connectors-reconciled', bucketsExceedingLimit => {
this._metricsHandler.onConnectorsReconciled(
bucketsExceedingLimit,
this._allocationStrategy.retainedBucketsCount,
);
});
// get currently valid buckets from mongo
const validBuckets = await this._getBackbeatEnabledBuckets();
// listen to valid buckets
Expand All @@ -291,13 +317,48 @@ class OplogPopulator {
this._logger.info('OplogPopulator setup complete', {
method: 'OplogPopulator.setup',
});
} catch (err) {
} catch (err) {
this._logger.error('An error occured when setting up the OplogPopulator', {
method: 'OplogPopulator.setup',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}
}

/**
* Init the allocation strategy
* @returns {RetainBucketsDecorator} extended allocation strategy
* handling retained buckets
*/
initStrategy() {
let strategy;
if (this._arePipelinesImmutable()) {
// In this case, mongodb does not support reusing a
// resume token from a different pipeline. In other
// words, we cannot alter an existing pipeline. In this
// case, the strategy is to allow a maximum of one
// bucket per kafka connector.
strategy = new ImmutableConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
connectorsManager: this._connectorsManager,
});
} else {
// In this case, we can have multiple buckets per
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
strategy = new LeastFullConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
connectorsManager: this._connectorsManager,
});
}
return new RetainBucketsDecorator(
strategy,
{ logger: this._logger },
);
}

/**
Expand Down
28 changes: 28 additions & 0 deletions extensions/oplogPopulator/OplogPopulatorMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
help: 'Total number of buckets per connector',
labelNames: ['connector'],
});
this.bucketsExceedingLimit = ZenkoMetrics.createGauge({

Check warning on line 36 in extensions/oplogPopulator/OplogPopulatorMetrics.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/OplogPopulatorMetrics.js#L36

Added line #L36 was not covered by tests
name: 's3_oplog_populator_connector_buckets_exceeding_limit',
williamlardier marked this conversation as resolved.
Show resolved Hide resolved
help: 'Total number of buckets exceeding the limit for all connectors',
});
this.retainedBuckets = ZenkoMetrics.createGauge({

Check warning on line 40 in extensions/oplogPopulator/OplogPopulatorMetrics.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/OplogPopulatorMetrics.js#L40

Added line #L40 was not covered by tests
name: 's3_oplog_populator_connector_retained_buckets',
williamlardier marked this conversation as resolved.
Show resolved Hide resolved
help: 'Current number of buckets still listened to by immutable connectors despite intended removal',
});
this.requestSize = ZenkoMetrics.createCounter({
name: 's3_oplog_populator_connector_request_bytes_total',
help: 'Total size of kafka connect request in bytes',
Expand Down Expand Up @@ -196,6 +204,26 @@
});
}
}

/**
* updates metrics when the connectors are reconciled
* @param {number} bucketsExceedingLimit number of buckets above the limit
* for all connectors
* @param {number} retainedBuckets number of buckets still listened to by
* immutable connectors despite intended removal
* @returns {undefined}
*/
onConnectorsReconciled(bucketsExceedingLimit, retainedBuckets) {
try {
this.bucketsExceedingLimit.set(bucketsExceedingLimit);
this.retainedBuckets.set(retainedBuckets);

Check warning on line 219 in extensions/oplogPopulator/OplogPopulatorMetrics.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/OplogPopulatorMetrics.js#L219

Added line #L219 was not covered by tests
} catch (error) {
this._logger.error('An error occured while pushing metric', {
method: 'OplogPopulatorMetrics.onConnectorsReconciled',
error: error.message,
});
}
}
}

module.exports = OplogPopulatorMetrics;
32 changes: 25 additions & 7 deletions extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,39 @@

/**
* @constructor
* @param {Logger} logger logger object
* @param {Object} params params
* @param {Logger} params.logger logger object
*/
constructor(logger) {
this._logger = logger;
constructor(params) {
this._logger = params.logger;
}

/**
* Get best connector for assigning a bucket
* @param {Connector[]} connectors available connectors
* @returns {Connector} connector
* Get best connector to assign a bucket to.
* If no connector is available, null is returned.
* @param {Array<Connector>} connectors connectors
* @param {String} bucket bucket name
* @returns {Connector | null} connector
*/
getConnector(connectors) { // eslint-disable-line no-unused-vars
getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars
throw errors.NotImplemented;
}

/**
* Assess if a pipeline can be updated
* @returns {Boolean} true if the connector can be updated
*/
canUpdate() {
throw errors.NotImplemented;
}

/**
* Getter for the maximum number of buckets per connector
* @returns {Number} maximum number of buckets per connector
*/
get maximumBucketsPerConnector() {
throw errors.NotImplemented;

Check warning on line 38 in extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js#L38

Added line #L38 was not covered by tests
}
}

module.exports = AllocationStrategy;
40 changes: 40 additions & 0 deletions extensions/oplogPopulator/allocationStrategy/ImmutableConnector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const AllocationStrategy = require('./AllocationStrategy');

/**
* @class ImmutableConnector
*
* @classdesc ImmutableConnector is an allocation
* strategy that always requires a new connector to be
* created for each bucket.
*/
class ImmutableConnector extends AllocationStrategy {
/**
* Get best connector to assign a bucket to.
* If no connector is available, null is returned.
* @param {Array<Connector>} connectors connectors
* @param {String} bucket bucket name
* @returns {Connector | null} connector
*/
getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars
return null;
williamlardier marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Assess if a pipeline can be updated. With the immutable
* strategy, a connector cannot be updated.
* @returns {false} false
*/
canUpdate() {
return false;
}

/**
* Getter for the maximum number of buckets per connector
* @returns {Number} maximum number of buckets per connector
*/
get maximumBucketsPerConnector() {
return 1;
}
}

module.exports = ImmutableConnector;
37 changes: 27 additions & 10 deletions extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const constants = require('../constants');
const AllocationStrategy = require('./AllocationStrategy');

/**
Expand All @@ -11,21 +12,37 @@ const AllocationStrategy = require('./AllocationStrategy');
class LeastFullConnector extends AllocationStrategy {

/**
* @constructor
* @param {Object} params params
* @param {Logger} params.logger logger object
* Get best connector to assign a bucket to.
* If no connector is available, null is returned.
* @param {Array<Connector>} connectors connectors
* @param {String} bucket bucket name
* @returns {Connector | null} connector
*/
constructor(params) {
super(params.logger);
getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars
if (!connectors.length) {
return null;
}
const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
if (connector.buckets.length >= this.maximumBucketsPerConnector) {
return null;
}
return connector;
}

/**
* Get best connector for assigning a bucket
* @param {Connector[]} connectors available connectors
* @returns {Connector} connector
* Assess if a pipeline can be updated.
* @returns {true} true
*/
getConnector(connectors) {
return connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
canUpdate() {
return true;
}

/**
* Getter for the maximum number of buckets per connector
* @returns {Number} maximum number of buckets per connector
*/
get maximumBucketsPerConnector() {
return constants.maxBucketsPerConnector;
}
}

Expand Down
Loading
Loading