diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 40d1b49f45..891fe45642 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -1,5 +1,6 @@ 'use strict'; // eslint-disable-line +const async = require('async'); const http = require('http'); const { EventEmitter } = require('events'); const Redis = require('ioredis'); @@ -22,7 +23,7 @@ const ObjectQueueEntry = require('../utils/ObjectQueueEntry'); const BucketQueueEntry = require('../utils/BucketQueueEntry'); const { zookeeperReplicationNamespace } = require('../constants'); -const ZK_CRR_STATUS_PATH = '/status'; +const ZK_CRR_STATE_PATH = '/state'; const { proxyVaultPath, @@ -252,9 +253,12 @@ class QueueProcessor extends EventEmitter { * @return {undefined} */ _pauseService() { - this._consumer.pause(this.site); - this._updateZkServiceStatus(this._consumer.getServiceStatus()); - this.logger.info(`paused replication for location: ${this.site}`); + const enabled = this._consumer.getServiceStatus(); + if (enabled) { + this._consumer.pause(this.site); + this._updateZkServiceStatus(this._consumer.getServiceStatus()); + this.logger.info(`paused replication for location: ${this.site}`); + } } /** @@ -262,13 +266,16 @@ class QueueProcessor extends EventEmitter { * @return {undefined} */ _resumeService() { - this._consumer.resume(this.site); - this._updateZkServiceStatus(this._consumer.getServiceStatus()); - this.logger.info(`resumed replication for location: ${this.site}`); + const enabled = this._consumer.getServiceStatus(); + if (!enabled) { + this._consumer.resume(this.site); + this._updateZkServiceStatus(this._consumer.getServiceStatus()); + this.logger.info(`resumed replication for location: ${this.site}`); + } } - _getStatusZkPath() { - return `${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}/` + + _getZkSiteNode() { + return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/` + `${this.site}`; } @@ -279,15 +286,57 @@ class QueueProcessor extends EventEmitter { * @return {undefined} */ _updateZkServiceStatus(status) { - const path = this._getStatusZkPath(); - const data = Buffer.from(status.toString()); - this.zkClient.setData(path, data, err => { - if (err) { - this.logger.error('could not update service status in ' + - 'zookeeper', { - method: 'QueueProcessor._updateZkState', + const path = this._getZkSiteNode(); + async.waterfall([ + next => this.zkClient.getData(path, (err, data) => { + if (err) { + const error = { + message: 'could not get state from zookeeper', + error: err.message, + }; + return next(error); + } + try { + const state = JSON.parse(data.toString()); + // set revised status + state.status = status; + return next(null, JSON.stringify(state)); + } catch (e) { + const error = { + message: 'could not parse status data from zookeeper', + error: e, + }; + return next(error); + } + }), + (state, next) => this.zkClient.setData(path, Buffer.from(state), + err => { + if (err) { + const error = { + message: 'could not save status data in zookeeper', + error: err, + }; + return next(error); + } + return next(); + }), + ], e => { + if (e) { + // if zookeeper state could not be set, then BackbeatConsumer + // should reflect that state, and revert any changes + const paused = status === false; + if (paused) { + // need to re-enable + this._consumer.resume(this.site); + } else { + // need to disable + this._consumer.pause(this.site); + } + this.logger.error(`${e.message}`, { + method: 'QueueProcessor._updateZkServiceStatus', zookeeperPath: path, - error: err, + error: e.error, + paused: this._consumer.getServiceStatus(), }); } }); diff --git a/extensions/replication/queueProcessor/task.js b/extensions/replication/queueProcessor/task.js index 38c128079f..6c00cbacbd 100644 --- a/extensions/replication/queueProcessor/task.js +++ b/extensions/replication/queueProcessor/task.js @@ -11,7 +11,7 @@ const MetricsProducer = require('../../../lib/MetricsProducer'); const zookeeper = require('../../../lib/clients/zookeeper'); const { zookeeperReplicationNamespace } = require('../constants'); -const ZK_CRR_STATUS_PATH = '/status'; +const ZK_CRR_STATE_PATH = '/state'; const zkConfig = config.zookeeper; const kafkaConfig = config.kafka; @@ -24,8 +24,8 @@ const log = new werelogs.Logger('Backbeat:QueueProcessor:task'); werelogs.configure({ level: config.log.logLevel, dump: config.log.dumpLevel }); -function getCRRStatusZkPath() { - return `${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}`; +function getCRRStateZkPath() { + return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}`; } /** @@ -36,11 +36,12 @@ function getCRRStatusZkPath() { * @param {Function} done - callback(error, status) where status is a boolean * @return {undefined} */ -function setupZkStatusNode(zkClient, site, done) { - const path = `${getCRRStatusZkPath()}/${site}`; - zkClient.create(path, Buffer.from('true'), err => { +function setupZkSiteNode(zkClient, site, done) { + const path = `${getCRRStateZkPath()}/${site}`; + const data = JSON.stringify({ paused: false }); + zkClient.create(path, Buffer.from(data), err => { if (err && err.name === 'NODE_EXISTS') { - zkClient.getData(path, (err, data) => { + return zkClient.getData(path, (err, data) => { if (err) { log.fatal('could not check site status in zookeeper', { method: 'QueueProcessor:task', @@ -48,8 +49,17 @@ function setupZkStatusNode(zkClient, site, done) { error: err.message }); return done(err); } - const paused = data.toString() === 'false'; - return done(null, paused); + try { + const paused = JSON.parse(data.toString()).paused; + return done(null, paused); + } catch (e) { + log.fatal('error setting state for queue processor', { + method: 'QueueProcessor:task', + site, + error: e, + }); + return done(e); + } }); } if (err) { @@ -60,7 +70,8 @@ function setupZkStatusNode(zkClient, site, done) { }); return done(err); } - return done(null, true); + // paused is false + return done(null, false); }); } @@ -92,7 +103,7 @@ async.series([ }); zkClient.once('ready', () => { zkClient.removeAllListeners('error'); - const path = getCRRStatusZkPath(); + const path = getCRRStateZkPath(); zkClient.mkdirp(path, err => { if (err) { log.fatal('could not create path in zookeeper', { @@ -146,7 +157,7 @@ async.series([ activeQProcessors[site] = new QueueProcessor( zkClient, kafkaConfig, sourceConfig, destConfig, repConfig, redisConfig, metricsProducer, site); - setupZkStatusNode(zkClient, site, (err, paused) => { + setupZkSiteNode(zkClient, site, (err, paused) => { if (err) { return next(err); } @@ -173,7 +184,7 @@ async.series([ activeQProcessors[site] = new QueueProcessor(zkClient, kafkaConfig, sourceConfig, destConfig, repConfig, redisConfig, metricsProducer, site); - return setupZkStatusNode(zkClient, site, (err, paused) => { + return setupZkSiteNode(zkClient, site, (err, paused) => { if (err) { return next(err); } diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js index 1ed13c251d..3278525e4c 100644 --- a/lib/api/BackbeatAPI.js +++ b/lib/api/BackbeatAPI.js @@ -22,7 +22,7 @@ const monitoringClient = require('../clients/monitoringHandler').client; const INTERVAL = 300; // 5 minutes const EXPIRY = 900; // 15 minutes -const ZK_CRR_STATUS_PATH = '/status'; +const ZK_CRR_STATE_PATH = '/state'; /** * Class representing Backbeat API endpoints and internals @@ -600,14 +600,18 @@ class BackbeatAPI { const statuses = {}; async.each(sites, (site, next) => { const path = - `${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}/${site}`; + `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/${site}`; this._zkClient.getData(path, (err, data) => { if (err) { return next(err); } - statuses[site] = data.toString() === 'true' ? - 'enabled' : 'disabled'; - return next(); + try { + const d = JSON.parse(data.toString()); + statuses[site] = d.paused ? 'disabled' : 'enabled'; + return next(); + } catch (e) { + return next(e); + } }); }, error => { if (error) {