Skip to content

Commit

Permalink
Update oplog populator to detect max bucket per connector
Browse files Browse the repository at this point in the history
- Check the mongodb version
- Adapt the _maximumBucketsPerConnector based on the version
- Enforce 1 connector if the singleChangeStream mode is set
- Reorder calls in the function, and unit testing

Issue: BB-601
  • Loading branch information
williamlardier committed Sep 11, 2024
1 parent f7fd8f0 commit fe5cafb
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 23 deletions.
82 changes: 59 additions & 23 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OplogPopulator {
this._logger = params.logger;
this._changeStreamWrapper = null;
this._allocator = null;
this._connectorsManager = null;
this._connectorsManager = null;
// contains OplogPopulatorUtils class of each supported extension
this._extHelpers = {};
// MongoDB related
Expand All @@ -78,9 +78,9 @@ class OplogPopulator {
async _setupMongoClient() {
try {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
Expand Down Expand Up @@ -242,35 +242,71 @@ class OplogPopulator {
this._changeStreamWrapper.start();
}

_isPipelineImmutable() {
return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines);
}

async _initializeConnectorsManager() {
return this._connectorsManager.initializeConnectors();
}

/**
 * Sets up the OplogPopulator: connects to MongoDB, configures the
 * connectors manager and allocator, and starts listening to buckets
 * @returns {Promise<undefined>} resolves once setup is complete
 * @throws {InternalError} when any setup step fails
 */
async setup() {
try {
this._loadOplogHelperClasses();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
try {
this._loadOplogHelperClasses();
// initialize mongo client
await this._setupMongoClient();

if (this._isPipelineImmutable()) {
// In this case, mongodb does not support reusing a
// resume token from a different pipeline. In other
// words, we cannot alter an existing pipeline. In this
// case, the strategy is to allow a maximum of one
// bucket per kafka connector.
this._maximumBucketsPerConnector = 1;
} else {
// In this case, we can have multiple buckets per
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
this._maximumBucketsPerConnector = constants.maxBucketPerConnector;
}
// If the flag useSingleChangeStream is set to true, we
// set the max number to infinity, and the number of connectors
// to 1.
if (this._config.singleChangeStream) {
this._maximumBucketsPerConnector = Infinity;
this._config.numberOfConnectors = 1;
}

this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
singleChangeStream: this._config.singleChangeStream,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
isPipelineImmutable: this._isPipelineImmutable(),
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
await this._initializeConnectorsManager();

this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
logger: this._logger,
});
// initialize mongo client
await this._setupMongoClient();
// get currently valid buckets from mongo
const validBuckets = await this._getBackbeatEnabledBuckets();
// listen to valid buckets
Expand All @@ -291,13 +327,13 @@ class OplogPopulator {
this._logger.info('OplogPopulator setup complete', {
method: 'OplogPopulator.setup',
});
} catch (err) {
} catch (err) {
this._logger.error('An error occured when setting up the OplogPopulator', {
method: 'OplogPopulator.setup',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}
}

/**
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const OplogPopulator =
require('../../../extensions/oplogPopulator/OplogPopulator');
const ChangeStream =
require('../../../lib/wrappers/ChangeStream');
const ConnectorsManager = require('../../../extensions/oplogPopulator/modules/ConnectorsManager');

const oplogPopulatorConfig = {
topic: 'oplog',
Expand Down Expand Up @@ -113,6 +114,73 @@ describe('OplogPopulator', () => {
});
});

describe('_isPipelineImmutable', () => {
    it('should return true if pipeline is immutable', () => {
        // 6.x is at/above the version that made pipelines immutable
        oplogPopulator._mongoVersion = '6.0.0';
        assert.strictEqual(oplogPopulator._isPipelineImmutable(), true);
    });

    it('should return false if pipeline is not immutable', () => {
        // 5.x still allows altering an existing pipeline
        oplogPopulator._mongoVersion = '5.0.0';
        assert.strictEqual(oplogPopulator._isPipelineImmutable(), false);
    });
});

describe('setup', () => {

    it('should handle error during setup', async () => {
        const error = new Error('InternalError');
        const loadStub = sinon.stub(oplogPopulator, '_loadOplogHelperClasses').throws(error);
        const logStub = sinon.stub(oplogPopulator._logger, 'error');

        await assert.rejects(oplogPopulator.setup(), error);

        sinon.assert.calledOnce(loadStub);
        sinon.assert.calledWith(logStub, 'An error occured when setting up the OplogPopulator', {
            method: 'OplogPopulator.setup',
            error: 'InternalError',
        });
    });

    it('should setup oplog populator', async () => {
        // stub out every collaborator so only orchestration is exercised
        const mongoStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
        const changeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
        const connectorsStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
        const bucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);

        await oplogPopulator.setup();

        sinon.assert.calledOnce(mongoStub);
        sinon.assert.calledOnce(bucketsStub);
        sinon.assert.calledOnce(changeStreamStub);
        sinon.assert.calledOnce(connectorsStub);
    });
});

describe('_initializeConnectorsManager', () => {
    it('should initialize connectors manager', async () => {
        // build a real ConnectorsManager from the populator's config
        const managerParams = {
            nbConnectors: oplogPopulator._config.numberOfConnectors,
            singleChangeStream: oplogPopulator._config.singleChangeStream,
            maximumBucketsPerConnector: oplogPopulator._maximumBucketsPerConnector,
            isPipelineImmutable: oplogPopulator._isPipelineImmutable(),
            database: oplogPopulator._database,
            mongoUrl: oplogPopulator._mongoUrl,
            oplogTopic: oplogPopulator._config.topic,
            cronRule: oplogPopulator._config.connectorsUpdateCronRule,
            prefix: oplogPopulator._config.prefix,
            heartbeatIntervalMs: oplogPopulator._config.heartbeatIntervalMs,
            kafkaConnectHost: oplogPopulator._config.kafkaConnectHost,
            kafkaConnectPort: oplogPopulator._config.kafkaConnectPort,
            metricsHandler: oplogPopulator._metricsHandler,
            logger: oplogPopulator._logger,
        };
        oplogPopulator._connectorsManager = new ConnectorsManager(managerParams);
        const initStub = sinon.stub(oplogPopulator._connectorsManager, 'initializeConnectors');

        await oplogPopulator._initializeConnectorsManager();

        sinon.assert.calledOnce(initStub);
    });
});

describe('_setupMongoClient', () => {
it('should connect to mongo and setup client', async () => {
const collectionStub = sinon.stub();
Expand Down

0 comments on commit fe5cafb

Please sign in to comment.