Skip to content

Commit

Permalink
squash: post-review
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 27, 2018
1 parent cdcc885 commit f813eff
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 42 deletions.
26 changes: 13 additions & 13 deletions docs/crr-pause-resume.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ continue to finish replication and will not be paused.

The API will have a Redis instance publishing messages to a specific channel.
QueueProcessors will subscribe to this channel, and on receiving a request
to pause or resume CRR, will notify all of its BackbeatConsumers to perform the
action if applicable. If an action occurred, the QueueProcessor will receive an
update on the current status of each Consumer. Based on the global status of a
location, the status will be updated in Zookeeper if a change has occurred.
to pause or resume CRR, will notify all of their BackbeatConsumers to perform
the action if applicable. If an action occurred, the QueueProcessor will receive
an update on the current status of each Consumer. Based on the global status of
a location, the status will be updated in Zookeeper if a change has occurred.

It is important to note that when a Consumer pauses, the Consumer process is still
kept alive and maintains any internal state, including offset. The Consumer will
Expand All @@ -43,10 +43,10 @@ entries from its last offset.
all locations configured as destination replication endpoints.

Response:
```sh
```json
{
location1: "disabled",
location2: "enabled"
"location1": "disabled",
"location2": "enabled"
}
```

Expand All @@ -56,9 +56,9 @@ entries from its last offset.
a specified location configured as a destination replication endpoint.

Response:
```sh
```json
{
location: "enabled",
"<location-name>": "enabled"
}
```

Expand All @@ -68,7 +68,7 @@ entries from its last offset.
for all locations configured as destination replication endpoints.

Response:
```sh
```json
{}
```

Expand All @@ -78,7 +78,7 @@ entries from its last offset.
for a specified location configured as a destination replication endpoint.

Response:
```sh
```json
{}
```

Expand All @@ -88,7 +88,7 @@ entries from its last offset.
service for all locations configured as destination replication endpoints.

Response:
```sh
```json
{}
```

Expand All @@ -99,6 +99,6 @@ entries from its last offset.
endpoint.

Response:
```sh
```json
{}
```
6 changes: 3 additions & 3 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class QueueProcessor extends EventEmitter {
* @param {boolean} [options.disableConsumer] - true to disable
* startup of consumer (for testing: one has to call
* processQueueEntry() explicitly)
* @param {boolean} [options.status] - if false, kafka consumer is disabled
* @param {boolean} [options.paused] - if true, kafka consumer is paused
* @return {undefined}
*/
start(options) {
Expand Down Expand Up @@ -353,8 +353,8 @@ class QueueProcessor extends EventEmitter {
});
this._consumer.on('ready', () => {
consumerReady = true;
const disableConsumer = (options && options.status === false);
this._consumer.subscribe(disableConsumer);
const paused = options && options.paused;
this._consumer.subscribe(paused);

this.logger.info('queue processor is ready to consume ' +
'replication entries');
Expand Down
36 changes: 18 additions & 18 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,27 @@ function getCRRStatusZkPath() {
function setupZkStatusNode(zkClient, site, done) {
const path = `${getCRRStatusZkPath()}/${site}`;
zkClient.create(path, Buffer.from('true'), err => {
if (err && err.name !== 'NODE_EXISTS') {
log.fatal('could not setup zookeeper node', {
method: 'QueueProcessor:task',
zookeeperPath: path,
error: err,
});
return done(err);
}
if (err && err.name === 'NODE_EXISTS') {
zkClient.getData(path, (err, data) => {
if (err) {
log.fatal('could not check site status in zookeeper',
log.fatal('could not check site status in zookeeper',
{ method: 'QueueProcessor:task',
zookeeperPath: path,
error: err });
error: err.message });
return done(err);
}
const d = data.toString() === 'true';
return done(null, d);
const paused = data.toString() === 'false';
return done(null, paused);
});
}
if (err) {
log.fatal('could not setup zookeeper node', {
method: 'QueueProcessor:task',
zookeeperPath: path,
error: err.message,
});
return done(err);
}
return done(null, true);
});
}
Expand All @@ -86,7 +86,7 @@ async.series([
zkClient.connect();
zkClient.once('error', err => {
log.fatal('error connecting to zookeeper', {
error: err,
error: err.message,
});
return done(err);
});
Expand All @@ -98,7 +98,7 @@ async.series([
log.fatal('could not create path in zookeeper', {
method: 'QueueProcessor:task',
zookeeperPath: path,
error: err,
error: err.message,
});
return done(err);
}
Expand Down Expand Up @@ -146,11 +146,11 @@ async.series([
activeQProcessors[site] = new QueueProcessor(
zkClient, kafkaConfig, sourceConfig, destConfig,
repConfig, redisConfig, metricsProducer, site);
setupZkStatusNode(zkClient, site, (err, status) => {
setupZkStatusNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
activeQProcessors[site].start({ status });
activeQProcessors[site].start({ paused });
return next();
});
}
Expand All @@ -173,11 +173,11 @@ async.series([
activeQProcessors[site] = new QueueProcessor(zkClient,
kafkaConfig, sourceConfig, destConfig, repConfig,
redisConfig, metricsProducer, site);
return setupZkStatusNode(zkClient, site, (err, status) => {
return setupZkStatusNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
activeQProcessors[site].start({ status });
activeQProcessors[site].start({ paused });
return next();
});
}, err => {
Expand Down
10 changes: 5 additions & 5 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ class BackbeatConsumer extends EventEmitter {
* is paused until the current queue of tasks are processed. Once the task
* queue is empty, the current offset is committed and the fetch is resumed
* to get the next batch of messages
* @param {Boolean} [disableConsumer] - optional field, if true, kafka
* consumer should not subscribe to its topic
* @param {Boolean} [paused] - optional field. If true, kafka consumer should
* not subscribe to its topic
* @return {this} current instance
*/
subscribe(disableConsumer) {
if (!disableConsumer) {
subscribe(paused) {
if (!paused) {
this._consumer.subscribe([this._topic]);
} else {
this._log.info(`consumer not subscribed to topic ${this._topic}`);
this._log.debug(`consumer is paused for topic ${this._topic}`);
}

this._processingQueue = async.queue(
Expand Down
4 changes: 1 addition & 3 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ class BackbeatAPI {
getHealthcheck(details, cb) {
return this._healthcheck.getHealthcheck((err, data) => {
if (err) {
this._logger.error('error getting healthcheck', {
error: err,
});
this._logger.error('error getting healthcheck', err);
return cb(errors.InternalError);
}
return cb(null, data);
Expand Down

0 comments on commit f813eff

Please sign in to comment.