diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js
index ba536231f..884d08933 100644
--- a/extensions/oplogPopulator/OplogPopulator.js
+++ b/extensions/oplogPopulator/OplogPopulator.js
@@ -51,6 +51,6 @@ class OplogPopulator {
         this._logger = params.logger;
         this._changeStreamWrapper = null;
         this._allocator = null;
         this._connectorsManager = null;
-        this._extHelpers = {};
+        this._extHelpers = {}; // contains the OplogPopulatorUtils class of each supported extension
         // MongoDB related
@@ -78,9 +78,9 @@ class OplogPopulator {
     async _setupMongoClient() {
         try {
             const client = await MongoClient.connect(this._mongoUrl, {
-            replicaSet: this._replicaSet,
-            useNewUrlParser: true,
-            useUnifiedTopology: true,
+                replicaSet: this._replicaSet,
+                useNewUrlParser: true,
+                useUnifiedTopology: true,
             });
             // connect to metadata DB
             this._mongoClient = client.db(this._database, {
@@ -185,7 +185,7 @@ class OplogPopulator {
             // remove bucket if no longer backbeat enabled
             if (isListeningToBucket && !isBackbeatEnabled) {
                 await this._allocator.stopListeningToBucket(change.documentKey._id);
-            // add bucket if it became backbeat enabled
+                // add bucket if it became backbeat enabled
             } else if (!isListeningToBucket && isBackbeatEnabled) {
                 await this._allocator.listenToBucket(change.documentKey._id, eventDate);
             }
@@ -242,35 +242,70 @@ class OplogPopulator {
         this._changeStreamWrapper.start();
     }
 
+    /**
+     * Checks whether the MongoDB version treats change stream
+     * pipelines as immutable
+     * @returns {boolean} true if pipelines are immutable
+     */
+    _isPipelineImmutable() {
+        return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines);
+    }
+
     /**
      * Sets the OplogPopulator
      * @returns {Promise|undefined} undefined
      * @throws {InternalError}
      */
     async setup() {
-        try {
-            this._loadOplogHelperClasses();
-            this._connectorsManager = new ConnectorsManager({
-                nbConnectors: this._config.numberOfConnectors,
-                database: this._database,
-                mongoUrl: this._mongoUrl,
-                oplogTopic: this._config.topic,
-                cronRule: this._config.connectorsUpdateCronRule,
-                prefix: this._config.prefix,
-                heartbeatIntervalMs: this._config.heartbeatIntervalMs,
-                kafkaConnectHost: this._config.kafkaConnectHost,
-                kafkaConnectPort: this._config.kafkaConnectPort,
-                metricsHandler: this._metricsHandler,
-                logger: this._logger,
+        try {
+            this._loadOplogHelperClasses();
+            // initialize mongo client
+            await this._setupMongoClient();
+
+            if (this._isPipelineImmutable()) {
+                // MongoDB does not support reusing a resume token
+                // with a pipeline that differs from the one it was
+                // created with, i.e. an existing pipeline cannot be
+                // altered. The strategy is then to allow a maximum
+                // of one bucket per kafka connector.
+                this._maximumBucketsPerConnector = 1;
+            } else {
+                // Multiple buckets can share one kafka connector,
+                // but we proactively cap the number of buckets per
+                // connector to make sure the generated pipeline
+                // stays within what mongodb accepts.
+                this._maximumBucketsPerConnector = constants.maxBucketPerConnector;
+            }
+
+            // If the singleChangeStream flag is set, listen to the
+            // whole database with one connector: the bucket limit
+            // becomes Infinity and the number of connectors is forced to 1.
+            if (this._config.singleChangeStream) {
+                this._maximumBucketsPerConnector = Infinity;
+                this._config.numberOfConnectors = 1;
+            }
+            this._connectorsManager = new ConnectorsManager({
+                nbConnectors: this._config.numberOfConnectors,
+                singleChangeStream: this._config.singleChangeStream,
+                isPipelineImmutable: this._isPipelineImmutable(),
+                database: this._database,
+                mongoUrl: this._mongoUrl,
+                oplogTopic: this._config.topic,
+                cronRule: this._config.connectorsUpdateCronRule,
+                prefix: this._config.prefix,
+                heartbeatIntervalMs: this._config.heartbeatIntervalMs,
+                kafkaConnectHost: this._config.kafkaConnectHost,
+                kafkaConnectPort: this._config.kafkaConnectPort,
+                metricsHandler: this._metricsHandler,
+                logger: this._logger,
             });
             await this._connectorsManager.initializeConnectors();
             this._allocator = new Allocator({
                 connectorsManager: this._connectorsManager,
                 metricsHandler: this._metricsHandler,
+                maximumBucketsPerConnector: this._maximumBucketsPerConnector,
                 logger: this._logger,
             });
-            // initialize mongo client
-            await this._setupMongoClient();
             // get currently valid buckets from mongo
             const validBuckets = await this._getBackbeatEnabledBuckets();
             // listen to valid buckets
@@ -291,13 +326,13 @@ class OplogPopulator {
             this._logger.info('OplogPopulator setup complete', {
                 method: 'OplogPopulator.setup',
             });
-        } catch (err) {
+        } catch (err) {
             this._logger.error('An error occured when setting up the OplogPopulator', {
                 method: 'OplogPopulator.setup',
                 error: err.description || err.message,
            });
             throw errors.InternalError.customizeDescription(err.description);
-        }
+        }
     }
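Note that the hunk above assumes `semver` is required at the top of OplogPopulator.js and that `this._mongoVersion` is filled in by `_setupMongoClient()`; neither is visible in this excerpt. A minimal standalone sketch of the version gate, under those assumptions (the wrapper function is hypothetical):

const semver = require('semver');

// Mirrors constants.mongodbVersionWithImmutablePipelines below.
const mongodbVersionWithImmutablePipelines = '6.0.0';

function maxBucketsFor(mongoVersion, maxBucketPerConnector) {
    // From MongoDB 6.0.0 on, a change stream resume token cannot be
    // reused with a modified pipeline, so each connector is pinned
    // to a single bucket.
    if (semver.gte(mongoVersion, mongodbVersionWithImmutablePipelines)) {
        return 1;
    }
    return maxBucketPerConnector;
}

console.log(maxBucketsFor('5.0.14', 260000)); // 260000
console.log(maxBucketsFor('6.0.4', 260000)); // 1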
diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js
index 2f9685618..71b4bcb05 100644
--- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js
+++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js
@@ -10,6 +10,7 @@ const joiSchema = joi.object({
     probeServer: probeServerJoi.default(),
     connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
     heartbeatIntervalMs: joi.number().default(10000),
+    singleChangeStream: joi.boolean().default(false),
 });
 
 function configValidator(backbeatConfig, extConfig) {
diff --git a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
index cabee9443..0ef3e5332 100644
--- a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
+++ b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
@@ -4,10 +4,16 @@ class AllocationStrategy {
 
     /**
      * @constructor
-     * @param {Logger} logger logger object
+     * @param {Object} params params
+     * @param {Number} params.maximumBucketsPerConnector maximum number of buckets per connector
+     * @param {Function} params.addConnector function to add a connector
+     * @param {Logger} params.logger logger object
      */
-    constructor(logger) {
-        this._logger = logger;
+    constructor(params) {
+        this._logger = params.logger;
+        this.maximumBucketsPerConnector = params.maximumBucketsPerConnector;
+        // callers pass a pre-bound ConnectorsManager.addConnector
+        this._addConnector = params.addConnector;
     }
 
     /**
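To illustrate the new constructor contract, here is a hypothetical strategy subclass; `maximumBucketsPerConnector` and `addConnector` are injected by the Allocator (see the Allocator.js hunk further down), so a strategy only decides when to use them:

const AllocationStrategy = require('./AllocationStrategy');

// Hypothetical strategy: picks the first connector with spare
// capacity instead of the least full one.
class FirstAvailableConnector extends AllocationStrategy {
    getConnector(connectors) {
        const candidate = connectors.find(
            c => c.bucketCount < this.maximumBucketsPerConnector);
        // All connectors are full: ask the ConnectorsManager for a new one.
        return candidate || this._addConnector();
    }
}

module.exports = FirstAvailableConnector;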
diff --git a/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js b/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
index 104616709..fc1977144 100644
--- a/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
+++ b/extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
@@ -6,26 +6,22 @@ const AllocationStrategy = require('./AllocationStrategy');
  * @classdesc LeastFullConnector is an allocation
  * strategy that assigns buckets to connectors based
  * on the number of buckets assigned to each connector.
- * Connectors with the fewest buckets are filled first
+ * Connectors with the fewest buckets are filled first.
+ * If a connector has reached the maximum number of buckets,
+ * a new connector is created.
  */
 class LeastFullConnector extends AllocationStrategy {
-
-    /**
-     * @constructor
-     * @param {Object} params params
-     * @param {Logger} params.logger logger object
-     */
-    constructor(params) {
-        super(params.logger);
-    }
-
     /**
      * Get best connector for assigning a bucket
      * @param {Connector[]} connectors available connectors
      * @returns {Connector} connector
      */
     getConnector(connectors) {
-        return connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
+        const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
+        if (connector.bucketCount >= this.maximumBucketsPerConnector) {
+            return this._addConnector();
+        }
+        return connector;
     }
 }
diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js
index 269f11de1..fceb7962c 100644
--- a/extensions/oplogPopulator/constants.js
+++ b/extensions/oplogPopulator/constants.js
@@ -1,6 +1,13 @@
 const constants = {
+    internalMongoDBCollectionPrefix: '__',
     bucketMetastore: '__metastore',
     defaultConnectorName: 'source-connector',
+    // A pipeline must fit in a MongoDB BSON document, which is capped
+    // at 16 MB (16777216 B). Dividing by the maximum length of a
+    // bucket name (64 B) gives 262144; we round down to 260000 to
+    // leave room for the other pipeline parameters.
+    maxBucketPerConnector: 260000,
+    mongodbVersionWithImmutablePipelines: '6.0.0',
     defaultConnectorConfig: {
         'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
         'pipeline': '[]',
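A quick check of the arithmetic behind `maxBucketPerConnector`:

// The pipeline is stored in a BSON document capped at 16 MiB, and
// each bucket name occupies at most 64 bytes of it.
const bsonMaxDocumentSize = 16 * 1024 * 1024; // 16777216
const maxBucketNameLength = 64;
console.log(Math.floor(bsonMaxDocumentSize / maxBucketNameLength)); // 262144
// Rounding down to 260000 leaves ~2000 entries of headroom for the
// rest of the pipeline document.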
diff --git a/extensions/oplogPopulator/modules/Allocator.js b/extensions/oplogPopulator/modules/Allocator.js
index fa618dacb..96c16a963 100644
--- a/extensions/oplogPopulator/modules/Allocator.js
+++ b/extensions/oplogPopulator/modules/Allocator.js
@@ -3,9 +3,11 @@ const { errors } = require('arsenal');
 
 const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
 const LeastFullConnector = require('../allocationStrategy/LeastFullConnector');
+const constants = require('../constants');
 
 const paramsJoi = joi.object({
     connectorsManager: joi.object().required(),
+    maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
     metricsHandler: joi.object()
         .instance(OplogPopulatorMetrics).required(),
     logger: joi.object().required(),
@@ -31,6 +33,8 @@ class Allocator {
         this._logger = params.logger;
         this._allocationStrategy = new LeastFullConnector({
             logger: params.logger,
+            maximumBucketsPerConnector: params.maximumBucketsPerConnector,
+            addConnector: this._connectorsManager.addConnector.bind(this._connectorsManager),
         });
         this._metricsHandler = params.metricsHandler;
         // Stores connector assigned for each bucket
diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js
index 2306585ae..7e001d953 100644
--- a/extensions/oplogPopulator/modules/Connector.js
+++ b/extensions/oplogPopulator/modules/Connector.js
@@ -2,6 +2,7 @@ const joi = require('joi');
 const uuid = require('uuid');
 const { errors } = require('arsenal');
 const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
+const constants = require('../constants');
 
 const connectorParams = joi.object({
     name: joi.string().required(),
@@ -11,6 +12,9 @@ const connectorParams = joi.object({
     logger: joi.object().required(),
     kafkaConnectHost: joi.string().required(),
     kafkaConnectPort: joi.number().required(),
+    maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
+    isPipelineImmutable: joi.boolean().default(false),
+    singleChangeStream: joi.boolean().default(false),
 });
 
 /**
@@ -34,6 +38,12 @@ class Connector {
      * @param {Logger} params.logger logger object
      * @param {string} params.kafkaConnectHost kafka connect host
      * @param {number} params.kafkaConnectPort kafka connect port
+     * @param {number} params.maximumBucketsPerConnector maximum number of
+     * buckets per connector
+     * @param {boolean} params.isPipelineImmutable true if the connector
+     * pipeline cannot be updated once created
+     * @param {boolean} params.singleChangeStream if true, the connector
+     * listens to all buckets with a single change stream
      */
     constructor(params) {
         joi.attempt(params, connectorParams);
@@ -59,6 +69,9 @@ class Connector {
             kafkaConnectPort: params.kafkaConnectPort,
             logger: this._logger,
         });
+        this._singleChangeStream = params.singleChangeStream;
+        this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
+        this._isPipelineImmutable = params.isPipelineImmutable;
     }
 
     /**
@@ -233,6 +246,9 @@ class Connector {
      * @throws {InternalError}
      */
     async addBucket(bucket, doUpdate = false) {
+        if (this._buckets.size >= this._maximumBucketsPerConnector) {
+            throw errors.InternalError.customizeDescription('Connector reached maximum number of buckets');
+        }
         this._buckets.add(bucket);
         this._updateConnectorState(true);
         try {
@@ -260,7 +276,19 @@ class Connector {
         this._buckets.delete(bucket);
         this._updateConnectorState(true);
         try {
-            await this.updatePipeline(doUpdate);
+            if (this._isPipelineImmutable && this._buckets.size > 1) {
+                this._logger.warn('Removing a bucket from an immutable pipeline', {
+                    method: 'Connector.removeBucket',
+                    connector: this._name,
+                    bucket,
+                });
+            } else if (this._isPipelineImmutable) {
+                // If the pipeline is immutable and at most one bucket
+                // is left, destroy the connector; it will be recreated
+                // with a new bucket later.
+                return this.destroy();
+            }
+            return this.updatePipeline(doUpdate);
         } catch (err) {
             this._logger.error('Error while removing bucket from connector', {
                 method: 'Connector.removeBucket',
@@ -274,11 +302,16 @@ class Connector {
 
     /**
      * Makes new connector pipeline that includes
-     * buckets assigned to this connector
+     * buckets assigned to this connector. If the
+     * singleChangeStream parameter is set to true,
+     * returns a pipeline that listens to all collections.
      * @param {string[]} buckets list of bucket names
      * @returns {string} new connector pipeline
      */
     _generateConnectorPipeline(buckets) {
+        if (this._singleChangeStream) {
+            return '[]';
+        }
         const pipeline = [
             {
                 $match: {
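The `$match` stage is truncated in the last hunk above. Based on the surrounding code, a simplified standalone version of the pipeline selection looks like this; the exact `$match` shape (filtering on the collection name) is an assumption:

// Simplified sketch of Connector._generateConnectorPipeline.
function generateConnectorPipeline(buckets, singleChangeStream) {
    if (singleChangeStream) {
        // An empty pipeline filters nothing: a single change stream
        // covers every collection in the database.
        return '[]';
    }
    return JSON.stringify([{
        $match: {
            'ns.coll': { $in: buckets }, // assumed filter on collection name
        },
    }]);
}

console.log(generateConnectorPipeline(['bucket1', 'bucket2'], false));
console.log(generateConnectorPipeline([], true)); // '[]'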
diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js
index 1d34b9a49..5b6752513 100644
--- a/extensions/oplogPopulator/modules/ConnectorsManager.js
+++ b/extensions/oplogPopulator/modules/ConnectorsManager.js
@@ -20,6 +20,9 @@ const paramsJoi = joi.object({
     heartbeatIntervalMs: joi.number().required(),
     kafkaConnectHost: joi.string().required(),
     kafkaConnectPort: joi.number().required(),
+    singleChangeStream: joi.boolean().default(false),
+    isPipelineImmutable: joi.boolean().default(false),
+    maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
     metricsHandler: joi.object()
         .instance(OplogPopulatorMetrics).required(),
     logger: joi.object().required(),
@@ -40,6 +43,12 @@ class ConnectorsManager {
      * @constructor
      * @param {Object} params params
      * @param {number} params.nbConnectors number of connectors to have
+     * @param {number} params.maximumBucketsPerConnector maximum number of
+     * buckets per connector
+     * @param {boolean} params.isPipelineImmutable true if the mongodb
+     * pipelines are immutable
+     * @param {boolean} params.singleChangeStream whether to use a single
+     * change stream for all buckets
      * @param {string} params.database MongoDB database to use (for connector)
      * @param {string} params.mongoUrl MongoDB connection url
      * @param {string} params.oplogTopic topic to use for oplog
@@ -69,6 +78,13 @@ class ConnectorsManager {
         this._connectors = [];
         // used for initial clean up of old connector pipelines
         this._oldConnectors = [];
+        this._singleChangeStream = params.singleChangeStream;
+        this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
+        this._isPipelineImmutable = params.isPipelineImmutable;
+
+        if (this._singleChangeStream) {
+            this._nbConnectors = 1;
+        }
     }
 
     /**
@@ -120,6 +136,9 @@ class ConnectorsManager {
             logger: this._logger,
             kafkaConnectHost: this._kafkaConnectHost,
             kafkaConnectPort: this._kafkaConnectPort,
+            maximumBucketsPerConnector: this._maximumBucketsPerConnector,
+            isPipelineImmutable: this._isPipelineImmutable,
+            singleChangeStream: this._singleChangeStream,
         });
         return connector;
     }
@@ -129,7 +148,7 @@ class ConnectorsManager {
      * @param {Object} connectorConfig connector config
      * @returns {string[]} list of buckets
      */
-    _extractBucketsFromConfig(connectorConfig) {
+    _extractBucketsFromConfig(connectorConfig) {
         const pipeline = connectorConfig.pipeline ?
             JSON.parse(connectorConfig.pipeline) : null;
         if (!pipeline || pipeline.length === 0) {
@@ -203,6 +222,12 @@ class ConnectorsManager {
         }
         // Add connectors if required number of connectors not reached
         const nbConnectorsToAdd = this._nbConnectors - this._connectors.length;
+        if (this._singleChangeStream && this._connectors.length > 1) {
+            this._logger.warn('Single change stream is enabled but multiple connectors are running', {
+                method: 'ConnectorsManager.initializeConnectors',
+                numberOfActiveConnectors: this._connectors.length,
+            });
+        }
         for (let i = 0; i < nbConnectorsToAdd; i++) {
             const newConnector = this.addConnector();
             this._connectors.push(newConnector);
@@ -235,7 +260,7 @@ class ConnectorsManager {
             this._metricsHandler.onConnectorDestroyed();
             this._logger.info('Successfully destroyed a connector', {
                 method: 'ConnectorsManager._spawnOrDestroyConnector',
-                connector: connector.name
+                connector: connector.name,
             });
             return true;
         } else if (!connector.isRunning && connector.bucketCount > 0) {
@@ -243,7 +268,7 @@ class ConnectorsManager {
             this._metricsHandler.onConnectorsInstantiated(false);
             this._logger.info('Successfully spawned a connector', {
                 method: 'ConnectorsManager._spawnOrDestroyConnector',
-                connector: connector.name
+                connector: connector.name,
             });
             return true;
         } else if (connector.isRunning) {
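For reference, this is how the new flags fit together when constructing the manager; the values are illustrative and the require paths assume the snippet lives next to ConnectorsManager.js:

const werelogs = require('werelogs');
const ConnectorsManager = require('./ConnectorsManager');
const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');

const logger = new werelogs.Logger('Example');
const manager = new ConnectorsManager({
    nbConnectors: 4, // forced down to 1 when singleChangeStream is true
    singleChangeStream: false,
    isPipelineImmutable: true, // e.g. MongoDB >= 6.0.0
    maximumBucketsPerConnector: 1, // matches the immutable-pipeline strategy
    database: 'metadata',
    mongoUrl: 'mongodb://localhost:27017/?replicaSet=rs0',
    oplogTopic: 'backbeat-oplog',
    cronRule: '*/1 * * * * *',
    prefix: 'backbeat-',
    heartbeatIntervalMs: 10000,
    kafkaConnectHost: '127.0.0.1',
    kafkaConnectPort: 8083,
    metricsHandler: new OplogPopulatorMetrics(logger),
    logger,
});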
diff --git a/tests/unit/oplogPopulator/Allocator.js b/tests/unit/oplogPopulator/Allocator.js
index 3d822a2c0..81348a26c 100644
--- a/tests/unit/oplogPopulator/Allocator.js
+++ b/tests/unit/oplogPopulator/Allocator.js
@@ -17,6 +17,7 @@ const defaultConnectorParams = {
     logger,
     kafkaConnectHost: '127.0.0.1',
     kafkaConnectPort: 8083,
+    maximumBucketsPerConnector: 10,
 };
 
 const connector1 = new Connector({
@@ -31,6 +32,7 @@ describe('Allocator', () => {
         allocator = new Allocator({
             connectorsManager: {
                 connectors: [],
+                addConnector: () => {},
             },
             metricsHandler: new OplogPopulatorMetrics(logger),
             logger,
diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js
index 7ce464b23..5bc0776f2 100644
--- a/tests/unit/oplogPopulator/ConnectorsManager.js
+++ b/tests/unit/oplogPopulator/ConnectorsManager.js
@@ -96,6 +96,7 @@ describe('ConnectorsManager', () => {
             kafkaConnectHost: '127.0.0.1',
             kafkaConnectPort: 8083,
             metricsHandler: new OplogPopulatorMetrics(logger),
+            maximumBucketsPerConnector: 1000,
             logger,
         });
     });
diff --git a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
index c4f54d18b..bcc705944 100644
--- a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
+++ b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
@@ -32,6 +32,12 @@ describe('LeastFullConnector', () => {
     let strategy;
     beforeEach(() => {
         strategy = new LeastFullConnector({
+            addConnector: () => new Connector({
+                name: 'example-connector-3',
+                buckets: [],
+                ...defaultConnectorParams,
+            }),
+            maximumBucketsPerConnector: 10,
             logger,
         });
     });
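A test along these lines would pin down the new overflow path; it is a sketch, and the connector stubs are hypothetical objects exposing only what `getConnector` reads:

const assert = require('assert');
const werelogs = require('werelogs');
const LeastFullConnector =
    require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');

const logger = new werelogs.Logger('LeastFullConnector:test');

it('should ask for a new connector when the least full one is at capacity', () => {
    const fresh = { name: 'example-connector-new', bucketCount: 0 };
    const strategy = new LeastFullConnector({
        maximumBucketsPerConnector: 1,
        addConnector: () => fresh,
        logger,
    });
    const full = { name: 'example-connector-full', bucketCount: 1 };
    assert.strictEqual(strategy.getConnector([full]), fresh);
});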