diff --git a/.github/workflows/alerts.yaml b/.github/workflows/alerts.yaml index ab189ede0..f4e17f10d 100644 --- a/.github/workflows/alerts.yaml +++ b/.github/workflows/alerts.yaml @@ -81,4 +81,5 @@ jobs: namespace=zenko oplog_populator_job=artesca-data-backbeat-oplog-populator-headless oplogPopulatorChangeStreamLagThreshold=10 + oplogPopulatorChangeStreamLimit=10 github_token: ${{ secrets.GIT_ACCESS_TOKEN }} diff --git a/.gitignore b/.gitignore index b8ee1ab34..386a6c396 100644 --- a/.gitignore +++ b/.gitignore @@ -9,9 +9,13 @@ pids # Coverage directory used by tools like istanbul coverage +.nyc_output # Dependencies node_modules # Redis *.rdb + +# Python cache, created when refreshing the monitoring files +__pycache__ diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index ba536231f..cd4affb9b 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -11,6 +11,9 @@ const { ZenkoMetrics } = require('arsenal').metrics; const OplogPopulatorMetrics = require('./OplogPopulatorMetrics'); const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidator'); const { mongoJoi } = require('../../lib/config/configItems.joi'); +const ImmutableConnector = require('./allocationStrategy/ImmutableConnector'); +const RetainBucketsDecorator = require('./allocationStrategy/RetainBucketsDecorator'); +const LeastFullConnector = require('./allocationStrategy/LeastFullConnector'); const paramsJoi = joi.object({ config: OplogPopulatorConfigJoiSchema.required(), @@ -51,7 +54,7 @@ class OplogPopulator { this._logger = params.logger; this._changeStreamWrapper = null; this._allocator = null; - this._connectorsManager = null; + this._connectorsManager = null; // contains OplogPopulatorUtils class of each supported extension this._extHelpers = {}; // MongoDB related @@ -78,9 +81,9 @@ class OplogPopulator { async _setupMongoClient() { try { const client = await MongoClient.connect(this._mongoUrl, { - replicaSet: this._replicaSet, - useNewUrlParser: true, - useUnifiedTopology: true, + replicaSet: this._replicaSet, + useNewUrlParser: true, + useUnifiedTopology: true, }); // connect to metadata DB this._mongoClient = client.db(this._database, { @@ -242,35 +245,58 @@ class OplogPopulator { this._changeStreamWrapper.start(); } + _arePipelinesImmutable() { + return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines); + } + + async _initializeConnectorsManager() { + return this._connectorsManager.initializeConnectors(); + } + /** * Sets the OplogPopulator * @returns {Promise|undefined} undefined * @throws {InternalError} */ async setup() { - try { - this._loadOplogHelperClasses(); - this._connectorsManager = new ConnectorsManager({ - nbConnectors: this._config.numberOfConnectors, - database: this._database, - mongoUrl: this._mongoUrl, - oplogTopic: this._config.topic, - cronRule: this._config.connectorsUpdateCronRule, - prefix: this._config.prefix, - heartbeatIntervalMs: this._config.heartbeatIntervalMs, - kafkaConnectHost: this._config.kafkaConnectHost, - kafkaConnectPort: this._config.kafkaConnectPort, - metricsHandler: this._metricsHandler, - logger: this._logger, + try { + this._loadOplogHelperClasses(); + // initialize mongo client + await this._setupMongoClient(); + this._allocationStrategy = this.initStrategy(); + this._connectorsManager = new ConnectorsManager({ + nbConnectors: this._config.numberOfConnectors, + database: this._database, + mongoUrl: this._mongoUrl, + 
oplogTopic: this._config.topic, + cronRule: this._config.connectorsUpdateCronRule, + prefix: this._config.prefix, + heartbeatIntervalMs: this._config.heartbeatIntervalMs, + kafkaConnectHost: this._config.kafkaConnectHost, + kafkaConnectPort: this._config.kafkaConnectPort, + metricsHandler: this._metricsHandler, + allocationStrategy: this._allocationStrategy, + logger: this._logger, }); - await this._connectorsManager.initializeConnectors(); + await this._initializeConnectorsManager(); this._allocator = new Allocator({ connectorsManager: this._connectorsManager, metricsHandler: this._metricsHandler, + allocationStrategy: this._allocationStrategy, logger: this._logger, }); - // initialize mongo client - await this._setupMongoClient(); + // For now, we always use the RetainBucketsDecorator, + // so we forward the events from both classes to it + this._connectorsManager.on(constants.connectorUpdatedEvent, connector => + this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector)); + this._allocator.on(constants.bucketRemovedFromConnectorEvent, (bucket, connector) => + this._allocationStrategy.onBucketRemoved(bucket, connector)); + this._connectorsManager.on(constants.connectorsReconciledEvent, bucketsExceedingLimit => { + this._metricsHandler.onConnectorsReconciled( + bucketsExceedingLimit, + this._allocationStrategy.retainedBucketsCount, + ); + }); // get currently valid buckets from mongo const validBuckets = await this._getBackbeatEnabledBuckets(); // listen to valid buckets await @@ -291,13 +317,48 @@ this._logger.info('OplogPopulator setup complete', { method: 'OplogPopulator.setup', }); - } catch (err) { + } catch (err) { this._logger.error('An error occured when setting up the OplogPopulator', { method: 'OplogPopulator.setup', error: err.description || err.message, }); throw errors.InternalError.customizeDescription(err.description); - } + } + } + + /** + * Initialize the allocation strategy + * @returns {RetainBucketsDecorator} extended allocation strategy + * handling retained buckets + */ + initStrategy() { + let strategy; + if (this._arePipelinesImmutable()) { + // In this case, mongodb does not support reusing a + // resume token from a different pipeline. In other + // words, we cannot alter an existing pipeline. The + // strategy is therefore to allow a maximum of one + // bucket per kafka connector. + strategy = new ImmutableConnector({ + logger: this._logger, + metricsHandler: this._metricsHandler, + connectorsManager: this._connectorsManager, + }); + } else { + // In this case, we can have multiple buckets per + // kafka connector. However, we want to proactively + // ensure that the pipeline will be accepted by + // mongodb.
+ strategy = new LeastFullConnector({ + logger: this._logger, + metricsHandler: this._metricsHandler, + connectorsManager: this._connectorsManager, + }); + } + return new RetainBucketsDecorator( + strategy, + { logger: this._logger }, + ); } /** diff --git a/extensions/oplogPopulator/OplogPopulatorMetrics.js b/extensions/oplogPopulator/OplogPopulatorMetrics.js index 21889a66a..4fe15cf90 100644 --- a/extensions/oplogPopulator/OplogPopulatorMetrics.js +++ b/extensions/oplogPopulator/OplogPopulatorMetrics.js @@ -33,6 +33,14 @@ class OplogPopulatorMetrics { help: 'Total number of buckets per connector', labelNames: ['connector'], }); + this.bucketsExceedingLimit = ZenkoMetrics.createGauge({ + name: 's3_oplog_populator_buckets_exceeding_limit', + help: 'Total number of buckets exceeding the limit for all connectors', + }); + this.retainedBuckets = ZenkoMetrics.createGauge({ + name: 's3_oplog_populator_retained_buckets', + help: 'Current number of buckets still listened to by immutable connectors despite intended removal', + }); this.requestSize = ZenkoMetrics.createCounter({ name: 's3_oplog_populator_connector_request_bytes_total', help: 'Total size of kafka connect request in bytes', @@ -196,6 +204,26 @@ class OplogPopulatorMetrics { }); } } + + /** + * updates metrics when the connectors are reconciled + * @param {number} bucketsExceedingLimit number of buckets above the limit + * for all connectors + * @param {number} retainedBuckets number of buckets still listened to by + * immutable connectors despite intended removal + * @returns {undefined} + */ + onConnectorsReconciled(bucketsExceedingLimit, retainedBuckets) { + try { + this.bucketsExceedingLimit.set(bucketsExceedingLimit); + this.retainedBuckets.set(retainedBuckets); + } catch (error) { + this._logger.error('An error occurred while pushing metric', { + method: 'OplogPopulatorMetrics.onConnectorsReconciled', + error: error.message, + }); + } + } } module.exports = OplogPopulatorMetrics; diff --git a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js index cabee9443..586fccaed 100644 --- a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js +++ b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js @@ -4,21 +4,39 @@ class AllocationStrategy { /** * @constructor - * @param {Logger} logger logger object + * @param {Object} params params + * @param {Logger} params.logger logger object */ - constructor(logger) { - this._logger = logger; + constructor(params) { + this._logger = params.logger; } /** - * Get best connector for assigning a bucket - * @param {Connector[]} connectors available connectors - * @returns {Connector} connector + * Get best connector to assign a bucket to. + * If no connector is available, null is returned.
+ * @param {Array} connectors connectors + * @param {String} bucket bucket name + * @returns {Connector | null} connector */ - getConnector(connectors) { // eslint-disable-line no-unused-vars + getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars throw errors.NotImplemented; } + /** + * Assess if a pipeline can be updated + * @returns {Boolean} true if the connector can be updated + */ + canUpdate() { + throw errors.NotImplemented; + } + + /** + * Getter for the maximum number of buckets per connector + * @returns {Number} maximum number of buckets per connector + */ + get maximumBucketsPerConnector() { + throw errors.NotImplemented; + } } module.exports = AllocationStrategy; diff --git a/extensions/oplogPopulator/allocationStrategy/ImmutableConnector.js b/extensions/oplogPopulator/allocationStrategy/ImmutableConnector.js new file mode 100644 index 000000000..96dc9b6df --- /dev/null +++ b/extensions/oplogPopulator/allocationStrategy/ImmutableConnector.js @@ -0,0 +1,40 @@ +const AllocationStrategy = require('./AllocationStrategy'); + +/** + * @class ImmutableConnector + * + * @classdesc ImmutableConnector is an allocation + * strategy that always requires a new connector to be + * created for each bucket. + */ +class ImmutableConnector extends AllocationStrategy { + /** + * Get best connector to assign a bucket to. + * If no connector is available, null is returned. + * @param {Array} connectors connectors + * @param {String} bucket bucket name + * @returns {Connector | null} connector + */ + getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars + return null; + } + + /** + * Assess if a pipeline can be updated. With the immutable + * strategy, a connector cannot be updated. + * @returns {false} false + */ + canUpdate() { + return false; + } + + /** + * Getter for the maximum number of buckets per connector + * @returns {Number} maximum number of buckets per connector + */ + get maximumBucketsPerConnector() { + return 1; + } +} + +module.exports = ImmutableConnector; diff --git a/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js b/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js index 104616709..b5b5e6ebf 100644 --- a/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js +++ b/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js @@ -1,3 +1,4 @@ +const constants = require('../constants'); const AllocationStrategy = require('./AllocationStrategy'); /** @@ -11,21 +12,37 @@ const AllocationStrategy = require('./AllocationStrategy'); class LeastFullConnector extends AllocationStrategy { /** - * @constructor - * @param {Object} params params - * @param {Logger} params.logger logger object + * Get best connector to assign a bucket to. + * If no connector is available, null is returned. + * @param {Array} connectors connectors + * @param {String} bucket bucket name + * @returns {Connector | null} connector */ - constructor(params) { - super(params.logger); + getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars + if (!connectors.length) { + return null; + } + const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev)); + if (connector.buckets.length >= this.maximumBucketsPerConnector) { + return null; + } + return connector; } /** - * Get best connector for assigning a bucket - * @param {Connector[]} connectors available connectors - * @returns {Connector} connector + * Assess if a pipeline can be updated. 
+ * @returns {true} true */ - getConnector(connectors) { - return connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev)); + canUpdate() { + return true; + } + + /** + * Getter for the maximum number of buckets per connector + * @returns {Number} maximum number of buckets per connector + */ + get maximumBucketsPerConnector() { + return constants.maxBucketsPerConnector; } } diff --git a/extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js b/extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js new file mode 100644 index 000000000..040980b81 --- /dev/null +++ b/extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js @@ -0,0 +1,92 @@ +const AllocationStrategy = require('./AllocationStrategy'); + +/** + * @class RetainBucketsDecorator + * + * @classdesc RetainBucketsDecorator is a decorator + * that retains buckets that are removed from the + * connector but still in use. + */ +class RetainBucketsDecorator extends AllocationStrategy { + /** + * @constructor + * @param {AllocationStrategy} strategy the strategy to decorate + * @param {Object} params params + * @param {Logger} params.logger logger object + */ + constructor(strategy, params) { + super(params); + this._strategy = strategy; + + // Stores buckets that should be removed from the connector + // but still in use + this._retainedBuckets = new Map(); + } + + /** + * Get the number of retained buckets + * @returns {Number} number of retained buckets + */ + get retainedBucketsCount() { return this._retainedBuckets.size; } + + /** + * Callback when a bucket is removed from a connector + * @param {String} bucket bucket name + * @param {Connector} connector connector + * @returns {undefined} + */ + onBucketRemoved(bucket, connector) { + // When a bucket is removed, it is not immediately removed + // from the connector. This keeps the bucket in the + // connector until the connector is updated or destroyed. + this._retainedBuckets.set(bucket, connector); + } + + /** + * Callback when a connector is destroyed or + * updated. + * @param {Connector} connector connector + * @returns {undefined} + */ + onConnectorUpdatedOrDestroyed(connector) { + // When a connector is updated or destroyed, its retained + // buckets are dropped from tracking + this._retainedBuckets.forEach((conn, bucket) => { + if (conn === connector) { + this._retainedBuckets.delete(bucket); + } + }); + } + + /** + * Get best connector to assign a bucket to. + * If no connector is available, null is returned. + * If the bucket is retained, the associated connector + * is returned. + * @param {Array} connectors connectors + * @param {String} bucket bucket name + * @returns {Connector | null} connector + */ + getConnector(connectors, bucket) { + return this._retainedBuckets.get(bucket) || + this._strategy.getConnector(connectors, bucket); + } + + /** + * Assess if a pipeline can be updated.
+ * @returns {Boolean} true if the connector can be updated */ + canUpdate() { + return this._strategy.canUpdate(); + } + + /** + * Getter for the maximum number of buckets per connector + * @returns {Number} maximum number of buckets per connector + */ + get maximumBucketsPerConnector() { + return this._strategy.maximumBucketsPerConnector; + } +} + +module.exports = RetainBucketsDecorator; diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js index 269f11de1..5df585267 100644 --- a/extensions/oplogPopulator/constants.js +++ b/extensions/oplogPopulator/constants.js @@ -1,6 +1,14 @@ const constants = { bucketMetastore: '__metastore', defaultConnectorName: 'source-connector', + // Max pipeline length is bounded by the MongoDB BSON max document size, + // i.e. 16MB. To leave room for the other parameters in the pipeline, we + // round 16 MB (16777216B) / 64 (max length of a bucket name) = 262144 + // down to 260000 + maxBucketsPerConnector: 260000, + mongodbVersionWithImmutablePipelines: '6.0.0', + connectorUpdatedEvent: 'connector-updated', + bucketRemovedFromConnectorEvent: 'bucket-removed', + connectorsReconciledEvent: 'connectors-reconciled', defaultConnectorConfig: { 'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector', 'pipeline': '[]', diff --git a/extensions/oplogPopulator/modules/Allocator.js b/extensions/oplogPopulator/modules/Allocator.js index fa618dacb..b43cb65e8 100644 --- a/extensions/oplogPopulator/modules/Allocator.js +++ b/extensions/oplogPopulator/modules/Allocator.js @@ -2,12 +2,16 @@ const joi = require('joi'); const { errors } = require('arsenal'); const OplogPopulatorMetrics = require('../OplogPopulatorMetrics'); -const LeastFullConnector = require('../allocationStrategy/LeastFullConnector'); +const AllocationStrategy = require('../allocationStrategy/AllocationStrategy'); +const { EventEmitter } = require('stream'); +const constants = require('../constants'); const paramsJoi = joi.object({ connectorsManager: joi.object().required(), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), + allocationStrategy: joi.object() + .instance(AllocationStrategy).required(), logger: joi.object().required(), }).required(); /** * @class Allocator * @classdesc Allocator handles listening to buckets by assigning * a connector to them.
*/ -class Allocator { +class Allocator extends EventEmitter { /** * @constructor * @param {Object} params Allocator param * @param {ConnectorsManager} params.connectorsManager connectorsManager + * @param {OplogPopulatorMetrics} params.metricsHandler metrics handler + * @param {AllocationStrategy} params.allocationStrategy allocation strategy * @param {Logger} params.logger logger object */ constructor(params) { + super(); joi.attempt(params, paramsJoi); this._connectorsManager = params.connectorsManager; this._logger = params.logger; - this._allocationStrategy = new LeastFullConnector({ - logger: params.logger, - }); + this._allocationStrategy = params.allocationStrategy; this._metricsHandler = params.metricsHandler; // Stores connector assigned for each bucket this._bucketsToConnectors = new Map(); @@ -72,18 +77,24 @@ class Allocator { * @throws {InternalError} */ async listenToBucket(bucket, eventDate = null) { + let connector; try { - if (!this._bucketsToConnectors.has(bucket)) { + if (!this.has(bucket)) { const connectors = this._connectorsManager.connectors; - const connector = this._allocationStrategy.getConnector(connectors); + connector = this._allocationStrategy.getConnector(connectors, bucket); + if (!connector) { + // the connector is not available (too many buckets) + // we need to create a new connector + connector = this._connectorsManager.addConnector(); + } // In the initial startup of the oplog populator // we fetch the buckets directly from mongo. // We don't have an event date in this case. if (eventDate) { connector.setResumePoint(eventDate); } - await connector.addBucket(bucket); this._bucketsToConnectors.set(bucket, connector); + await connector.addBucket(bucket); this._metricsHandler.onConnectorConfigured(connector, 'add'); this._logger.info('Started listening to bucket', { method: 'Allocator.listenToBucket', @@ -92,6 +103,9 @@ class Allocator { }); } } catch (err) { + if (this._bucketsToConnectors.get(bucket) === connector) { + this._bucketsToConnectors.delete(bucket); + } this._logger.error('Error when starting to listen to bucket', { method: 'Allocator.listenToBucket', bucket, @@ -113,6 +127,7 @@ class Allocator { try { const connector = this._bucketsToConnectors.get(bucket); if (connector) { + this.emit(constants.bucketRemovedFromConnectorEvent, bucket, connector); await connector.removeBucket(bucket); this._bucketsToConnectors.delete(bucket); this._metricsHandler.onConnectorConfigured(connector, 'delete'); diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index 2306585ae..f38986554 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -1,7 +1,9 @@ const joi = require('joi'); const uuid = require('uuid'); const { errors } = require('arsenal'); +const { EventEmitter } = require('stream'); const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper'); +const constants = require('../constants'); const connectorParams = joi.object({ name: joi.string().required(), @@ -21,7 +23,7 @@ const connectorParams = joi.object({ * destroy and update the config of the connector when adding * or removing buckets from it */ -class Connector { +class Connector extends EventEmitter { /** * @constructor @@ -36,6 +38,7 @@ class Connector { * @param {number} params.kafkaConnectPort kafka connect port */ constructor(params) { + super(); joi.attempt(params, connectorParams); this._name = params.name; this._config = params.config; @@ -184,6 +187,7 @@ 
class Connector { return; } try { + this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.deleteConnector(this._name); this._isRunning = false; // resetting the resume point to set a new one on creation of the connector @@ -339,6 +343,7 @@ class Connector { if (doUpdate && this._isRunning) { const timeBeforeUpdate = Date.now(); this._state.isUpdating = true; + this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.updateConnectorConfig(this._name, this._config); this._updateConnectorState(false, timeBeforeUpdate); this._state.isUpdating = false; diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index 1d34b9a49..542c3e3f9 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -9,6 +9,8 @@ const constants = require('../constants'); const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper'); const Connector = require('./Connector'); const OplogPopulatorMetrics = require('../OplogPopulatorMetrics'); +const { EventEmitter } = require('stream'); +const AllocationStrategy = require('../allocationStrategy/AllocationStrategy'); const paramsJoi = joi.object({ nbConnectors: joi.number().required(), @@ -22,6 +24,8 @@ const paramsJoi = joi.object({ kafkaConnectPort: joi.number().required(), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), + allocationStrategy: joi.object() + .instance(AllocationStrategy).required(), logger: joi.object().required(), }).required(); @@ -34,7 +38,7 @@ const eachLimit = util.promisify(async.eachLimit); * @classdesc ConnectorsManager handles connector logic * for spawning connectors and retreiving old ones */ -class ConnectorsManager { +class ConnectorsManager extends EventEmitter { /** * @constructor @@ -46,9 +50,11 @@ class ConnectorsManager { * @param {string} params.cronRule connector updates cron rule * @param {string} params.kafkaConnectHost kafka connect host * @param {number} params.kafkaConnectPort kafka connect port + * @param {AllocationStrategy} params.allocationStrategy allocation strategy * @param {Logger} params.logger logger object */ constructor(params) { + super(); joi.attempt(params, paramsJoi); this._nbConnectors = params.nbConnectors; this._cronRule = params.cronRule; @@ -69,6 +75,7 @@ class ConnectorsManager { this._connectors = []; // used for initial clean up of old connector pipelines this._oldConnectors = []; + this._allocationStrategy = params.allocationStrategy; } /** @@ -121,6 +128,9 @@ class ConnectorsManager { kafkaConnectHost: this._kafkaConnectHost, kafkaConnectPort: this._kafkaConnectPort, }); + connector.on(constants.connectorUpdatedEvent, + connector => this.emit(constants.connectorUpdatedEvent, connector)); + this._connectors.push(connector); return connector; } @@ -129,7 +139,7 @@ class ConnectorsManager { * @param {Object} connectorConfig connector config * @returns {string[]} list of buckets */ - _extractBucketsFromConfig(connectorConfig) { + _extractBucketsFromConfig(connectorConfig) { const pipeline = connectorConfig.pipeline ? 
JSON.parse(connectorConfig.pipeline) : null; if (!pipeline || pipeline.length === 0) { @@ -165,6 +175,14 @@ class ConnectorsManager { kafkaConnectHost: this._kafkaConnectHost, kafkaConnectPort: this._kafkaConnectPort, }); + if (buckets.length > this._allocationStrategy.maximumBucketsPerConnector) { + this._logger.warn('Connector has more buckets than expected', { + method: 'ConnectorsManager._getOldConnectors', + connector: connector.name, + numberOfBuckets: buckets.length, + allowed: this._allocationStrategy.maximumBucketsPerConnector, + }); + } this._logger.debug('Successfully retreived old connector', { method: 'ConnectorsManager._getOldConnectors', connector: connector.name }); @@ -204,8 +222,7 @@ class ConnectorsManager { // Add connectors if required number of connectors not reached const nbConnectorsToAdd = this._nbConnectors - this._connectors.length; for (let i = 0; i < nbConnectorsToAdd; i++) { - const newConnector = this.addConnector(); - this._connectors.push(newConnector); + this.addConnector(); } this._logger.info('Successfully initialized connectors', { method: 'ConnectorsManager.initializeConnectors', @@ -235,7 +252,7 @@ class ConnectorsManager { this._metricsHandler.onConnectorDestroyed(); this._logger.info('Successfully destroyed a connector', { method: 'ConnectorsManager._spawnOrDestroyConnector', - connector: connector.name + connector: connector.name, }); return true; } else if (!connector.isRunning && connector.bucketCount > 0) { @@ -243,12 +260,13 @@ class ConnectorsManager { this._metricsHandler.onConnectorsInstantiated(false); this._logger.info('Successfully spawned a connector', { method: 'ConnectorsManager._spawnOrDestroyConnector', - connector: connector.name + connector: connector.name, }); return true; - } else if (connector.isRunning) { + } else if (connector.isRunning && this._allocationStrategy.canUpdate()) { return connector.updatePipeline(true); } + return false; } catch (err) { this._logger.error('Error while spawning or destorying connector', { @@ -266,6 +284,7 @@ class ConnectorsManager { */ async _updateConnectors() { const connectorsStatus = {}; + let bucketsExceedingLimit = 0; await eachLimit(this._connectors, 10, async connector => { const startTime = Date.now(); try { @@ -284,6 +303,10 @@ class ConnectorsManager { numberOfBuckets: connector.bucketCount, }; } + if (connector.bucketCount > this._allocationStrategy.maximumBucketsPerConnector) { + bucketsExceedingLimit += + connector.bucketCount - this._allocationStrategy.maximumBucketsPerConnector; + } } catch (err) { this._metricsHandler.onConnectorReconfiguration(connector, false); this._logger.error('Failed to updated connector', { @@ -294,6 +317,7 @@ class ConnectorsManager { }); } }); + this.emit(constants.connectorsReconciledEvent, bucketsExceedingLimit); if (Object.keys(connectorsStatus).length > 0) { this._logger.info('Successfully updated connectors', { method: 'ConnectorsManager._updateConnectors', @@ -351,8 +375,13 @@ class ConnectorsManager { * @returns {undefined} */ scheduleConnectorUpdates() { + let updateInProgress = false; schedule.scheduleJob(this._cronRule, async () => { - await this._updateConnectors(); + if (!updateInProgress) { + updateInProgress = true; + await this._updateConnectors(); + updateInProgress = false; + } }); } diff --git a/monitoring/oplog-populator/alerts.test.yaml b/monitoring/oplog-populator/alerts.test.yaml index 1ab1913de..b641e5f78 100644 --- a/monitoring/oplog-populator/alerts.test.yaml +++ b/monitoring/oplog-populator/alerts.test.yaml @@ -73,3 +73,25 @@ tests:
zenko_service: backbeat-oplog-populator description: "Oplog populator restarted connectors too many times" summary: "Oplog populator connectors keep failing after restarts" + + - name: Number of Kafka Connect Connectors + interval: 1m + input_series: + - series: s3_oplog_populator_connectors{job="artesca-data-backbeat-oplog-populator-headless",namespace="zenko"} + values: 1x3 11x20 1x3 + alert_rule_test: + - alertname: KafkaConnectTooManyConnectors + eval_time: 4m + exp_alerts: [] + - alertname: KafkaConnectTooManyConnectors + eval_time: 15m + exp_alerts: + - exp_labels: + severity: warning + exp_annotations: + zenko_service: backbeat-oplog-populator + description: "Kafka connector count is above the soft limit. If the number of buckets is too high, consider using a single replica for your cluster" + summary: "Kafka connector count is above the soft limit" + - alertname: KafkaConnectTooManyConnectors + eval_time: 25m + exp_alerts: [] diff --git a/monitoring/oplog-populator/alerts.yaml b/monitoring/oplog-populator/alerts.yaml index 3b43062e6..eb694c54b 100644 --- a/monitoring/oplog-populator/alerts.yaml +++ b/monitoring/oplog-populator/alerts.yaml @@ -8,6 +8,9 @@ x-inputs: - name: oplogPopulatorChangeStreamLagThreshold type: config value: 10 +- name: oplogPopulatorChangeStreamLimit + type: config + value: 0 groups: - name: Oplog Populator @@ -36,6 +39,18 @@ groups: description: "Oplog populator failed to configure connector" summary: "Oplog populator couldn't update kafka connect connector" + - alert: KafkaConnectTooManyConnectors + Expr: | + sum(s3_oplog_populator_connectors{job="${oplog_populator_job}",namespace="${namespace}"}) + > ${oplogPopulatorChangeStreamLimit} + For: "5m" + Labels: + severity: warning + Annotations: + zenko_service: backbeat-oplog-populator + description: "Kafka connector count is above the soft limit.
If the number of buckets is too high, consider using a single replica for your cluster" + summary: "Kafka connector count is above the soft limit" + - alert: OplogPopulatorMetastoreChangeStreamLagThreshold Expr: | histogram_quantile( diff --git a/package.json b/package.json index 92ad154e9..860c2020c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "backbeat", - "version": "8.6.48", + "version": "8.6.49", "description": "Asynchronous queue and job manager", "main": "index.js", "scripts": { diff --git a/tests/unit/ingestion/IngestionReader.js b/tests/unit/ingestion/IngestionReader.js index 733047738..c62ac48e9 100644 --- a/tests/unit/ingestion/IngestionReader.js +++ b/tests/unit/ingestion/IngestionReader.js @@ -19,7 +19,7 @@ describe('IngestionReader', () => { zkMock = new ZookeeperMock(); }); - it('Should strip metadata v1 prefixes from object entries', done => { + it('should strip metadata v1 prefixes from object entries', done => { const mockExtension = { filter: sinon.spy(), }; @@ -63,7 +63,7 @@ describe('IngestionReader', () => { done(); }); - it('Should not change keys of objects in v0 format', done => { + it('should not change keys of objects in v0 format', done => { const mockExtension = { filter: sinon.spy(), }; diff --git a/tests/unit/lib/models/ObjectQueueEntry.spec.js b/tests/unit/lib/models/ObjectQueueEntry.spec.js index 3f793d489..fd1482fb6 100644 --- a/tests/unit/lib/models/ObjectQueueEntry.spec.js +++ b/tests/unit/lib/models/ObjectQueueEntry.spec.js @@ -5,14 +5,14 @@ const { replicationEntry } = require('../../../utils/kafkaEntries'); describe('ObjectQueueEntry', () => { describe('toRetryEntry', () => { - it('Should not clear dataStoreVersionId when retrying', () => { + it('should not clear dataStoreVersionId when retrying', () => { const entry = QueueEntry.createFromKafkaEntry(replicationEntry); const dataStoreVersionId = entry.getReplicationSiteDataStoreVersionId('sf'); const retryEntry = entry.toRetryEntry('sf'); assert.strictEqual(retryEntry.getReplicationSiteDataStoreVersionId('sf'), dataStoreVersionId); }); - it('Should only include failed site details', () => { + it('should only include failed site details', () => { const entry = QueueEntry.createFromKafkaEntry(replicationEntry); const retryEntry = entry.toRetryEntry('sf'); assert.strictEqual(retryEntry.getReplicationBackends().length, 1); diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js index a40ef1eef..511b80c43 100644 --- a/tests/unit/lib/queuePopulator/LogReader.spec.js +++ b/tests/unit/lib/queuePopulator/LogReader.spec.js @@ -72,7 +72,7 @@ describe('LogReader', () => { }); }); - it('Should strip metadata v1 prefixes from object entries', done => { + it('should strip metadata v1 prefixes from object entries', done => { const mockExtension = { filter: sinon.spy(), }; @@ -117,7 +117,7 @@ describe('LogReader', () => { done(); }); - it('Should not change keys of objects in v0 format', done => { + it('should not change keys of objects in v0 format', done => { const mockExtension = { filter: sinon.spy(), }; @@ -164,7 +164,7 @@ describe('LogReader', () => { }); describe('_processFilterEntries', () => { - it('Should do nothing if no records where pushed', done => { + it('should do nothing if no records where pushed', done => { const batchState = { currentRecords: [], }; @@ -176,7 +176,7 @@ describe('LogReader', () => { }); }); - it('Should process all records', done => { + it('should process all records', done => { const batchState = { 
currentRecords: [1, 2], }; @@ -191,7 +191,7 @@ describe('LogReader', () => { }); describe('_processFilterEntry', () => { - it('Should do nothing if record is empty', done => { + it('should do nothing if record is empty', done => { const batchState = { entriesToPublish: {}, }; @@ -203,7 +203,7 @@ describe('LogReader', () => { }); }); - it('Should process record', done => { + it('should process record', done => { const batchState = { entriesToPublish: {}, }; @@ -225,7 +225,7 @@ describe('LogReader', () => { }); describe('_filterEntries', () => { - it('Should process all record entries', done => { + it('should process all record entries', done => { const batchState = { logStats: { nbLogEntriesRead: 0, diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js index b184757ab..766899072 100644 --- a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js @@ -111,7 +111,7 @@ describe('ListRecordStream', () => { }); describe('_transform', () => { - it('Should correct format entry', done => { + it('should correct format entry', done => { const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); listRecordStream.write(kafkaMessage); listRecordStream.once('data', data => { @@ -130,7 +130,7 @@ describe('ListRecordStream', () => { return done(); }); }); - it('Should skip record if format is invalid', done => { + it('should skip record if format is invalid', done => { const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); const InvalidKafkaMessage = getKafkaMessage(''); listRecordStream.write(InvalidKafkaMessage); diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/LogConsumer.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/LogConsumer.js index 3743a94e1..36650466b 100644 --- a/tests/unit/lib/queuePopulator/kafkaLogConsumer/LogConsumer.js +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/LogConsumer.js @@ -52,7 +52,7 @@ describe('LogConsumer', () => { }); describe('_waitForAssignment', () => { - it('Should wait for consumer group to balance', done => { + it('should wait for consumer group to balance', done => { const waitAssignementSpy = sinon.spy(logConsumer, '_waitForAssignment'); const getAssignemntsStub = sinon.stub(); getAssignemntsStub.onCall(0).returns([]); @@ -71,7 +71,7 @@ describe('LogConsumer', () => { }); describe('_storeCurrentOffsets', () => { - it('Should store offsets', done => { + it('should store offsets', done => { const committedStub = sinon.stub(); committedStub.callsArgWith(1, null, [{ topic: 'backbeat-oplog-topic', @@ -94,7 +94,7 @@ describe('LogConsumer', () => { }); describe('_resetRecordStream', () => { - it('Should initialize record stream', () => { + it('should initialize record stream', () => { logConsumer._resetRecordStream(); assert(logConsumer._listRecordStream instanceof ListRecordStream); assert.strictEqual(typeof(logConsumer._listRecordStream.getOffset), 'function'); @@ -102,7 +102,7 @@ describe('LogConsumer', () => { }); describe('_consumeKafkaMessages', () => { - it('Should consume kafka messages', done => { + it('should consume kafka messages', done => { const consumeStub = sinon.stub(); consumeStub.callsArgWith(1, null, [kafkaMessage]); logConsumer._consumer = { @@ -131,7 +131,7 @@ describe('LogConsumer', () => { }); describe('readRecords', () => { - it('Should return stream', done => { + it('should return stream', done => { const 
waitAssignementStub = sinon.stub(logConsumer, '_waitForAssignment') .callsArg(1); const storeOffsetsStub = sinon.stub(logConsumer, '_storeCurrentOffsets') @@ -151,7 +151,7 @@ describe('LogConsumer', () => { }); }); - it('Should fail if consumer group failed to stabilize', done => { + it('should fail if consumer group failed to stabilize', done => { const waitAssignementStub = sinon.stub(logConsumer, '_waitForAssignment') .callsArgWith(1, errors.InternalError); logConsumer.readRecords({ limit: 1 }, err => { @@ -161,7 +161,7 @@ describe('LogConsumer', () => { }); }); - it('Should fail if it can\'t store offsets', done => { + it('should fail if it can\'t store offsets', done => { const waitAssignementStub = sinon.stub(logConsumer, '_waitForAssignment') .callsArg(1); const storeOffsetsStub = sinon.stub(logConsumer, '_storeCurrentOffsets') @@ -174,7 +174,7 @@ describe('LogConsumer', () => { }); }); - it('Should fail if it can\'t consume kafka messages', done => { + it('should fail if it can\'t consume kafka messages', done => { const waitAssignementStub = sinon.stub(logConsumer, '_waitForAssignment') .callsArg(1); const storeOffsetsStub = sinon.stub(logConsumer, '_storeCurrentOffsets') diff --git a/tests/unit/lib/wrappers/ChangeStream.js b/tests/unit/lib/wrappers/ChangeStream.js index 4c7cce4a9..79409ab07 100644 --- a/tests/unit/lib/wrappers/ChangeStream.js +++ b/tests/unit/lib/wrappers/ChangeStream.js @@ -132,7 +132,7 @@ describe('ChangeStream', () => { }); describe('start', () => { - it('Should create and listen to the metastore change stream', done => { + it('should create and listen to the metastore change stream', done => { const watchStub = sinon.stub().returns(new events.EventEmitter()); wrapper._collection = { watch: watchStub, @@ -149,7 +149,7 @@ describe('ChangeStream', () => { }); }); - it('Should set the change stream pipeline', done => { + it('should set the change stream pipeline', done => { const watchStub = sinon.stub().returns(new events.EventEmitter()); const changeStreamPipline = [ { @@ -216,7 +216,7 @@ describe('ChangeStream', () => { }); }); - it('Should fail if it fails to create change stream', done => { + it('should fail if it fails to create change stream', done => { wrapper._collection = { watch: sinon.stub().throws(errors.InternalError), }; diff --git a/tests/unit/lib/wrappers/kafkaConnectWrapper.js b/tests/unit/lib/wrappers/kafkaConnectWrapper.js index 3dad099a8..8e7f1cbf1 100644 --- a/tests/unit/lib/wrappers/kafkaConnectWrapper.js +++ b/tests/unit/lib/wrappers/kafkaConnectWrapper.js @@ -361,7 +361,7 @@ describe('KafkaConnectWrapper', () => { .returns(request); }); - it('Should make request and return body', async () => { + it('should make request and return body', async () => { const promiseResponse = wrapper.makeRequest({ method: 'GET', path: '/connectors', @@ -383,7 +383,7 @@ describe('KafkaConnectWrapper', () => { .catch(err => assert.ifError(err)); }); - it('Should fail if request fails', async () => { + it('should fail if request fails', async () => { const promiseResponse = wrapper.makeRequest({ method: 'GET', path: '/connectors', @@ -394,7 +394,7 @@ describe('KafkaConnectWrapper', () => { .catch(err => assert.deepEqual(err, errors.InternalError)); }); - it('Should write data into post request', async () => { + it('should write data into post request', async () => { const promiseResponse = wrapper.makeRequest({ method: 'POST', path: '/connectors', @@ -423,7 +423,7 @@ describe('KafkaConnectWrapper', () => { .catch(err => assert.ifError(err)); }); - it('Should 
group and parse data chunks', async () => { + it('should group and parse data chunks', async () => { const promiseResponse = wrapper.makeRequest({ method: 'GET', path: '/connectors', @@ -449,7 +449,7 @@ describe('KafkaConnectWrapper', () => { .catch(err => assert.ifError(err)); }); - it('Should fail if it can\'t parse response body', async () => { + it('should fail if it can\'t parse response body', async () => { const promiseResponse = wrapper.makeRequest({ method: 'GET', path: '/connectors', @@ -471,7 +471,7 @@ describe('KafkaConnectWrapper', () => { .catch(err => assert.deepEqual(err, errors.InternalError)); }); - it('Should fail if response has bad status code', async () => { + it('should fail if response has bad status code', async () => { response.statusCode = 500; response.message = 'Server Error'; const promiseResponse = wrapper.makeRequest({ diff --git a/tests/unit/notification/NotificationConfigManager.js b/tests/unit/notification/NotificationConfigManager.js index f784920a7..51eb145a1 100644 --- a/tests/unit/notification/NotificationConfigManager.js +++ b/tests/unit/notification/NotificationConfigManager.js @@ -98,7 +98,7 @@ describe('NotificationConfigManager ::', () => { }); describe('_setupMongoClient ::', () => { - it('Should setup the mongo client and get metastore collection', () => { + it('should setup the mongo client and get metastore collection', () => { const manager = new NotificationConfigManager(params); const getCollectionStub = sinon.stub(); const mongoCommandStub = sinon.stub().returns({ @@ -123,7 +123,7 @@ describe('NotificationConfigManager ::', () => { }); }); - it('Should fail when mongo client setup fails', () => { + it('should fail when mongo client setup fails', () => { const manager = new NotificationConfigManager(params); sinon.stub(MongoClient, 'connect').callsArgWith(2, errors.InternalError, null); @@ -132,7 +132,7 @@ describe('NotificationConfigManager ::', () => { }); }); - it('Should fail when when getting the metadata db', () => { + it('should fail when when getting the metadata db', () => { const manager = new NotificationConfigManager(params); const getDbStub = sinon.stub().throws(errors.InternalError); sinon.stub(MongoClient, 'connect').callsArgWith(2, null, { @@ -143,7 +143,7 @@ describe('NotificationConfigManager ::', () => { }); }); - it('Should fail when mongo client fails to get metastore', () => { + it('should fail when mongo client fails to get metastore', () => { const manager = new NotificationConfigManager(params); const getCollectionStub = sinon.stub().throws(errors.InternalError); const getDbStub = sinon.stub().returns({ @@ -159,7 +159,7 @@ describe('NotificationConfigManager ::', () => { }); describe('_handleChangeStreamChangeEvent ::', () => { - it('Should remove entry from cache', () => { + it('should remove entry from cache', () => { const changeStreamEvent = { _id: 'resumeToken', operationType: 'delete', @@ -185,7 +185,7 @@ describe('NotificationConfigManager ::', () => { assert.strictEqual(manager._cachedConfigs.count(), 0); }); - it('Should replace entry from cache', () => { + it('should replace entry from cache', () => { const changeStreamEvent = { _id: 'resumeToken', operationType: 'replace', @@ -223,7 +223,7 @@ describe('NotificationConfigManager ::', () => { assert.strictEqual(manager._cachedConfigs.count(), 1); }); - it('Should do nothing when config not in cache', () => { + it('should do nothing when config not in cache', () => { const changeStreamEvent = { _id: 'resumeToken', operationType: 'delete', @@ -250,7 +250,7 
@@ describe('NotificationConfigManager ::', () => { assert.strictEqual(manager._cachedConfigs.count(), 1); }); - it('Should do nothing when operation is not supported', () => { + it('should do nothing when operation is not supported', () => { const changeStreamEvent = { _id: 'resumeToken', operationType: 'insert', @@ -315,7 +315,7 @@ describe('NotificationConfigManager ::', () => { }); describe('getConfig ::', () => { - it('Should return notification configuration of bucket', async () => { + it('should return notification configuration of bucket', async () => { const manager = new NotificationConfigManager(params); manager._metastore = { findOne: () => ( @@ -334,7 +334,7 @@ describe('NotificationConfigManager ::', () => { notificationConfiguration); }); - it('Should return undefined when bucket doesn\'t have notification configuration', async () => { + it('should return undefined when bucket doesn\'t have notification configuration', async () => { const manager = new NotificationConfigManager(params); manager._metastore = { findOne: () => ({ value: {} }), @@ -343,7 +343,7 @@ describe('NotificationConfigManager ::', () => { assert.strictEqual(config, undefined); }); - it('Should return undefined when mongo findOne fails', async () => { + it('should return undefined when mongo findOne fails', async () => { const manager = new NotificationConfigManager(params); manager._metastore = { findOne: sinon.stub().throws(errors.InternalError), diff --git a/tests/unit/notification/NotificationQueuePopulator.js b/tests/unit/notification/NotificationQueuePopulator.js index a0163e71a..3db51f70e 100644 --- a/tests/unit/notification/NotificationQueuePopulator.js +++ b/tests/unit/notification/NotificationQueuePopulator.js @@ -48,13 +48,13 @@ describe('NotificationQueuePopulator ::', () => { }); describe('_isBucketEntry ::', () => { - it('Should return true if entry is a bucket entry', done => { + it('should return true if entry is a bucket entry', done => { const isBucket = notificationQueuePopulator._isBucketEntry('__metastore', 'example-bucket'); assert.strictEqual(isBucket, true); return done(); }); - it('Should return false if entry is an object entry', done => { + it('should return false if entry is an object entry', done => { const isBucket = notificationQueuePopulator._isBucketEntry('example-bucket', 'example-key'); assert.strictEqual(isBucket, false); @@ -63,7 +63,7 @@ describe('NotificationQueuePopulator ::', () => { }); describe('_processObjectEntry ::', () => { - it('Should publish object entry in notification topic of destination1', async () => { + it('should publish object entry in notification topic of destination1', async () => { sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( @@ -79,7 +79,7 @@ describe('NotificationQueuePopulator ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 'internal-notification-topic-destination1'); }); - it('Should publish object entry in notification topic of destination2', async () => { + it('should publish object entry in notification topic of destination2', async () => { sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( @@ -95,7 +95,7 @@ describe('NotificationQueuePopulator ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 
'internal-notification-topic-destination2'); }); - it('Should not publish object entry in notification topic if ' + + it('should not publish object entry in notification topic if ' + 'config validation failed', async () => { sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); @@ -113,7 +113,7 @@ describe('NotificationQueuePopulator ::', () => { assert(publishStub.notCalled); }); - it('Should publish object entry to internal shared topic only once ' + + it('should publish object entry to internal shared topic only once ' + 'when multiple destinations are valid for that event', async () => { // override the destinations' config to use the default shared topic notificationQueuePopulator.notificationConfig = { @@ -152,7 +152,7 @@ describe('NotificationQueuePopulator ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 'backbeat-bucket-notification'); }); - it('Should publish object entry to same custom internal topic only once ' + + it('should publish object entry to same custom internal topic only once ' + 'when multiple destinations are valid for that event', async () => { // override the destinations' config to use a single custom internal topic notificationQueuePopulator.notificationConfig = { @@ -191,7 +191,7 @@ describe('NotificationQueuePopulator ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 'custom-topic'); }); - it('Should publish object entry to each entry\'s destination topic when multiple ' + + it('should publish object entry to each entry\'s destination topic when multiple ' + 'destinations are valid for an event', async () => { sinon.stub(bnConfigManager, 'getConfig').returns({ queueConfig: [ @@ -250,7 +250,7 @@ describe('NotificationQueuePopulator ::', () => { assert.strictEqual(publishStub.getCall(2).args.at(0), 'backbeat-bucket-notification'); }); - it('Should not publish object entry in notification topic if ' + + it('should not publish object entry in notification topic if ' + 'notification is non standard', async () => { sinon.stub(bnConfigManager, 'getConfig').returns({ queueConfig: [ @@ -277,7 +277,7 @@ describe('NotificationQueuePopulator ::', () => { }); describe('filterAsync ::', () => { - it('Should fail if entry value parse fails', done => { + it('should fail if entry value parse fails', done => { const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry'); const entry = { bucket: 'example-bucket', @@ -295,7 +295,7 @@ describe('NotificationQueuePopulator ::', () => { }); }); - it('Should fail if entry is a bucket entry', done => { + it('should fail if entry is a bucket entry', done => { const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry'); const entry = { bucket: '__metastore', @@ -313,7 +313,7 @@ describe('NotificationQueuePopulator ::', () => { }); }); - it('Should process the entry', done => { + it('should process the entry', done => { const processEntryCbStub = sinon.stub(notificationQueuePopulator, '_processObjectEntryCb') .yields(); const entry = { @@ -456,7 +456,7 @@ describe('NotificationQueuePopulator with multiple rules ::', () => { }); describe('_processObjectEntry with multiple rules::', () => { - it('Should publish object entry if it matches the first rule', async () => { + it('should publish object entry if it matches the first rule', async () => { const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await 
notificationQueuePopulator._processObjectEntry( 'example-bucket', @@ -472,7 +472,7 @@ describe('NotificationQueuePopulator with multiple rules ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 'internal-notification-topic-destination1'); }); - it('Should publish object entry if it matches the second rule', async () => { + it('should publish object entry if it matches the second rule', async () => { const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( 'example-bucket', @@ -488,7 +488,7 @@ describe('NotificationQueuePopulator with multiple rules ::', () => { assert.strictEqual(publishStub.getCall(0).args.at(0), 'internal-notification-topic-destination1'); }); - it('Should not publish object entry if it does not match any rule', async () => { + it('should not publish object entry if it does not match any rule', async () => { const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( 'example-bucket', diff --git a/tests/unit/notification/NotificationQueueProcessor.js b/tests/unit/notification/NotificationQueueProcessor.js index 4645280bd..8a910c86b 100644 --- a/tests/unit/notification/NotificationQueueProcessor.js +++ b/tests/unit/notification/NotificationQueueProcessor.js @@ -90,7 +90,7 @@ describe('NotificationQueueProcessor:: ', () => { }); describe('processKafkaEntry ::', () => { - it('Should publish notification in correct format', async () => { + it('should publish notification in correct format', async () => { notificationQueueProcessor._getConfig = sinon.stub().yields(null, notificationConfiguration); const sendStub = sinon.stub().yields(null); notificationQueueProcessor._destination = { diff --git a/tests/unit/oplogPopulator/Allocator.js b/tests/unit/oplogPopulator/Allocator.js index 3d822a2c0..d92fab263 100644 --- a/tests/unit/oplogPopulator/Allocator.js +++ b/tests/unit/oplogPopulator/Allocator.js @@ -8,6 +8,8 @@ const Connector = require('../../../extensions/oplogPopulator/modules/Connector'); const OplogPopulatorMetrics = require('../../../extensions/oplogPopulator/OplogPopulatorMetrics'); +const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector'); +const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator'); const logger = new werelogs.Logger('Allocator'); @@ -31,8 +33,16 @@ describe('Allocator', () => { allocator = new Allocator({ connectorsManager: { connectors: [], + addConnector: () => connector1, }, metricsHandler: new OplogPopulatorMetrics(logger), + allocationStrategy: new RetainBucketsDecorator( + // Not needed to test all strategies here: we stub their methods + new LeastFullConnector({ + logger, + }), + { logger, } + ), logger, }); }); @@ -42,7 +52,7 @@ describe('Allocator', () => { }); describe('_initConnectorToBucketMap', () => { - it('Should initialize map', () => { + it('should initialize map', () => { allocator._connectorsManager.connectors = [connector1]; allocator._initConnectorToBucketMap(); const connector = allocator._bucketsToConnectors.get('example-bucket-1'); @@ -52,32 +62,37 @@ describe('Allocator', () => { }); describe('has', () => { - it('Should return true if bucket exist', () => { + it('should return true if bucket exist', () => { allocator._bucketsToConnectors.set('example-bucket-1', connector1); const exists = allocator.has('example-bucket-1'); assert.strictEqual(exists, true); }); - 
it('Should return false if bucket doesn\'t exist', () => { + it('should return false if bucket doesn\'t exist', () => { const exists = allocator.has('example-bucket-2'); assert.strictEqual(exists, false); }); }); describe('listenToBucket', () => { - it('Should listen to bucket if it wasn\'t assigned before', async () => { + it('should handle errors', async () => { + sinon.stub(connector1, 'addBucket').rejects(new Error('error')); + assert.rejects(allocator.listenToBucket('example-bucket-2')); + }); + + it('should listen to bucket if it wasn\'t assigned before', async () => { allocator._connectorsManager.connectors = [connector1]; const getConnectorStub = sinon.stub(allocator._allocationStrategy, 'getConnector') .returns(connector1); const addBucketStub = sinon.stub(connector1, 'addBucket').resolves(); await allocator.listenToBucket('example-bucket-1'); - assert(getConnectorStub.calledOnceWith([connector1])); + assert(getConnectorStub.calledOnceWith([connector1], 'example-bucket-1')); assert(addBucketStub.calledOnceWith('example-bucket-1')); const assignedConnector = allocator._bucketsToConnectors.get('example-bucket-1'); assert.deepEqual(assignedConnector, connector1); }); - it('Should not listen to bucket it was assigned before', async () => { + it('should not listen to bucket it was assigned before', async () => { allocator._bucketsToConnectors.set('example-bucket-1', connector1); const getConnectorStub = sinon.stub(allocator._allocationStrategy, 'getConnector') .returns(connector1); @@ -86,19 +101,44 @@ describe('Allocator', () => { assert(getConnectorStub.notCalled); assert(addBucketStub.notCalled); }); + + it('should add a connector if the strategy returns null', () => { + allocator._connectorsManager.connectors = [connector1]; + sinon.stub(allocator._allocationStrategy, 'getConnector').returns(null); + const addConnectorStub = sinon.stub(allocator._connectorsManager, 'addConnector'); + allocator.listenToBucket('example-bucket-2'); + assert(addConnectorStub.calledOnce); + }); + + it('should handle errors when adding a bucket', async () => { + allocator._connectorsManager.connectors = [connector1]; + const getConnectorStub = sinon.stub(allocator._allocationStrategy, 'getConnector') + .returns(connector1); + const addBucketStub = sinon.stub(connector1, 'addBucket').throws(); + await assert.rejects(allocator.listenToBucket('example-bucket-1')); + assert(getConnectorStub.calledOnceWith([connector1], 'example-bucket-1')); + assert(addBucketStub.calledOnceWith('example-bucket-1')); + const assignedConnector = allocator._bucketsToConnectors.get('example-bucket-1'); + assert.deepEqual(assignedConnector, undefined); + }); }); describe('stopListeningToBucket', () => { - it('Should stop listening to bucket if it was assigned a connector', async () => { + it('should handle errors', async () => { + sinon.stub(connector1, 'removeBucket').rejects(new Error('error')); + assert.rejects(allocator.stopListeningToBucket('example-bucket-2')); + }); + + it('should emit event if listening to bucket that was assigned a connector', async () => { allocator._bucketsToConnectors.set('example-bucket-1', connector1); const removeBucketStub = sinon.stub(connector1, 'removeBucket').resolves(); await allocator.stopListeningToBucket('example-bucket-1'); assert(removeBucketStub.calledOnceWith('example-bucket-1')); - const exists = allocator._bucketsToConnectors.has('example-bucket-1'); - assert.strictEqual(exists, false); + const assignedConnector = allocator._bucketsToConnectors.get('example-bucket-1'); + 
diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js
index 244fb250e..5637be9cd 100644
--- a/tests/unit/oplogPopulator/Connector.js
+++ b/tests/unit/oplogPopulator/Connector.js
@@ -39,7 +39,7 @@ describe('Connector', () => {
     });

     describe('spawn', () => {
-        it('Should spawn connector with correct pipeline', async () => {
+        it('should spawn connector with correct pipeline', async () => {
             const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
                 .resolves();
             assert.strictEqual(connector.isRunning, false);
@@ -50,7 +50,7 @@ describe('Connector', () => {
             }));
             assert.strictEqual(connector.isRunning, true);
         });
-        it('Should change partition name on creation', async () => {
+        it('should change partition name on creation', async () => {
             sinon.stub(connector._kafkaConnect, 'createConnector')
                 .resolves();
             await connector.spawn();
@@ -59,7 +59,7 @@ describe('Connector', () => {
             await connector.spawn();
             assert.notStrictEqual(partitionName, connector.config['offset.partition.name']);
         });
-        it('Should not try spawning a new connector when on is already existent', async () => {
+        it('should not try spawning a new connector when one already exists', async () => {
             const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
                 .resolves();
             connector._isRunning = true;
@@ -69,7 +69,7 @@ describe('Connector', () => {
     });

     describe('destroy', () => {
-        it('Should destroy connector', async () => {
+        it('should destroy connector', async () => {
             const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
                 .resolves();
             connector._isRunning = true;
@@ -78,14 +78,14 @@ describe('Connector', () => {
             assert(deleteStub.calledOnceWith('example-connector'));
             assert.strictEqual(connector.isRunning, false);
         });
-        it('Should not try destroying a new connector when connector is already destroyed', async () => {
+        it('should not try destroying a connector that is already destroyed', async () => {
             const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
                 .resolves();
             connector._isRunning = false;
             await connector.destroy();
             assert(deleteStub.notCalled);
         });
-        it('Should reset resume point', async () => {
+        it('should reset resume point', async () => {
             sinon.stub(connector._kafkaConnect, 'deleteConnector')
                 .resolves();
             connector._isRunning = true;
@@ -95,21 +95,21 @@ describe('Connector', () => {
     });

     describe('restart', () => {
-        it('Should restart connector', async () => {
+        it('should restart connector', async () => {
             const restartStub = sinon.stub(connector._kafkaConnect, 'restartConnector')
                 .resolves();
             connector._isRunning = true;
             await connector.restart();
             assert(restartStub.calledOnceWith('example-connector'));
         });
-        it('Should not try to restart a connector that is not spawned', async () => {
+        it('should not try to restart a connector that is not spawned', async () => {
             const restartStub = sinon.stub(connector._kafkaConnect, 'restartConnector')
                 .resolves();
             connector._isRunning = false;
             await connector.restart();
             assert(restartStub.notCalled);
         });
-        it('Should fail when kafka connect returns an error', async () => {
+        it('should fail when kafka connect returns an error', async () => {
             sinon.stub(connector._kafkaConnect, 'restartConnector')
                 .rejects(errors.InternalError);
             connector._isRunning = true;
@@ -118,7 +118,7 @@
     });

     describe('addBucket', () => {
-        it('Should add bucket and update connector', async () => {
+        it('should add bucket and update connector', async () => {
             const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
                 .resolves();
             await connector.addBucket('example-bucket');
@@ -126,7 +126,7 @@
             assert.strictEqual(connector._buckets.has('example-bucket'), true);
         });

-        it('Should add bucket without updating connector', async () => {
+        it('should add bucket without updating connector', async () => {
             const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
                 .resolves();
             await connector.addBucket('example-bucket', false);
@@ -135,7 +135,7 @@
     });

     describe('removeBucket', () => {
-        it('Should remove bucket and update connector', async () => {
+        it('should remove bucket and update connector', async () => {
             const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
                 .resolves();
             connector._buckets.add('example-bucket');
@@ -144,7 +144,7 @@
             assert.strictEqual(connector._buckets.has('example-bucket'), false);
         });

-        it('Should remove bucket without updating connector', async () => {
+        it('should remove bucket without updating connector', async () => {
             const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
                 .resolves();
             await connector.removeBucket('example-bucket', false);
@@ -153,7 +153,7 @@
     });

     describe('_generateConnectorPipeline', () => {
-        it('Should return new pipeline', () => {
+        it('should return new pipeline', () => {
             const buckets = ['example-bucket-1', 'example-bucket-2'];
             const pipeline = connector._generateConnectorPipeline(buckets);
             assert.strictEqual(pipeline, JSON.stringify([
@@ -169,7 +169,7 @@
     });

     describe('_updateConnectorState', () => {
-        it('Should update all fields when a bucket is added/removed', () => {
+        it('should update all fields when a bucket is added/removed', () => {
             const clock = sinon.useFakeTimers();
             clock.tick(100);
             connector._state.bucketsGotModified = false;
@@ -179,7 +179,7 @@
             assert.notEqual(oldDate, connector._state.lastUpdated);
         });

-        it('Should update all fields when connector got updated and no other operations occured', () => {
+        it('should update all fields when connector got updated and no other operations occurred', () => {
             connector._state.bucketsGotModified = true;
             const oldDate = connector._state.lastUpdated;
             const now = Date.now();
@@ -190,7 +190,7 @@
             assert.notEqual(oldDate, connector._state.lastUpdated);
         });

-        it('Should only update date incase an opetation happend while updating connector', () => {
+        it('should only update date in case an operation happened while updating connector', () => {
             const oldDate = Date.now();
             connector._state.lastUpdated = oldDate;
             connector._state.bucketsGotModified = true;
@@ -204,7 +204,7 @@
     });

     describe('updatePipeline', () => {
-        it('Should only update connector pipeline data if conditions are met', async () => {
+        it('should only update connector pipeline data if conditions are met', async () => {
             connector._state.bucketsGotModified = true;
             connector._state.isUpdating = false;
             const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
@@ -217,7 +217,7 @@
             assert(updateStub.notCalled);
         });

-        it('Should update connector', async () => {
+        it('should update connector', async () => {
             connector._state.bucketsGotModified = true;
             connector._state.isUpdating = false;
             connector._isRunning = true;
@@ -231,7 +231,7 @@
             assert(updateStub.calledOnceWith('example-connector', connector._config));
         });

-        it('Should not update when buckets assigned to connector haven\'t changed', async () => {
+        it('should not update when buckets assigned to connector haven\'t changed', async () => {
             connector._state.bucketsGotModified = false;
             connector._state.isUpdating = false;
             const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
@@ -244,7 +244,7 @@
             assert(updateStub.notCalled);
         });

-        it('Should not update when connector is updating', async () => {
+        it('should not update when connector is updating', async () => {
             connector._state.bucketsGotModified = true;
             connector._state.isUpdating = true;
             const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
@@ -257,7 +257,7 @@
             assert(updateStub.notCalled);
         });

-        it('Should not update destroyed connector', async () => {
+        it('should not update destroyed connector', async () => {
             connector._state.bucketsGotModified = true;
             connector._state.isUpdating = false;
             connector._isRunning = false;
@@ -273,7 +273,7 @@
     });

     describe('getConfigSizeInBytes', () => {
-        it('Should return correct size in bytes', () => {
+        it('should return correct size in bytes', () => {
             connector._config = { key: 'value' };
             const size = connector.getConfigSizeInBytes();
             assert.strictEqual(size, 15);
@@ -281,7 +281,7 @@
     });

     describe('updatePartitionName', () => {
-        it('Should update partition name in config', () => {
+        it('should update partition name in config', () => {
             connector._config['offset.partition.name'] = 'partition-name';
             connector.updatePartitionName();
             assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name');
@@ -289,7 +289,7 @@
     });

     describe('setResumePoint', () => {
-        it('Should not set the resume point when resume point already set', () => {
+        it('should not set the resume point when the resume point is already set', () => {
             connector._isRunning = false;
             connector._config['startup.mode.timestamp.start.at.operation.time'] = '2023-11-15T16:18:53.000Z';
             connector.setResumePoint(new Date('2023-11-16T16:18:53.000Z'));
@@ -299,7 +299,7 @@
             );
         });

-        it('Should set the resume point when not present and connector is stopped', () => {
+        it('should set the resume point when not present and connector is stopped', () => {
             connector._isRunning = false;
             delete connector._config['startup.mode.timestamp.start.at.operation.time'];
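The updatePipeline tests above encode a small set of guards. Roughly, and assuming the state fields used by the tests plus a doUpdate flag similar to the second argument of addBucket/removeBucket (both are assumptions, the exact signature is not shown here), the method short-circuits like this:

    async updatePipeline(doUpdate) {
        // nothing changed since the last update: skip
        if (!this._state.bucketsGotModified) {
            return false;
        }
        // an update is already in flight: skip
        if (this._state.isUpdating) {
            return false;
        }
        const pipeline = this._generateConnectorPipeline([...this._buckets]);
        // destroyed (non-running) connectors are never pushed to kafka-connect
        if (doUpdate && this._isRunning) {
            // ... push `pipeline` into the connector config and
            // update the connector through kafka-connect ...
        }
        return true;
    }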
diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js
index 7ce464b23..345632f57 100644
--- a/tests/unit/oplogPopulator/ConnectorsManager.js
+++ b/tests/unit/oplogPopulator/ConnectorsManager.js
@@ -1,6 +1,7 @@
 const assert = require('assert');
 const sinon = require('sinon');
 const werelogs = require('werelogs');
+const schedule = require('node-schedule');

 const Connector =
     require('../../../extensions/oplogPopulator/modules/Connector');
@@ -8,6 +9,9 @@ const ConnectorsManager =
     require('../../../extensions/oplogPopulator/modules/ConnectorsManager');
 const OplogPopulatorMetrics =
     require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
+const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
+const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
+const constants = require('../../../extensions/oplogPopulator/constants');

 const logger = new werelogs.Logger('ConnectorsManager');

@@ -96,6 +100,13 @@ describe('ConnectorsManager', () => {
             kafkaConnectHost: '127.0.0.1',
             kafkaConnectPort: 8083,
             metricsHandler: new OplogPopulatorMetrics(logger),
+            allocationStrategy: new RetainBucketsDecorator(
+                // Not needed to test all strategies here: we stub their methods
+                new LeastFullConnector({
+                    logger,
+                }),
+                { logger }
+            ),
             logger,
         });
     });
@@ -113,12 +124,12 @@ describe('ConnectorsManager', () => {
     });

     describe('_generateConnectorName', () => {
-        it('Should generate a random name', () => {
+        it('should generate a random name', () => {
             const connectorName = connectorsManager._generateConnectorName();
             assert(connectorName.startsWith('source-connector-'));
         });

-        it('Should add prefix to connector name', () => {
+        it('should add prefix to connector name', () => {
             connectorsManager._prefix = 'pfx-';
             const connectorName = connectorsManager._generateConnectorName();
             assert(connectorName.startsWith('pfx-source-connector-'));
@@ -155,7 +166,7 @@ describe('ConnectorsManager', () => {
     });

     describe('_getOldConnectors', () => {
-        it('Should update connector config while keeping the extra fields', async () => {
+        it('should update connector config while keeping the extra fields', async () => {
             const config = { ...connectorConfig };
             config['topic.namespace.map'] = 'outdated-topic';
             config['offset.partitiom.name'] = 'partition-name';
@@ -168,10 +179,24 @@ describe('ConnectorsManager', () => {
             assert.strictEqual(connectors[0].config['topic.namespace.map'], '{"*":"oplog"}');
             assert.strictEqual(connectors[0].isRunning, true);
         });
+
+        it('should warn when the number of retrieved buckets in a connector exceeds the limit', async () => {
+            const config = { ...connectorConfig };
+            config['topic.namespace.map'] = 'outdated-topic';
+            config['offset.partitiom.name'] = 'partition-name';
+            sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1);
+            sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
+                .resolves(config);
+            sinon.stub(connectorsManager, '_extractBucketsFromConfig').returns(['bucket1', 'bucket2']);
+            const warnStub = sinon.stub(connectorsManager._logger, 'warn');
+            const connectors = await connectorsManager._getOldConnectors(['source-connector', 'source-connector-2']);
+            assert.strictEqual(connectors.length, 2);
+            assert(warnStub.called);
+        });
     });
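The new warning test suggests _getOldConnectors now performs a bounds check when reloading existing connectors; a sketch of the idea (the log message and fields are assumptions, not the actual code):

    const buckets = this._extractBucketsFromConfig(config);
    if (buckets.length > this._allocationStrategy.maximumBucketsPerConnector) {
        // hypothetical message: the real implementation may phrase this differently
        this._logger.warn('connector holds more buckets than the strategy allows', {
            connector: connectorName,
            numberOfBuckets: buckets.length,
        });
    }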

     describe('initializeConnectors', () => {
-        it('Should initialize old connector', async () => {
+        it('should initialize old connector', async () => {
             connectorsManager._nbConnectors = 1;
             sinon.stub(connectorsManager._kafkaConnect, 'getConnectors')
                 .resolves(['source-connector']);
@@ -183,12 +208,15 @@ describe('ConnectorsManager', () => {
             assert.deepEqual(connectorsManager._oldConnectors, [connector1]);
         });

-        it('Should add more connectors', async () => {
+        it('should add more connectors', async () => {
             connectorsManager._nbConnectors = 1;
             sinon.stub(connectorsManager._kafkaConnect, 'getConnectors')
                 .resolves([]);
             sinon.stub(connectorsManager, 'addConnector')
-                .returns(connector1);
+                .callsFake(() => {
+                    connectorsManager._connectors.push(connector1);
+                    return connector1;
+                });
             const connectors = await connectorsManager.initializeConnectors();
             assert.deepEqual(connectors, [connector1]);
             assert.deepEqual(connectorsManager._connectors, [connector1]);
@@ -207,6 +235,15 @@ describe('ConnectorsManager', () => {
             assert(connectorDeleteStub.calledOnceWith(connector1.name));
         });

+        it('should emit event when destroying connector', async () => {
+            connector1._isRunning = true;
+            connector1._state.bucketsGotModified = false;
+            connector1._buckets = new Set();
+            const emitStub = sinon.stub(connector1, 'emit');
+            await connectorsManager._spawnOrDestroyConnector(connector1);
+            assert(emitStub.calledOnceWith(constants.connectorUpdatedEvent, connector1));
+        });
+
         it('should spawn a non running connector when buckets are configured', async () => {
             connector1._isRunning = false;
             connector1._state.bucketsGotModified = false;
@@ -239,6 +276,18 @@ describe('ConnectorsManager', () => {
             assert(connectorCreateStub.notCalled);
             assert(connectorDeleteStub.notCalled);
         });
+
+        it('should do nothing if the strategy does not allow the update', async () => {
+            connector1._isRunning = true;
+            connector1._state.bucketsGotModified = false;
+            connector1._buckets = new Set(['bucket1']);
+            sinon.stub(connectorsManager._allocationStrategy, 'canUpdate')
+                .resolves(false);
+            const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
+            assert.strictEqual(updated, false);
+            assert(connectorCreateStub.notCalled);
+            assert(connectorDeleteStub.notCalled);
+        });
     });

     describe('_updateConnectors', () => {
@@ -376,5 +425,20 @@ describe('ConnectorsManager', () => {
             assert(connectorRestartStub.notCalled);
         });
     });
+
+    describe('scheduleConnectorUpdates', () => {
+        afterEach(() => {
+            sinon.restore();
+        });
+
+        it('should schedule connector updates', () => {
+            const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors');
+            sinon.stub(schedule, 'scheduleJob').callsFake((rule, cb) => {
+                cb();
+            });
+            connectorsManager.scheduleConnectorUpdates();
+            assert(updateConnectorsStub.called);
+        });
+    });
 });
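The scheduleConnectorUpdates test stubs node-schedule directly, which hints at a scheduling method of roughly this shape (the _cronRule field name is an assumption based on the cronRule constructor parameter):

    // with `const schedule = require('node-schedule');` at module scope
    scheduleConnectorUpdates() {
        // re-run connector reconciliation on the configured cron rule
        schedule.scheduleJob(this._cronRule, () => this._updateConnectors());
    }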
diff --git a/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js b/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js
new file mode 100644
index 000000000..435e25c3d
--- /dev/null
+++ b/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js
@@ -0,0 +1,23 @@
+const assert = require('assert');
+const werelogs = require('werelogs');
+const AllocationStrategy = require('../../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
+
+const logger = new werelogs.Logger('AllocationStrategy');
+
+describe('AllocationStrategy', () => {
+    it('should throw NotImplemented when calling getConnector', async () => {
+        const allocationStrategy = new AllocationStrategy({ logger });
+        assert.throws(() => allocationStrategy.getConnector(), {
+            name: 'Error',
+            type: 'NotImplemented',
+        });
+    });
+
+    it('should throw NotImplemented when calling canUpdate', async () => {
+        const allocationStrategy = new AllocationStrategy({ logger });
+        assert.throws(() => allocationStrategy.canUpdate(), {
+            name: 'Error',
+            type: 'NotImplemented',
+        });
+    });
+});
diff --git a/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js b/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js
new file mode 100644
index 000000000..6fdc0c63a
--- /dev/null
+++ b/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js
@@ -0,0 +1,56 @@
+const assert = require('assert');
+const werelogs = require('werelogs');
+
+const Connector =
+    require('../../../../extensions/oplogPopulator/modules/Connector');
+const ImmutableConnector =
+    require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
+
+const logger = new werelogs.Logger('ImmutableConnector');
+
+const defaultConnectorParams = {
+    config: {},
+    isRunning: true,
+    logger,
+    kafkaConnectHost: '127.0.0.1',
+    kafkaConnectPort: 8083,
+};
+
+const connector1 = new Connector({
+    name: 'example-connector-1',
+    buckets: [],
+    ...defaultConnectorParams,
+});
+
+const connector2 = new Connector({
+    name: 'example-connector-2',
+    buckets: ['bucket1', 'bucket2'],
+    ...defaultConnectorParams,
+});
+
+const connector3 = new Connector({
+    name: 'example-connector-3',
+    buckets: ['bucket3'],
+    ...defaultConnectorParams,
+});
+
+describe('ImmutableConnector', () => {
+    it('should return null for getConnector', async () => {
+        const immutableConnector = new ImmutableConnector({ logger });
+        const result = await immutableConnector.getConnector(
+            [connector1, connector2, connector3], 'bucket1');
+        assert.strictEqual(result, null);
+    });
+
+    it('should return false for canUpdate', async () => {
+        const immutableConnector = new ImmutableConnector({ logger });
+        const result = await immutableConnector.canUpdate();
+        assert.strictEqual(result, false);
+    });
+
+    it('should return 1 for maximumBucketsPerConnector', () => {
+        const immutableConnector = new ImmutableConnector({ logger });
+        const result = immutableConnector.maximumBucketsPerConnector;
+        assert.strictEqual(result, 1);
+    });
+});
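Every answer in these tests is the most conservative one, which matches the constraint that immutable change-stream pipelines can never be altered or reused. A minimal implementation consistent with the tests (a sketch, not necessarily the real class):

    class ImmutableConnector extends AllocationStrategy {
        // never reuse an existing connector; the caller must spawn a new one
        getConnector() {
            return null;
        }

        // an existing pipeline can never be altered
        canUpdate() {
            return false;
        }

        // each connector handles exactly one bucket
        get maximumBucketsPerConnector() {
            return 1;
        }
    }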
diff --git a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
index c4f54d18b..a229e1d34 100644
--- a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
+++ b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
@@ -1,10 +1,12 @@
 const assert = require('assert');
+const sinon = require('sinon');
 const werelogs = require('werelogs');

 const Connector =
     require('../../../../extensions/oplogPopulator/modules/Connector');
 const LeastFullConnector =
     require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
+const constants = require('../../../../extensions/oplogPopulator/constants');

 const logger = new werelogs.Logger('LeastFullConnector');

@@ -37,9 +39,36 @@ describe('LeastFullConnector', () => {
     });

     describe('getConnector', () => {
-        it('Should return connector with fewest buckets', () => {
+        afterEach(() => {
+            sinon.restore();
+        });
+
+        it('should return connector with fewest buckets', () => {
             const connector = strategy.getConnector([connector1, connector2]);
             assert.strictEqual(connector.name, connector1.name);
         });
+
+        it('should return null if no connectors', () => {
+            const connector = strategy.getConnector([]);
+            assert.strictEqual(connector, null);
+        });
+
+        it('should return null if the smallest connector is full', () => {
+            sinon.stub(strategy, 'maximumBucketsPerConnector').value(1);
+            const connector = strategy.getConnector([connector2]);
+            assert.strictEqual(connector, null);
+        });
+    });
+
+    describe('canUpdate', () => {
+        it('should return true', () => {
+            assert.strictEqual(strategy.canUpdate(), true);
+        });
+    });
+
+    describe('maximumBucketsPerConnector', () => {
+        it('should return the maximum number of buckets per connector', () => {
+            assert.strictEqual(strategy.maximumBucketsPerConnector, constants.maxBucketsPerConnector);
+        });
+    });
 });
diff --git a/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js b/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js
new file mode 100644
index 000000000..14d2f9b9e
--- /dev/null
+++ b/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js
@@ -0,0 +1,118 @@
+const assert = require('assert');
+const sinon = require('sinon');
+const werelogs = require('werelogs');
+
+const Connector =
+    require('../../../../extensions/oplogPopulator/modules/Connector');
+const RetainBucketsDecorator =
+    require('../../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
+const ImmutableConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
+const LeastFullConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
+
+const logger = new werelogs.Logger('RetainBucketsDecorator');
+
+const defaultConnectorParams = {
+    config: {},
+    isRunning: true,
+    logger,
+    kafkaConnectHost: '127.0.0.1',
+    kafkaConnectPort: 8083,
+};
+
+const connector1 = new Connector({
+    name: 'example-connector-1',
+    buckets: [],
+    ...defaultConnectorParams,
+});
+
+const connector2 = new Connector({
+    name: 'example-connector-2',
+    buckets: ['bucket1', 'bucket2'],
+    ...defaultConnectorParams,
+});
+
+[
+    {
+        scenario: 'ImmutableConnector',
+        strategy: new ImmutableConnector({ logger }),
+    },
+    {
+        scenario: 'LeastFullConnector',
+        strategy: new LeastFullConnector({
+            logger,
+        }),
+    }
+].forEach(({ scenario, strategy }) => {
+    describe(`RetainBucketsDecorator with ${scenario}`, () => {
+        let decorator;
+
+        beforeEach(() => {
+            decorator = new RetainBucketsDecorator(strategy, { logger });
+        });
+
+        afterEach(() => {
+            decorator._retainedBuckets.clear();
+            sinon.restore();
+        });
+
+        describe('onBucketRemoved', () => {
+            it('should add bucket to retained buckets', () => {
+                decorator.onBucketRemoved('bucket1', connector1);
+                assert.strictEqual(decorator._retainedBuckets.size, 1);
+                assert.strictEqual(decorator._retainedBuckets.get('bucket1'), connector1);
+            });
+        });
+
+        describe('getConnector', () => {
+            it('should return the retained bucket\'s connector if it exists', () => {
+                decorator._retainedBuckets.set('bucket1', connector1);
+                const result = decorator.getConnector([connector1, connector2], 'bucket1');
+                assert.strictEqual(result, connector1);
+            });
+
+            it('should call the strategy if the bucket is not retained', () => {
+                const result = decorator.getConnector([connector1, connector2], 'bucket1');
+                assert.strictEqual(result, strategy.getConnector([connector1, connector2], 'bucket1'));
+            });
+        });
+
+        describe('onConnectorUpdatedOrDestroyed', () => {
+            it('should remove retained buckets for connector', () => {
+                decorator._retainedBuckets.set('bucket1', connector1);
+                decorator._retainedBuckets.set('bucket2', connector2);
+                decorator.onConnectorUpdatedOrDestroyed(connector1);
+                assert.strictEqual(decorator._retainedBuckets.size, 1);
+                assert.strictEqual(decorator._retainedBuckets.get('bucket2'), connector2);
+            });
+
+            it('should not remove retained buckets for other connectors', () => {
+                decorator._retainedBuckets.set('bucket1', connector1);
+                decorator._retainedBuckets.set('bucket2', connector2);
+                decorator.onConnectorUpdatedOrDestroyed(connector2);
+                assert.strictEqual(decorator._retainedBuckets.size, 1);
+                assert.strictEqual(decorator._retainedBuckets.get('bucket1'), connector1);
+            });
+        });
+
+        describe('canUpdate', () => {
+            it('should return the strategy result', async () => {
+                const result = await decorator.canUpdate();
+                assert.strictEqual(result, strategy.canUpdate());
+            });
+
+            it('should remove from retained buckets if the strategy allows', async () => {
+                sinon.stub(strategy, 'canUpdate').returns(true);
+                await decorator.canUpdate();
+                assert.strictEqual(decorator._retainedBuckets.size, 0);
+            });
+        });
+
+        describe('maximumBucketsPerConnector', () => {
+            it('should return the strategy result', () => {
+                const result = decorator.maximumBucketsPerConnector;
+                assert.strictEqual(result, strategy.maximumBucketsPerConnector);
+            });
+        });
+    });
+
+});
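The decorator tests describe a small bookkeeping layer that can wrap either base strategy: buckets removed from a connector stay "retained" by it until the connector is actually updated or destroyed. A sketch consistent with the assertions above (not the actual implementation):

    class RetainBucketsDecorator {
        constructor(strategy, { logger }) {
            this._strategy = strategy;
            this._logger = logger;
            // bucket name -> connector that still holds it in its pipeline
            this._retainedBuckets = new Map();
        }

        onBucketRemoved(bucket, connector) {
            this._retainedBuckets.set(bucket, connector);
        }

        onConnectorUpdatedOrDestroyed(connector) {
            // the connector's pipeline no longer contains its retained buckets
            for (const [bucket, conn] of this._retainedBuckets.entries()) {
                if (conn === connector) {
                    this._retainedBuckets.delete(bucket);
                }
            }
        }

        getConnector(connectors, bucket) {
            // prefer the connector that is still listening to the bucket
            return this._retainedBuckets.get(bucket)
                || this._strategy.getConnector(connectors, bucket);
        }

        async canUpdate() {
            const allowed = await this._strategy.canUpdate();
            if (allowed) {
                this._retainedBuckets.clear();
            }
            return allowed;
        }

        get maximumBucketsPerConnector() {
            return this._strategy.maximumBucketsPerConnector;
        }
    }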
diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js
index 040484a1f..e6365b801 100644
--- a/tests/unit/oplogPopulator/oplogPopulator.js
+++ b/tests/unit/oplogPopulator/oplogPopulator.js
@@ -13,6 +13,12 @@ const OplogPopulator =
     require('../../../extensions/oplogPopulator/OplogPopulator');
 const ChangeStream =
     require('../../../lib/wrappers/ChangeStream');
+const ConnectorsManager = require('../../../extensions/oplogPopulator/modules/ConnectorsManager');
+const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
+const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
+const ImmutableConnector = require('../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
+const AllocationStrategy = require('../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
+const constants = require('../../../extensions/oplogPopulator/constants');

 const oplogPopulatorConfig = {
     topic: 'oplog',
@@ -113,6 +119,154 @@ describe('OplogPopulator', () => {
         });
     });

+    describe('_arePipelinesImmutable', () => {
+        it('should return true if pipeline is immutable', () => {
+            oplogPopulator._mongoVersion = '6.0.0';
+            assert(oplogPopulator._arePipelinesImmutable());
+        });
+
+        it('should return false if pipeline is not immutable', () => {
+            oplogPopulator._mongoVersion = '5.0.0';
+            assert(!oplogPopulator._arePipelinesImmutable());
+        });
+    });
+
+    describe('initStrategy', () => {
+        afterEach(() => {
+            sinon.restore();
+        });
+
+        it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => {
+            const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(true);
+            const strategy = oplogPopulator.initStrategy();
+            assert(strategy instanceof RetainBucketsDecorator);
+            assert(strategy._strategy instanceof ImmutableConnector);
+            assert(arePipelinesImmutableStub.calledOnce);
+        });
+
+        it('should return an instance of RetainBucketsDecorator for mutable pipelines', () => {
+            const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(false);
+            const strategy = oplogPopulator.initStrategy();
+            assert(strategy instanceof RetainBucketsDecorator);
+            assert(strategy._strategy instanceof LeastFullConnector);
+            assert(arePipelinesImmutableStub.calledOnce);
+        });
+    });
+
+    describe('setup', () => {
+        it('should handle error during setup', async () => {
+            const error = new Error('InternalError');
+            const loadOplogHelperClassesStub = sinon.stub(oplogPopulator, '_loadOplogHelperClasses').throws(error);
+            const loggerErrorStub = sinon.stub(oplogPopulator._logger, 'error');
+
+            await assert.rejects(oplogPopulator.setup(), error);
+
+            assert(loadOplogHelperClassesStub.calledOnce);
+            assert(loggerErrorStub.calledWith('An error occured when setting up the OplogPopulator', {
+                method: 'OplogPopulator.setup',
+                error: 'InternalError',
+            }));
+        });
+
+        it('should setup oplog populator', async () => {
+            const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
+            const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
+            const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
+            const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);
+
+            await oplogPopulator.setup();
+
+            assert(setupMongoClientStub.calledOnce);
+            assert(getBackbeatEnabledBucketsStub.calledOnce);
+            assert(setMetastoreChangeStreamStub.calledOnce);
+            assert(initializeConnectorsManagerStub.calledOnce);
+        });
+
+        it('should setup oplog populator with immutable pipelines', async () => {
+            const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
+            const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
+            const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
+            const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);
+
+            oplogPopulator._mongoVersion = '6.0.0';
+
+            await oplogPopulator.setup();
+
+            assert(setupMongoClientStub.calledOnce);
+            assert(getBackbeatEnabledBucketsStub.calledOnce);
+            assert(setMetastoreChangeStreamStub.calledOnce);
+            assert(initializeConnectorsManagerStub.calledOnce);
+        });
+
+        it('should bind the connector-updated event from the connectors manager', async () => {
+            const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
+            const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
+            const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
+            const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);
+            await oplogPopulator.setup();
+            assert(setupMongoClientStub.calledOnce);
+            assert(getBackbeatEnabledBucketsStub.calledOnce);
+            assert(setMetastoreChangeStreamStub.calledOnce);
+            assert(initializeConnectorsManagerStub.calledOnce);
+            const onConnectorUpdatedOrDestroyedStub =
+                sinon.stub(oplogPopulator._allocationStrategy, 'onConnectorUpdatedOrDestroyed');
+            oplogPopulator._connectorsManager.emit(constants.connectorUpdatedEvent);
+            assert(onConnectorUpdatedOrDestroyedStub.calledOnce);
+        });
+
+        it('should bind the bucket-removed event from the allocator', async () => {
+            const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
+            const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
+            const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
+            const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);
+            await oplogPopulator.setup();
+            assert(setupMongoClientStub.calledOnce);
+            assert(getBackbeatEnabledBucketsStub.calledOnce);
+            assert(setMetastoreChangeStreamStub.calledOnce);
+            assert(initializeConnectorsManagerStub.calledOnce);
+            const onBucketRemovedStub = sinon.stub(oplogPopulator._allocationStrategy, 'onBucketRemoved');
+            oplogPopulator._allocator.emit(constants.bucketRemovedFromConnectorEvent);
+            assert(onBucketRemovedStub.calledOnce);
+        });
+        it('should bind the connectors-reconciled event from the connectors manager', async () => {
+            const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
+            const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
+            const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
+            const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);
+            await oplogPopulator.setup();
+            assert(setupMongoClientStub.calledOnce);
+            assert(getBackbeatEnabledBucketsStub.calledOnce);
+            assert(setMetastoreChangeStreamStub.calledOnce);
+            assert(initializeConnectorsManagerStub.calledOnce);
+            const onConnectorsReconciledStub = sinon.stub(oplogPopulator._metricsHandler, 'onConnectorsReconciled');
+            oplogPopulator._connectorsManager.emit(constants.connectorsReconciledEvent);
+            assert(onConnectorsReconciledStub.calledOnce);
+        });
+    });
+
+    describe('_initializeConnectorsManager', () => {
+        it('should initialize connectors manager', async () => {
+            oplogPopulator._connectorsManager = new ConnectorsManager({
+                nbConnectors: oplogPopulator._config.numberOfConnectors,
+                database: oplogPopulator._database,
+                mongoUrl: oplogPopulator._mongoUrl,
+                oplogTopic: oplogPopulator._config.topic,
+                cronRule: oplogPopulator._config.connectorsUpdateCronRule,
+                prefix: oplogPopulator._config.prefix,
+                heartbeatIntervalMs: oplogPopulator._config.heartbeatIntervalMs,
+                kafkaConnectHost: oplogPopulator._config.kafkaConnectHost,
+                kafkaConnectPort: oplogPopulator._config.kafkaConnectPort,
+                metricsHandler: oplogPopulator._metricsHandler,
+                allocationStrategy: new AllocationStrategy({ logger }),
+                logger: oplogPopulator._logger,
+            });
+            const connectorsManagerStub = sinon.stub(oplogPopulator._connectorsManager, 'initializeConnectors');
+            await oplogPopulator._initializeConnectorsManager();
+            assert(connectorsManagerStub.calledOnce);
+        });
+    });
+
     describe('_setupMongoClient', () => {
         it('should connect to mongo and setup client', async () => {
             const collectionStub = sinon.stub();
diff --git a/tests/unit/utils/MongoUtils.js b/tests/unit/utils/MongoUtils.js
index ea6ed648e..727872bd0 100644
--- a/tests/unit/utils/MongoUtils.js
+++ b/tests/unit/utils/MongoUtils.js
@@ -27,14 +27,14 @@ const mongoConfShard = {
 };

 describe('constructConnectionString', () => {
-    it('Should construct correct mongo connection string', done => {
+    it('should construct correct mongo connection string', done => {
         const url = MongoUtils.constructConnectionString(mongoConfRepl);
         assert.strictEqual(url, 'mongodb://user:pass@localhost:27017,localhost:27018,localhost:27019' +
             '/?w=majority&readPreference=primary&replicaSet=rs0');
         return done();
     });

-    it('Should construct correct mongo connection string (replica)', done => {
+    it('should construct correct mongo connection string (sharded)', done => {
         const url = MongoUtils.constructConnectionString(mongoConfShard);
         assert.strictEqual(url, 'mongodb://user:pass@localhost:27017,localhost:27018,localhost:27019' +
             '/?w=majority&readPreference=primary');
@@ -49,7 +49,7 @@ describe('getMongoVersion', () => {
         }),
     };

-    it('Should return mongo version in the passed callback', done => {
+    it('should return mongo version in the passed callback', done => {
         MongoUtils.getMongoVersion(client, (err, version) => {
             assert.ifError(err);
             assert.strictEqual(version, '4.2.0');
@@ -57,7 +57,7 @@ describe('getMongoVersion', () => {
         });
     });

-    it('Should return mongo version as a return value', async () => {
+    it('should return mongo version as a return value', async () => {
         const version = await MongoUtils.getMongoVersion(client);
         assert.strictEqual(version, '4.2.0');
     });
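For reference, getMongoVersion supports both of the call styles exercised above; a usage sketch assuming an already connected client and a werelogs logger:

    // callback style
    MongoUtils.getMongoVersion(client, (err, version) => {
        if (err) {
            return logger.error('could not get mongo version', { error: err });
        }
        logger.info(`running MongoDB ${version}`);
    });

    // promise style
    async function logMongoVersion() {
        const version = await MongoUtils.getMongoVersion(client);
        logger.info(`running MongoDB ${version}`);
    }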