Skip to content

Commit

Permalink
squash: post-review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 26, 2018
1 parent 718fbc9 commit cd8ca97
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 42 deletions.
14 changes: 7 additions & 7 deletions docs/crr-pause-resume.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Description

This feature offers a way for users to manually pause and resume cross-region
replication (CRR) operations by site.
replication (CRR) operations by storage locations.

## Design

Expand Down Expand Up @@ -37,17 +37,17 @@ from its last offset.
* POST `/_/crr/pause`

This POST request is to manually pause the cross-region replication service
for all sites configured as destination replication endpoints.
for all locations configured as destination replication endpoints.

Response:
```sh
{}
```

* POST `/_/crr/pause/<site>`
* POST `/_/crr/pause/<location-name>`

This POST request is to manually pause the cross-region replication service
for a specified site configured as a destination replication endpoint.
for a specified location configured as a destination replication endpoint.

Response:
```sh
Expand All @@ -57,17 +57,17 @@ from its last offset.
* POST `/_/crr/resume`

This POST request is to manually resume the cross-region replication
service for all sites configured as destination replication endpoints.
service for all locations configured as destination replication endpoints.

Response:
```sh
{}
```

* POST `/_/crr/resume/<site>`
* POST `/_/crr/resume/<location-name>`

This POST request is to manually resume the cross-region replication
service for a specified site configured as a destination replication
service for a specified location configured as a destination replication
endpoint.

Response:
Expand Down
52 changes: 39 additions & 13 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ class QueueProcessor extends EventEmitter {
this._consumer = null;
this._mProducer = mProducer;
this.site = site;
// redis pub/sub for pause/resume
this._redis = new Redis(redisConfig);
this._setupRedis();

this.echoMode = false;

Expand All @@ -88,6 +85,7 @@ class QueueProcessor extends EventEmitter {
this.destHTTPAgent = new http.Agent({ keepAlive: true });

this._setupVaultclientCache();
this._setupRedis(redisConfig);

// FIXME support multiple scality destination sites
if (Array.isArray(destConfig.bootstrapList)) {
Expand Down Expand Up @@ -205,30 +203,58 @@ class QueueProcessor extends EventEmitter {
this.accountCredsCache = {};
}

_setupRedis() {
_setupRedis(redisConfig) {
// redis pub/sub for pause/resume
const redis = new Redis(redisConfig);
// redis subscribe to site specific channel
const channelName = `${this.repConfig.topic}-${this.site}`;
this._redis.subscribe(channelName, err => {
redis.subscribe(channelName, err => {
if (err) {
this.logger.fatal('queue processor failed to subscribe to ' +
`crr redis channel for site ${this.site}`,
`crr redis channel for location ${this.site}`,
{ method: 'QueueProcessor.constructor',
error: err });
process.exit(1);
}
this._redis.on('message', (channel, message) => {
if (channel === channelName && this._consumer[message]) {
// should only validate valid functions that can be invoked
// from the redis pub/sub
const validCmds = ['pauseService', 'resumeService'];
if (validCmds.includes(message)) {
this._consumer[message](this.site);
redis.on('message', (channel, message) => {
const validActions = {
pauseService: this._pauseService.bind(this),
resumeService: this._resumeService.bind(this),
};
try {
const { action } = JSON.parse(message);
const cmd = validActions[action];
if (channel === channelName && typeof cmd === 'function') {
cmd();
}
} catch (e) {
this.logger.error('error parsing redis sub message', {
method: 'QueueProcessor._setupRedis',
error: e,
});
}
});
});
}

/**
* Pause replication consumers
* @return {undefined}
*/
_pauseService() {
this._consumer.pause(this.site);
this.logger.info(`paused replication for location: ${this.site}`);
}

/**
* Resume replication consumers
* @return {undefined}
*/
_resumeService() {
this._consumer.resume(this.site);
this.logger.info(`resumed replication for location: ${this.site}`);
}

getStateVars() {
return {
sourceConfig: this.sourceConfig,
Expand Down
21 changes: 11 additions & 10 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,26 +518,27 @@ class BackbeatConsumer extends EventEmitter {
* @param {string} site - name of site
* @return {undefined}
*/
pauseService(site) {
pause(site) {
// Use of KafkaConsumer#pause did not work. Using alternative
// of unsubscribe/subscribe
if (this._consumerReady) {
this._consumer.unsubscribe();
this._consumerReady = false;
this._log.info(`paused replication for site ${site}`);
}
this._consumer.unsubscribe();
this._log.debug(`paused consumer for location: ${site}`, {
method: 'BackbeatConsumer.pause',
});
}

/**
* resume the kafka consumer
* @param {string} site - name of site
* @return {undefined}
*/
resumeService(site) {
if (!this._consumerReady) {
resume(site) {
// if not subscribed, then subscribe
if (this._consumer.subscription().length === 0) {
this._consumer.subscribe([this._topic]);
this._consumerReady = true;
this._log.info(`resumed replication for site ${site}`);
this._log.debug(`resumed consumer for location: ${site}`, {
method: 'BackbeatConsumer.resume',
});
}
}

Expand Down
16 changes: 10 additions & 6 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,12 @@ class BackbeatAPI {
} else {
sites = [details.site];
}
sites.map(s => `${this._crrTopic}-${s}`).forEach(channel => {
this._redisPublisher.publish(channel, 'pauseService');
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'pauseService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info('replication service paused');
this._logger.info(`replication service paused for locations: ${sites}`);
return cb(null, {});
}

Expand All @@ -570,10 +572,12 @@ class BackbeatAPI {
} else {
sites = [details.site];
}
sites.map(s => `${this._crrTopic}-${s}`).forEach(channel => {
this._redisPublisher.publish(channel, 'resumeService');
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'resumeService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info('replication service resumed');
this._logger.info(`replication service resumed for locations ${sites}`);
return cb(null, {});
}

Expand Down
4 changes: 2 additions & 2 deletions tests/functional/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ describe('BackbeatConsumer', () => {
consumer.on('consumed', messagesConsumed => {
totalConsumed += messagesConsumed;
if (totalConsumed === 1) {
consumer.pauseService();
consumer.pause();
next();
}
});
Expand Down Expand Up @@ -166,7 +166,7 @@ describe('BackbeatConsumer', () => {
});
},
next => {
consumer.resumeService();
consumer.resume();
assert.equal(kafkaConsumer.subscription().length, 1);
consumer.on('consumed', messagesConsumed => {
totalConsumed += messagesConsumed;
Expand Down
15 changes: 11 additions & 4 deletions tests/functional/api/BackbeatServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -1007,11 +1007,15 @@ describe('Backbeat Server', () => {
setTimeout(() => {
assert.strictEqual(cache1.length, 1);
assert.strictEqual(cache2.length, 1);

assert.deepStrictEqual(cache1[0].channel, channel1);
assert.deepStrictEqual(cache2[0].channel, channel2);

assert.deepStrictEqual(cache1[0].message, 'pauseService');
assert.deepStrictEqual(cache2[0].message, 'pauseService');
const message1 = JSON.parse(cache1[0].message);
const message2 = JSON.parse(cache2[0].message);
const expected = { action: 'pauseService' };
assert.deepStrictEqual(message1, expected);
assert.deepStrictEqual(message2, expected);
done();
}, 1000);
});
Expand All @@ -1032,8 +1036,11 @@ describe('Backbeat Server', () => {
assert.deepStrictEqual(cache1[0].channel, channel1);
assert.deepStrictEqual(cache2[0].channel, channel2);

assert.deepStrictEqual(cache1[0].message, 'resumeService');
assert.deepStrictEqual(cache2[0].message, 'resumeService');
const message1 = JSON.parse(cache1[0].message);
const message2 = JSON.parse(cache2[0].message);
const expected = { action: 'resumeService' };
assert.deepStrictEqual(message1, expected);
assert.deepStrictEqual(message2, expected);
done();
}, 1000);
});
Expand Down

0 comments on commit cd8ca97

Please sign in to comment.