Skip to content

Commit

Permalink
Code improvements
Browse files Browse the repository at this point in the history
- Ensure we do not update the pipelines more than once at the
  same time.
- Ensure we clear the retained buckets only when the pipeline
  is updated.

Issue: BB-601
  • Loading branch information
williamlardier committed Sep 18, 2024
1 parent 3e67886 commit d17510b
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 40 deletions.
4 changes: 2 additions & 2 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ class OplogPopulator {
});
// For now, we always use the RetainBucketsDecorator
// so, we map the events from the classes
this._connectorsManager.on('connector-destroyed', connector =>
this._allocationStrategy.onConnectorDestroyed(connector));
this._connectorsManager.on('connector-updated', connector =>
this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector));
this._allocator.on('bucket-removed', (bucket, connector) =>
this._allocationStrategy.onBucketRemoved(bucket, connector));
this._connectorsManager.on('connectors-reconciled', bucketsExceedingLimit => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ class AllocationStrategy {

/**
* Assess if a pipeline can be updated
* @param {Connector} connector connector
* @returns {Boolean} true if the connector can be updated
*/
canUpdate(connector) { // eslint-disable-line no-unused-vars
canUpdate() {
throw errors.NotImplemented;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ class ImmutableConnector extends AllocationStrategy {
/**
* Assess if a pipeline can be updated. With the immutable
* strategy, a connector cannot be updated.
* @param {Connector} connector connector
* @returns {false} false
*/
canUpdate(connector) { // eslint-disable-line no-unused-vars
canUpdate() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ class LeastFullConnector extends AllocationStrategy {

/**
* Assess if a pipeline can be updated.
* @param {Connector} connector connector
* @returns {true} true
*/
canUpdate(connector) { // eslint-disable-line no-unused-vars
canUpdate() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,12 @@ class RetainBucketsDecorator extends AllocationStrategy {
}

/**
* Callback when a connector is destroyed.
* Callback when a connector is destroyed or
* updated.
* @param {Connector} connector connector
* @returns {undefined}
*/
onConnectorDestroyed(connector) {
this._cleanupRetainedBucket(connector);
}

/**
* Cleanup retained buckets for a connector
* @param {Connector} connector connector
* @returns {undefined}
*/
_cleanupRetainedBucket(connector) {
onConnectorUpdatedOrDestroyed(connector) {
// When a connector is updated or destroyed, the retained
// buckets are removed from the connector
this._retainedBuckets.forEach((conn, bucket) => {
Expand All @@ -83,15 +75,10 @@ class RetainBucketsDecorator extends AllocationStrategy {
/**
* Assess if a pipeline can be updated. If the connector can
* be updated, the bucket is added as retained.
* @param {Connector} connector connector
* @returns {Boolean} true if the connector can be updated
*/
canUpdate(connector) {
const res = this._strategy.canUpdate();
if (res) {
this._cleanupRetainedBucket(connector);
}
return res;
canUpdate() {
return this._strategy.canUpdate();
}

/**
Expand Down
17 changes: 13 additions & 4 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class ConnectorsManager extends EventEmitter {
async _spawnOrDestroyConnector(connector) {
try {
if (connector.isRunning && connector.bucketCount === 0) {
this.emit('connector-destroyed', connector);
this.emit('connector-updated', connector);
await connector.destroy();
this._metricsHandler.onConnectorDestroyed();
this._logger.info('Successfully destroyed a connector', {
Expand All @@ -262,8 +262,12 @@ class ConnectorsManager extends EventEmitter {
connector: connector.name,
});
return true;
} else if (connector.isRunning && this._allocationStrategy.canUpdate(connector)) {
return connector.updatePipeline(true);
} else if (connector.isRunning && this._allocationStrategy.canUpdate()) {
const isPipelineUpdated = connector.updatePipeline(true);
if (isPipelineUpdated) {
this.emit('connector-updated', connector);
}
return isPipelineUpdated;
}

return false;
Expand Down Expand Up @@ -374,8 +378,13 @@ class ConnectorsManager extends EventEmitter {
* @returns {undefined}
*/
scheduleConnectorUpdates() {
let updateInProgress = false;
schedule.scheduleJob(this._cronRule, async () => {
await this._updateConnectors();
if (!updateInProgress) {
updateInProgress = true;
await this._updateConnectors();
updateInProgress = false;
}
});
}

Expand Down
18 changes: 17 additions & 1 deletion tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const assert = require('assert');
const sinon = require('sinon');
const werelogs = require('werelogs');
const schedule = require('node-schedule');

const Connector =
require('../../../extensions/oplogPopulator/modules/Connector');
Expand Down Expand Up @@ -239,7 +240,7 @@ describe('ConnectorsManager', () => {
connector1._buckets = new Set();
const emitStub = sinon.stub(connectorsManager, 'emit');
await connectorsManager._spawnOrDestroyConnector(connector1);
assert(emitStub.calledOnceWith('connector-destroyed', connector1));
assert(emitStub.calledOnceWith('connector-updated', connector1));
});

it('should spawn a non running connector when buckets are configured', async () => {
Expand Down Expand Up @@ -423,5 +424,20 @@ describe('ConnectorsManager', () => {
assert(connectorRestartStub.notCalled);
});
});

describe('scheduleConnectorUpdates', () => {
afterEach(() => {
sinon.restore();
});

it('should schedule connector updates', () => {
const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors');
sinon.stub(schedule, 'scheduleJob').callsFake((rule, cb) => {
cb();
});
connectorsManager.scheduleConnectorUpdates();
assert(updateConnectorsStub.called);
});
});
});

Original file line number Diff line number Diff line change
Expand Up @@ -80,29 +80,29 @@ const connector2 = new Connector({
it('should remove retained buckets for connector', () => {
decorator._retainedBuckets.set('bucket1', connector1);
decorator._retainedBuckets.set('bucket2', connector2);
decorator.onConnectorDestroyed(connector1);
decorator.onConnectorUpdatedOrDestroyed(connector1);
assert.strictEqual(decorator._retainedBuckets.size, 1);
assert.strictEqual(decorator._retainedBuckets.get('bucket2'), connector2);
});

it('should not remove retained buckets for other connectors', () => {
decorator._retainedBuckets.set('bucket1', connector1);
decorator._retainedBuckets.set('bucket2', connector2);
decorator.onConnectorDestroyed(connector2);
decorator.onConnectorUpdatedOrDestroyed(connector2);
assert.strictEqual(decorator._retainedBuckets.size, 1);
assert.strictEqual(decorator._retainedBuckets.get('bucket1'), connector1);
});
});

describe('canUpdate', () => {
it('should return the strategy result', async () => {
const result = await decorator.canUpdate(connector1);
assert.strictEqual(result, strategy.canUpdate(connector1));
const result = await decorator.canUpdate();
assert.strictEqual(result, strategy.canUpdate());
});

it('should remove from retained buckets if the strategy allows', async () => {
sinon.stub(strategy, 'canUpdate').returns(true);
await decorator.canUpdate(connector1);
await decorator.canUpdate();
assert.strictEqual(decorator._retainedBuckets.size, 0);
});
});
Expand Down
9 changes: 5 additions & 4 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ describe('OplogPopulator', () => {
assert(initializeConnectorsManagerStub.calledOnce);
});

it('should bind the connector-destroyed event from the connectors manager', async () => {
it('should bind the connector-updated event from the connectors manager', async () => {
const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
Expand All @@ -207,9 +207,10 @@ describe('OplogPopulator', () => {
assert(getBackbeatEnabledBucketsStub.calledOnce);
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
const onConnectorDestroyedStub = sinon.stub(oplogPopulator._allocationStrategy, 'onConnectorDestroyed');
oplogPopulator._connectorsManager.emit('connector-destroyed');
assert(onConnectorDestroyedStub.calledOnce);
const onConnectorUpdatedOrDestroyedStub =
sinon.stub(oplogPopulator._allocationStrategy, 'onConnectorUpdatedOrDestroyed');
oplogPopulator._connectorsManager.emit('connector-updated');
assert(onConnectorUpdatedOrDestroyedStub.calledOnce);
});

it('should bind the bucket-removed event from the allocator', async () => {
Expand Down

0 comments on commit d17510b

Please sign in to comment.