Skip to content

Commit

Permalink
ft: add pause/resume api routes, tests, fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 27, 2018
1 parent 49e461f commit b8502e2
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 50 deletions.
40 changes: 31 additions & 9 deletions docs/crr-pause-resume.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Description

This feature offers a way for users to manually pause and resume cross-region
replication (CRR) operations by site.
replication (CRR) operations by storage locations.

## Design

Expand All @@ -17,7 +17,10 @@ containers setup for replication.
We want to pause and resume the CRR service at the lowest level (in our case,
pause and resume all Kafka Consumers subscribed to the CRR topic). We want to
perform these actions at the lowest level in order to stop processing any
current batch jobs.
replication entries that might have already been populated by Kafka but have yet
to be consumed and queued for replication. Any entries that have already been
consumed by the Kafka Consumer and are being processed for replication will
continue to finish replication and will not be paused.

The API will have a Redis instance publishing messages to a specific channel.
All CRR Kafka Consumers will subscribe to this channel and complete any given
Expand All @@ -31,22 +34,41 @@ from its last offset.

## Definition of API

* POST `/_/crr/pause/<site>`
* POST `/_/crr/pause`

This POST request is to manually pause the cross-region replication service.
You may specify a site name, or if you would like to pause replication for
all sites, specify "all" as the site name.
This POST request is to manually pause the cross-region replication service
for all locations configured as destination replication endpoints.

Response:
```sh
{}
```

* POST `/_/crr/resume/<site>`
* POST `/_/crr/pause/<location-name>`

This POST request is to manually pause the cross-region replication service
for a specified location configured as a destination replication endpoint.

Response:
```sh
{}
```

* POST `/_/crr/resume`

This POST request is to manually resume the cross-region replication
service for all locations configured as destination replication endpoints.

Response:
```sh
{}
```

* POST `/_/crr/resume/<location-name>`

This POST request is to manually resume the cross-region replication
service. You may specify a site name, or if you would like to resume
replication for all sites, specify "all" as the site name.
service for a specified location configured as a destination replication
endpoint.

Response:
```sh
Expand Down
52 changes: 39 additions & 13 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ class QueueProcessor extends EventEmitter {
this._consumer = null;
this._mProducer = mProducer;
this.site = site;
// redis pub/sub for pause/resume
this._redis = new Redis(redisConfig);
this._setupRedis();

this.echoMode = false;

Expand All @@ -88,6 +85,7 @@ class QueueProcessor extends EventEmitter {
this.destHTTPAgent = new http.Agent({ keepAlive: true });

this._setupVaultclientCache();
this._setupRedis(redisConfig);

// FIXME support multiple scality destination sites
if (Array.isArray(destConfig.bootstrapList)) {
Expand Down Expand Up @@ -205,30 +203,58 @@ class QueueProcessor extends EventEmitter {
this.accountCredsCache = {};
}

_setupRedis() {
_setupRedis(redisConfig) {
// redis pub/sub for pause/resume
const redis = new Redis(redisConfig);
// redis subscribe to site specific channel
const channelName = `${this.repConfig.topic}-${this.site}`;
this._redis.subscribe(channelName, err => {
redis.subscribe(channelName, err => {
if (err) {
this.logger.fatal('queue processor failed to subscribe to ' +
`crr redis channel for site ${this.site}`,
`crr redis channel for location ${this.site}`,
{ method: 'QueueProcessor.constructor',
error: err });
process.exit(1);
}
this._redis.on('message', (channel, message) => {
if (channel === channelName && this._consumer[message]) {
// should only validate valid functions that can be invoked
// from the redis pub/sub
const validCmds = ['pauseService', 'resumeService'];
if (validCmds.includes(message)) {
this._consumer[message](this.site);
redis.on('message', (channel, message) => {
const validActions = {
pauseService: this._pauseService.bind(this),
resumeService: this._resumeService.bind(this),
};
try {
const { action } = JSON.parse(message);
const cmd = validActions[action];
if (channel === channelName && typeof cmd === 'function') {
cmd();
}
} catch (e) {
this.logger.error('error parsing redis sub message', {
method: 'QueueProcessor._setupRedis',
error: e,
});
}
});
});
}

/**
* Pause replication consumers
* @return {undefined}
*/
_pauseService() {
this._consumer.pause(this.site);
this.logger.info(`paused replication for location: ${this.site}`);
}

/**
* Resume replication consumers
* @return {undefined}
*/
_resumeService() {
this._consumer.resume(this.site);
this.logger.info(`resumed replication for location: ${this.site}`);
}

getStateVars() {
return {
sourceConfig: this.sourceConfig,
Expand Down
21 changes: 11 additions & 10 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,26 +518,27 @@ class BackbeatConsumer extends EventEmitter {
* @param {string} site - name of site
* @return {undefined}
*/
pauseService(site) {
pause(site) {
// Use of KafkaConsumer#pause did not work. Using alternative
// of unsubscribe/subscribe
if (this._consumerReady) {
this._consumer.unsubscribe();
this._consumerReady = false;
this._log.info(`paused replication for site ${site}`);
}
this._consumer.unsubscribe();
this._log.debug(`paused consumer for location: ${site}`, {
method: 'BackbeatConsumer.pause',
});
}

/**
* resume the kafka consumer
* @param {string} site - name of site
* @return {undefined}
*/
resumeService(site) {
if (!this._consumerReady) {
resume(site) {
// if not subscribed, then subscribe
if (this._consumer.subscription().length === 0) {
this._consumer.subscribe([this._topic]);
this._consumerReady = true;
this._log.info(`resumed replication for site ${site}`);
this._log.debug(`resumed consumer for location: ${site}`, {
method: 'BackbeatConsumer.resume',
});
}
}

Expand Down
58 changes: 56 additions & 2 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line strict

const async = require('async');
const Redis = require('ioredis');
const zookeeper = require('node-zookeeper-client');

const { errors } = require('arsenal');
Expand Down Expand Up @@ -62,6 +63,10 @@ class BackbeatAPI {
this._internalStart = Date.now();
}

// Redis instance for publishing messages to BackbeatConsumers
this._redisPublisher = new Redis(this._redisConfig);

// Redis instance is used for getting/setting keys
this._redisClient = new RedisClient(this._redisConfig, this._logger);
// Redis expiry increased by an additional interval so we can reference
// the immediate older data for average throughput calculation
Expand Down Expand Up @@ -167,11 +172,14 @@ class BackbeatAPI {
addKeys.key = key;
addKeys.versionId = versionId;
}
if (bbRequest.getHTTPMethod() === 'GET' && type === 'all' &&
marker) {
if (marker) {
// this is optional, so doesn't matter if set or not
addKeys.marker = marker;
}
} else {
// currently only pause/resume
filteredRoutes = filteredRoutes.filter(r =>
rDetails.status === r.type);
}
}
// if rDetails has a type property
Expand Down Expand Up @@ -527,6 +535,52 @@ class BackbeatAPI {
return { reqBody };
}

/**
* Pause CRR operations for given site(s)
* @param {Object} details - The route details
* @param {String} body - The POST request body string
* @param {Function} cb - The callback to call
* @return {undefined}
*/
pauseCRRService(details, body, cb) {
let sites;
if (details.site === 'all') {
sites = details.extensions.crr.filter(s => s !== 'all');
} else {
sites = [details.site];
}
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'pauseService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info(`replication service paused for locations: ${sites}`);
return cb(null, {});
}

/**
* Resume CRR operations for given site(s)
* @param {Object} details - The route details
* @param {String} body - The POST request body string
* @param {Function} cb - The callback to call
* @return {undefined}
*/
resumeCRRService(details, body, cb) {
let sites;
if (details.site === 'all') {
sites = details.extensions.crr.filter(s => s !== 'all');
} else {
sites = [details.site];
}
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'resumeService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info(`replication service resumed for locations ${sites}`);
return cb(null, {});
}

/**
* Setup internals
* @param {function} cb - callback(error)
Expand Down
26 changes: 17 additions & 9 deletions lib/api/BackbeatRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ class BackbeatRequest {
}

/**
* Parse the route details for any of the retry routes.
* Parse the route details for any of the crr routes (retry, pause, resume).
* @param {Array} parts - The route schema split by '/'
* @param {String} query - The query string.
* @return {undefined}
*/
_parseRetryRoutes(parts, query) {
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.bucket = parts[2];
this._routeDetails.key = parts[3];
this._routeDetails.versionId = parts[4];
this._routeDetails.marker = querystring.parse(query).marker;
_parseCRRRoutes(parts, query) {
if (parts[1] && parts[1] === 'failed') {
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.bucket = parts[2];
this._routeDetails.key = parts[3];
this._routeDetails.versionId = parts[4];
this._routeDetails.marker = querystring.parse(query).marker;
} else {
// for now: pause/resume
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.site = parts[2] || 'all';
}
}

/**
Expand Down Expand Up @@ -81,8 +88,9 @@ class BackbeatRequest {
const { pathname, query } = url.parse(this._route);
const parts = pathname ? pathname.split('/') : [];

// crr retry/pause/resume routes
if (parts[0] === 'crr') {
this._parseRetryRoutes(parts, query);
this._parseCRRRoutes(parts, query);
} else if (parts[0] === 'metrics') {
this._parseMetricsRoutes(parts);
} else if (parts[0] === 'monitoring') {
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ describe('BackbeatConsumer', () => {
consumer.on('consumed', messagesConsumed => {
totalConsumed += messagesConsumed;
if (totalConsumed === 1) {
consumer.pauseService();
consumer.pause();
next();
}
});
Expand Down Expand Up @@ -166,7 +166,7 @@ describe('BackbeatConsumer', () => {
});
},
next => {
consumer.resumeService();
consumer.resume();
assert.equal(kafkaConsumer.subscription().length, 1);
consumer.on('consumed', messagesConsumed => {
totalConsumed += messagesConsumed;
Expand Down
Loading

0 comments on commit b8502e2

Please sign in to comment.