
Commit

wip
williamlardier committed Sep 10, 2024
1 parent 1d94901 commit 758ed65
Showing 11 changed files with 150 additions and 43 deletions.
76 changes: 53 additions & 23 deletions extensions/oplogPopulator/OplogPopulator.js
@@ -51,7 +51,7 @@ class OplogPopulator {
this._logger = params.logger;
this._changeStreamWrapper = null;
this._allocator = null;
this._connectorsManager = null;
// contains OplogPopulatorUtils class of each supported extension
this._extHelpers = {};
// MongoDB related
@@ -78,9 +78,9 @@ class OplogPopulator {
async _setupMongoClient() {
try {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
@@ -185,7 +185,7 @@ class OplogPopulator {
// remove bucket if no longer backbeat enabled
if (isListeningToBucket && !isBackbeatEnabled) {
await this._allocator.stopListeningToBucket(change.documentKey._id);
// add bucket if it became backbeat enabled
} else if (!isListeningToBucket && isBackbeatEnabled) {
await this._allocator.listenToBucket(change.documentKey._id, eventDate);
}
@@ -242,35 +242,65 @@ class OplogPopulator {
this._changeStreamWrapper.start();
}

_isPipelineImmutable() {
return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines);
}
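As a quick illustration (not part of the commit), the gate compares the server version against the 6.0.0 cutoff defined in constants.js as mongodbVersionWithImmutablePipelines:

const semver = require('semver');
semver.gte('6.0.5', '6.0.0');  // true  -> immutable pipelines, at most one bucket per connector
semver.gte('5.0.22', '6.0.0'); // false -> pipelines can still be updated in place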

/**
* Sets up the OplogPopulator
* @returns {Promise|undefined} undefined
* @throws {InternalError}
*/
async setup() {
try {
this._loadOplogHelperClasses();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
try {
this._loadOplogHelperClasses();
// initialize mongo client
await this._setupMongoClient();

if (this._isPipelineImmutable()) {
// MongoDB does not support reusing a resume token
// with a different pipeline: an existing pipeline
// cannot be altered. The strategy in that case is
// to allow a maximum of one bucket per kafka
// connector.
this._maximumBucketsPerConnector = 1;
} else {
// Here, multiple buckets can share a kafka
// connector, but we proactively cap their number
// to ensure that the pipeline will be accepted by
// mongodb.
this._maximumBucketsPerConnector = constants.maxBucketPerConnector;
}

// If the singleChangeStream flag is set to true, we set
// the maximum number of buckets per connector to Infinity
// and the number of connectors to 1.
if (this._config.singleChangeStream) {
this._maximumBucketsPerConnector = Infinity;
this._config.numberOfConnectors = 1;
}
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
singleChangeStream: this._config.singleChangeStream,
isPipelineImmutable: this._isPipelineImmutable(),
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
logger: this._logger,
});
// initialize mongo client
await this._setupMongoClient();
// get currently valid buckets from mongo
const validBuckets = await this._getBackbeatEnabledBuckets();
// listen to valid buckets
@@ -291,13 +321,13 @@ class OplogPopulator {
this._logger.info('OplogPopulator setup complete', {
method: 'OplogPopulator.setup',
});
} catch (err) {
this._logger.error('An error occurred when setting up the OplogPopulator', {
method: 'OplogPopulator.setup',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}

/**
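To make the branches in setup() easier to follow, here is a sketch (not code from the commit) of the mode selection it performs; the names are taken from the hunks above, and 260000 is maxBucketPerConnector from constants.js:

// Hypothetical summary of setup()'s allocation-mode selection.
const semver = require('semver');

function selectAllocationMode({ mongoVersion, singleChangeStream, numberOfConnectors }) {
    if (singleChangeStream) {
        // one connector, one change stream, unbounded bucket count
        return { numberOfConnectors: 1, maximumBucketsPerConnector: Infinity };
    }
    if (semver.gte(mongoVersion, '6.0.0')) {
        // resume tokens cannot be reused with a modified pipeline:
        // pin each connector to a single bucket
        return { numberOfConnectors, maximumBucketsPerConnector: 1 };
    }
    // mutable pipelines: cap buckets so the pipeline stays within the BSON limit
    return { numberOfConnectors, maximumBucketsPerConnector: 260000 };
}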
1 change: 1 addition & 0 deletions extensions/oplogPopulator/OplogPopulatorConfigValidator.js
@@ -10,6 +10,7 @@ const joiSchema = joi.object({
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
heartbeatIntervalMs: joi.number().default(10000),
singleChangeStream: joi.boolean().default(false),
});

function configValidator(backbeatConfig, extConfig) {
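A hypothetical extension config exercising the new flag; only fields visible in the schema above are shown, and the joi defaults apply to the rest:

const extConfig = {
    connectorsUpdateCronRule: '*/1 * * * * *',
    heartbeatIntervalMs: 10000,
    singleChangeStream: true, // one connector listening to the whole database
};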
extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
@@ -4,10 +4,15 @@ class AllocationStrategy {

/**
* @constructor
* @param {Logger} logger logger object
* @param {Object} params params
* @param {Number} params.maximumBucketsPerConnector maximum number of buckets per connector
* @param {Function} params.addConnector function to add a connector
* @param {Logger} params.logger logger object
*/
constructor(logger) {
this._logger = logger;
constructor(params) {
this._logger = params.logger;
this.maximumBucketsPerConnector = params.maximumBucketsPerConnector;
this._addConnector = params.addConnector.bind(this);
}

/**
21 changes: 9 additions & 12 deletions extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
@@ -1,3 +1,4 @@
const AllocationStrategy = require('./AllocationStrategy');

/**
@@ -6,26 +7,22 @@ const AllocationStrategy = require('./AllocationStrategy');
* @classdesc LeastFullConnector is an allocation
* strategy that assigns buckets to connectors based
* on the number of buckets assigned to each connector.
* Connectors with the fewest buckets are filled first
* Connectors with the fewest buckets are filled first.
* If a connector has reached the maximum number of buckets,
* a new connector is created.
*/
class LeastFullConnector extends AllocationStrategy {

/**
* @constructor
* @param {Object} params params
* @param {Logger} params.logger logger object
*/
constructor(params) {
super(params.logger);
}

/**
* Get best connector for assigning a bucket
* @param {Connector[]} connectors available connectors
* @returns {Connector} connector
*/
getConnector(connectors) {
return connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
if (connector.buckets.length >= this.maximumBucketsPerConnector) {
return this._addConnector();
}
return connector;
}
}

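A sketch of the new overflow path, assuming a logger, a connectorsManager, and two connectors that each already hold one bucket are in scope:

// Not from the commit: exercising LeastFullConnector's overflow behavior.
const strategy = new LeastFullConnector({
    logger,
    maximumBucketsPerConnector: 1, // e.g. immutable-pipeline mode
    addConnector: connectorsManager.addConnector.bind(connectorsManager),
});
// Both candidates are full, so instead of overloading the least-full
// connector, the strategy asks the manager for a fresh one.
const target = strategy.getConnector([connectorA, connectorB]);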
6 changes: 6 additions & 0 deletions extensions/oplogPopulator/constants.js
@@ -1,6 +1,12 @@
const constants = {
internalMongoDBCollectionPrefix: '__',
bucketMetastore: '__metastore',
defaultConnectorName: 'source-connector',
// The maximum length of a pipeline equals the MongoDB BSON max document
// size, i.e. 16MB (16777216 bytes). Dividing by the maximum length of a
// bucket name (64) gives ~262144; we round down to 260000 to leave room
// for the other parameters in the pipeline.
maxBucketPerConnector: 260000,
mongodbVersionWithImmutablePipelines: '6.0.0',
defaultConnectorConfig: {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
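A quick sanity check of the arithmetic behind maxBucketPerConnector:

const bsonMaxDocumentSize = 16 * 1024 * 1024; // 16777216 bytes
const maxBucketNameLength = 64;
Math.floor(bsonMaxDocumentSize / maxBucketNameLength); // 262144, rounded down to 260000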
4 changes: 4 additions & 0 deletions extensions/oplogPopulator/modules/Allocator.js
@@ -3,9 +3,11 @@ const { errors } = require('arsenal');

const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const LeastFullConnector = require('../allocationStrategy/LeastFullConnector');
const constants = require('../constants');

const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
@@ -31,6 +33,8 @@ class Allocator {
this._logger = params.logger;
this._allocationStrategy = new LeastFullConnector({
logger: params.logger,
maximumBucketsPerConnector: params.maximumBucketsPerConnector,
addConnector: this._connectorsManager.addConnector.bind(this._connectorsManager),
});
this._metricsHandler = params.metricsHandler;
// Stores connector assigned for each bucket
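For context, the wiring this hunk produces looks roughly like the following (a sketch; construction of the dependencies is assumed):

const allocator = new Allocator({
    connectorsManager,
    metricsHandler,
    maximumBucketsPerConnector: 1, // e.g. when pipelines are immutable
    logger,
});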
35 changes: 33 additions & 2 deletions extensions/oplogPopulator/modules/Connector.js
@@ -2,6 +2,7 @@ const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
const constants = require('../constants');

const connectorParams = joi.object({
name: joi.string().required(),
@@ -11,6 +12,9 @@ const connectorParams = joi.object({
logger: joi.object().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
isPipelineImmutable: joi.boolean().default(false),
singleChangeStream: joi.boolean().default(false),
});

/**
@@ -34,6 +38,10 @@ class Connector {
* @param {Logger} params.logger logger object
* @param {string} params.kafkaConnectHost kafka connect host
* @param {number} params.kafkaConnectPort kafka connect port
* @param {number} params.maximumBucketsPerConnector maximum number of
* buckets per connector
* @param {Boolean} params.isPipelineImmutable whether the connector's
* pipeline can no longer be updated once created
* @param {Boolean} params.singleChangeStream if true, the connector uses
* an empty pipeline and listens to all buckets in the database
*/
constructor(params) {
joi.attempt(params, connectorParams);
@@ -59,6 +67,9 @@
kafkaConnectPort: params.kafkaConnectPort,
logger: this._logger,
});
this._singleChangeStream = params.singleChangeStream;
this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
this._isPipelineImmutable = params.isPipelineImmutable;
}

/**
@@ -233,6 +244,9 @@
* @throws {InternalError}
*/
async addBucket(bucket, doUpdate = false) {
if (!this._buckets.has(bucket) && this._buckets.size >= this._maximumBucketsPerConnector) {
throw errors.InternalError.customizeDescription('Connector reached maximum number of buckets');
}
this._buckets.add(bucket);
this._updateConnectorState(true);
try {
@@ -260,7 +274,19 @@
this._buckets.delete(bucket);
this._updateConnectorState(true);
try {
await this.updatePipeline(doUpdate);
if (this._isPipelineImmutable && this._buckets.size > 1) {
this._logger.warn('Removing a bucket from an immutable pipeline', {
method: 'Connector.removeBucket',
connector: this._name,
bucket,
});
} else if (this._isPipelineImmutable) {
// If the pipeline is immutable and only one bucket is left,
// we can destroy the connector, so it will be recreated with
// a new bucket later.
return this.destroy();
}
return this.updatePipeline(doUpdate);
} catch (err) {
this._logger.error('Error while removing bucket from connector', {
method: 'Connector.removeBucket',
@@ -274,11 +300,16 @@

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector
* buckets assigned to this connector. If the
* singleChangeStream parameter is set to true,
* returns a pipeline that listens to all collections.
* @param {string[]} buckets list of bucket names
* @returns {string} new connector pipeline
*/
_generateConnectorPipeline(buckets) {
if (this._singleChangeStream) {
return '[]';
}
const pipeline = [
{
$match: {
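The hunk above is truncated inside the $match stage, so its exact fields are an assumption; a sketch of the two pipeline shapes the method can now return:

// Hypothetical calls illustrating _generateConnectorPipeline's two modes.
singleStreamConnector._generateConnectorPipeline(['b1']);  // -> '[]' regardless of buckets
regularConnector._generateConnectorPipeline(['b1', 'b2']); // -> '[{"$match":{ ... }}]'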