Skip to content

Commit

Permalink
drop: test
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 27, 2018
1 parent 51155a7 commit c3f8525
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class QueueProcessor extends EventEmitter {
const enabled = this._consumer.getServiceStatus();
if (enabled) {
this._consumer.pause(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this._updateZkServiceStatus();
this.logger.info(`paused replication for location: ${this.site}`);
}
}
Expand All @@ -269,7 +269,7 @@ class QueueProcessor extends EventEmitter {
const enabled = this._consumer.getServiceStatus();
if (!enabled) {
this._consumer.resume(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this._updateZkServiceStatus();
this.logger.info(`resumed replication for location: ${this.site}`);
}
}
Expand All @@ -282,11 +282,11 @@ class QueueProcessor extends EventEmitter {
/**
* Update Kafka consumer status in zookeeper for this site-defined
* QueueProcessor
* @param {boolean} status - true if consumer is enabled
* @return {undefined}
*/
_updateZkServiceStatus(status) {
_updateZkServiceStatus() {
const path = this._getZkSiteNode();
const enabled = this._consumer.getServiceStatus();
async.waterfall([
next => this.zkClient.getData(path, (err, data) => {
if (err) {
Expand All @@ -299,7 +299,8 @@ class QueueProcessor extends EventEmitter {
try {
const state = JSON.parse(data.toString());
// set revised status
state.status = status;
state.paused = !enabled;
console.log('\nSTATE:', state)
return next(null, JSON.stringify(state));
} catch (e) {
const error = {
Expand All @@ -324,7 +325,7 @@ class QueueProcessor extends EventEmitter {
if (e) {
// if zookeeper state could not be set, then BackbeatConsumer
// should reflect that state, and revert any changes
const paused = status === false;
const paused = !enabled;
if (paused) {
// need to re-enable
this._consumer.resume(this.site);
Expand Down

0 comments on commit c3f8525

Please sign in to comment.