ft: ZENKO-583 get CRR status #331

Merged: 2 commits, Jun 30, 2018
54 changes: 41 additions & 13 deletions docs/crr-pause-resume.md
@@ -11,7 +11,7 @@ A RESTful API will expose methods for users to pause and resume cross-region
replication operations.

Redis offers a pub/sub function. We will utilize this function to propagate
-requests to all active CRR Kafka Consumers on all nodes with backbeat
+requests to all active CRR Kafka Consumers on all nodes that have backbeat
containers set up for replication.

We want to pause and resume the CRR service at the lowest level (in our case,
@@ -23,24 +23,52 @@ consumed by the Kafka Consumer and are being processed for replication will
continue to finish replication and will not be paused.

The API will have a Redis instance publishing messages to a specific channel.
All CRR Kafka Consumers will subscribe to this channel and complete any given
valid requests.

-When a Kafka Consumer pauses, the Consumer is still kept alive and maintains
-any internal state, including offset. The Consumer will no longer be
-subscribed to the CRR topic, so will no longer try consuming any entries.
-When the paused Consumer is resumed, it will again resume consuming entries
-from its last offset.
+QueueProcessors will subscribe to this channel and, on receiving a request
+to pause or resume CRR, will notify all of their BackbeatConsumers to perform
+the action if applicable. If an action is taken, the QueueProcessor receives
+an update on the current status of each Consumer. Based on the location's
+overall status, the state stored in Zookeeper is updated if it has changed.
+
+It is important to note that when a Consumer pauses, the Consumer process is
+kept alive and maintains any internal state, including its offset. The
+Consumer is no longer subscribed to the CRR topic, so it no longer tries to
+consume any entries. When the paused Consumer is resumed, it resumes
+consuming entries from its last offset.
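
For illustration, here is a minimal sketch of this flow using ioredis. The
channel name and message format are assumptions made for this example; the
real values come from the replication configuration:

```js
const Redis = require('ioredis');

// assumed channel name, for illustration only
const channel = 'backbeat-replication-channel';

// Publisher side (the API): broadcast a pause request.
const pub = new Redis();
pub.publish(channel, JSON.stringify({ action: 'pauseService' }));

// Subscriber side (each QueueProcessor): react to valid requests.
const sub = new Redis();
sub.subscribe(channel, err => {
    if (err) {
        console.error('could not subscribe', err);
    }
});
sub.on('message', (chan, message) => {
    const { action } = JSON.parse(message);
    // a QueueProcessor would notify its BackbeatConsumer here
    // to pause or resume, if applicable
    console.log(`received ${action} on ${chan}`);
});
```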

## Definition of API

* GET `/_/crr/status`

This GET request checks whether cross-region replication is enabled for
all locations configured as destination replication endpoints (a client
usage sketch follows this endpoint list).

Response:
```json
{
"location1": "disabled",
"location2": "enabled"
}
```

* GET `/_/crr/status/<location-name>`

This GET request checks whether cross-region replication is enabled for
a specified location configured as a destination replication endpoint.

Response:
```json
{
"<location-name>": "enabled"
}
```

* POST `/_/crr/pause`

This POST request manually pauses the cross-region replication service
for all locations configured as destination replication endpoints.

Response:
-```sh
+```json
{}
```

@@ -50,7 +78,7 @@ from its last offset.
for a specified location configured as a destination replication endpoint.

Response:
-```sh
+```json
{}
```

@@ -60,7 +88,7 @@ from its last offset.
service for all locations configured as destination replication endpoints.

Response:
-```sh
+```json
{}
```

@@ -71,6 +99,6 @@ from its last offset.
endpoint.

Response:
-```sh
+```json
{}
```
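
As a usage illustration, the routes above can be exercised with Node's
built-in `http` module. The host and port below are assumptions; the actual
address of the backbeat API depends on the deployment:

```js
const http = require('http');

const host = 'localhost'; // assumed
const port = 8900;        // assumed

// Check CRR status for all locations.
http.get({ host, port, path: '/_/crr/status' }, res => {
    let body = '';
    res.on('data', chunk => { body += chunk; });
    // e.g. { "location1": "disabled", "location2": "enabled" }
    res.on('end', () => console.log(JSON.parse(body)));
}).on('error', err => console.error(err));

// Pause CRR for a single location ("location1" is an assumed name).
const req = http.request({
    host,
    port,
    method: 'POST',
    path: '/_/crr/pause/location1',
}, res => {
    res.resume();
    res.on('end', () => console.log('pause returned', res.statusCode));
});
req.on('error', err => console.error(err));
req.end();
```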
3 changes: 2 additions & 1 deletion extensions/replication/constants.js
@@ -3,7 +3,8 @@
const testIsOn = process.env.TEST_SWITCH === '1';

const constants = {
-    zookeeperReplicationNamespace: '/backbeat/replication',
+    zookeeperReplicationNamespace:
+        testIsOn ? '/backbeattest' : '/backbeat/replication',
proxyVaultPath: '/_/backbeat/vault',
proxyIAMPath: '/_/backbeat/iam',
metricsExtension: 'crr',
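
For illustration, the selected namespace can be checked directly. This sketch
assumes the module exports the `constants` object and that the script sits at
the repo root:

```js
// Run with TEST_SWITCH=1 to select the test namespace.
const {
    zookeeperReplicationNamespace,
} = require('./extensions/replication/constants');

// '/backbeattest' when TEST_SWITCH === '1',
// '/backbeat/replication' otherwise
console.log(zookeeperReplicationNamespace);
```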
113 changes: 105 additions & 8 deletions extensions/replication/queueProcessor/QueueProcessor.js
@@ -1,5 +1,6 @@
'use strict'; // eslint-disable-line

const async = require('async');
const http = require('http');
const { EventEmitter } = require('events');
const Redis = require('ioredis');
@@ -21,6 +22,9 @@ const EchoBucket = require('../tasks/EchoBucket');
const ObjectQueueEntry = require('../utils/ObjectQueueEntry');
const BucketQueueEntry = require('../utils/BucketQueueEntry');

const { zookeeperReplicationNamespace } = require('../constants');
const ZK_CRR_STATE_PATH = '/state';

const {
proxyVaultPath,
proxyIAMPath,
@@ -37,9 +41,9 @@ class QueueProcessor extends EventEmitter {
* entries to a target S3 endpoint.
*
* @constructor
-     * @param {Object} zkConfig - zookeeper configuration object
+     * @param {node-zookeeper-client.Client} zkClient - zookeeper client
* @param {Object} kafkaConfig - kafka configuration object
-     * @param {string} kafkaConfig.hosts - list of kafka brokers
+     * @param {String} kafkaConfig.hosts - list of kafka brokers
* as "host:port[,host:port...]"
* @param {Object} sourceConfig - source S3 configuration
* @param {Object} sourceConfig.s3 - s3 endpoint configuration object
@@ -59,9 +63,10 @@
* @param {MetricsProducer} mProducer - instance of metrics producer
* @param {String} site - site name
*/
-    constructor(zkConfig, kafkaConfig, sourceConfig, destConfig, repConfig,
+    constructor(zkClient, kafkaConfig, sourceConfig, destConfig, repConfig,
redisConfig, mProducer, site) {
super();
this.zkClient = zkClient;
this.kafkaConfig = kafkaConfig;
this.sourceConfig = sourceConfig;
this.destConfig = destConfig;
@@ -203,6 +208,12 @@ class QueueProcessor extends EventEmitter {
this.accountCredsCache = {};
}

    /**
     * Set up the Redis subscriber that listens for actions from other
     * processes (i.e. BackbeatAPI for pause/resume)
     * @param {Object} redisConfig - redis configuration object
     * @return {undefined}
     */
_setupRedis(redisConfig) {
// redis pub/sub for pause/resume
const redis = new Redis(redisConfig);
@@ -242,17 +253,100 @@
* @return {undefined}
*/
_pauseService() {
-        this._consumer.pause(this.site);
-        this.logger.info(`paused replication for location: ${this.site}`);
+        const enabled = this._consumer.getServiceStatus();
+        if (enabled) {
+            // if currently resumed/active, attempt to pause
+            this._updateZkServiceStatus(err => {
+                if (err) {
+                    this.logger.trace('error occurred saving state to ' +
+                        'zookeeper', {
+                            method: 'QueueProcessor._pauseService',
+                        });
+                } else {
+                    this._consumer.pause(this.site);
+                    this.logger.info('paused replication for location: ' +
+                        `${this.site}`);
+                }
+            });
+        }
    }

/**
* Resume replication consumers
* @return {undefined}
*/
_resumeService() {
-        this._consumer.resume(this.site);
-        this.logger.info(`resumed replication for location: ${this.site}`);
+        const enabled = this._consumer.getServiceStatus();
+        if (!enabled) {
+            // if currently paused, attempt to resume
+            this._updateZkServiceStatus(err => {
+                if (err) {
+                    this.logger.trace('error occurred saving state to ' +
+                        'zookeeper', {
+                            method: 'QueueProcessor._resumeService',
+                        });
+                } else {
+                    this._consumer.resume(this.site);
+                    this.logger.info('resumed replication for location: ' +
+                        `${this.site}`);
+                }
+            });
+        }
    }

_getZkSiteNode() {
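        // e.g. with site "location1" this yields
        // "/backbeat/replication/state/location1"
        // (or the "/backbeattest" prefix when the test switch is on)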
return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/` +
`${this.site}`;
}

/**
* Update Kafka consumer status in zookeeper for this site-defined
* QueueProcessor
* @param {Function} cb - callback(error)
* @return {undefined}
*/
_updateZkServiceStatus(cb) {
const path = this._getZkSiteNode();
const enabled = this._consumer.getServiceStatus();
async.waterfall([
next => this.zkClient.getData(path, (err, data) => {
if (err) {
this.logger.error('could not get state from zookeeper', {
method: 'QueueProcessor._updateZkServiceStatus',
zookeeperPath: path,
error: err.message,
});
return next(err);
}
try {
const state = JSON.parse(data.toString());
// set revised status
state.paused = !enabled;
const bufferedData = Buffer.from(JSON.stringify(state));
return next(null, bufferedData);
} catch (err) {
this.logger.error('could not parse state data from ' +
'zookeeper', {
method: 'QueueProcessor._updateZkServiceStatus',
zookeeperPath: path,
error: err,
});
return next(err);
}
}),
(data, next) => this.zkClient.setData(path, data, err => {
if (err) {
this.logger.error('could not save state data in ' +
'zookeeper', {
method: 'QueueProcessor._updateZkServiceStatus',
zookeeperPath: path,
error: err,
});
return next(err);
}
return next();
}),
], cb);
}
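
    // For illustration (hypothetical usage, not part of this module): the
    // state node written above can be inspected with node-zookeeper-client:
    //
    //   const zookeeper = require('node-zookeeper-client');
    //   const client = zookeeper.createClient('localhost:2181'); // assumed
    //   client.once('connected', () => {
    //       client.getData('/backbeat/replication/state/location1',
    //           (err, data) => {
    //               // state is stored as JSON, e.g. { "paused": false }
    //               console.log(JSON.parse(data.toString()));
    //               client.close();
    //           });
    //   });
    //   client.connect();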

getStateVars() {
@@ -282,6 +376,7 @@
* @param {boolean} [options.disableConsumer] - true to disable
* startup of consumer (for testing: one has to call
* processQueueEntry() explicitly)
* @param {boolean} [options.paused] - if true, kafka consumer is paused
* @return {undefined}
*/
start(options) {
@@ -314,7 +409,9 @@
});
this._consumer.on('ready', () => {
consumerReady = true;
-            this._consumer.subscribe();
+            const paused = options && options.paused;
+            this._consumer.subscribe(paused);

this.logger.info('queue processor is ready to consume ' +
'replication entries');
this.emit('ready');
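
For illustration, a caller could restore a QueueProcessor to its last known
state by reading the site's state node and passing it through `start()`. This
sketch assumes `zkClient` is a connected node-zookeeper-client instance and
`queueProcessor` has already been constructed:

```js
const path = queueProcessor._getZkSiteNode();
zkClient.getData(path, (err, data) => {
    // default to a resumed consumer if no state was stored yet
    const state = err ? { paused: false } : JSON.parse(data.toString());
    // `paused: true` makes the consumer subscribe in a paused state
    // (see `options.paused` above)
    queueProcessor.start({ paused: state.paused });
});
```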