Skip to content

Commit

Permalink
squash: change zookeeper node data type
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 27, 2018
1 parent f813eff commit 0bb1482
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 34 deletions.
83 changes: 66 additions & 17 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'; // eslint-disable-line

const async = require('async');
const http = require('http');
const { EventEmitter } = require('events');
const Redis = require('ioredis');
Expand All @@ -22,7 +23,7 @@ const ObjectQueueEntry = require('../utils/ObjectQueueEntry');
const BucketQueueEntry = require('../utils/BucketQueueEntry');

const { zookeeperReplicationNamespace } = require('../constants');
const ZK_CRR_STATUS_PATH = '/status';
const ZK_CRR_STATE_PATH = '/state';

const {
proxyVaultPath,
Expand Down Expand Up @@ -252,23 +253,29 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
_pauseService() {
this._consumer.pause(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this.logger.info(`paused replication for location: ${this.site}`);
const enabled = this._consumer.getServiceStatus();
if (enabled) {
this._consumer.pause(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this.logger.info(`paused replication for location: ${this.site}`);
}
}

/**
* Resume replication consumers
* @return {undefined}
*/
_resumeService() {
this._consumer.resume(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this.logger.info(`resumed replication for location: ${this.site}`);
const enabled = this._consumer.getServiceStatus();
if (!enabled) {
this._consumer.resume(this.site);
this._updateZkServiceStatus(this._consumer.getServiceStatus());
this.logger.info(`resumed replication for location: ${this.site}`);
}
}

_getStatusZkPath() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}/` +
_getZkSiteNode() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/` +
`${this.site}`;
}

Expand All @@ -279,15 +286,57 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
_updateZkServiceStatus(status) {
const path = this._getStatusZkPath();
const data = Buffer.from(status.toString());
this.zkClient.setData(path, data, err => {
if (err) {
this.logger.error('could not update service status in ' +
'zookeeper', {
method: 'QueueProcessor._updateZkState',
const path = this._getZkSiteNode();
async.waterfall([
next => this.zkClient.getData(path, (err, data) => {
if (err) {
const error = {
message: 'could not get state from zookeeper',
error: err.message,
};
return next(error);
}
try {
const state = JSON.parse(data.toString());
// set revised status
state.status = status;
return next(null, JSON.stringify(state));
} catch (e) {
const error = {
message: 'could not parse status data from zookeeper',
error: e,
};
return next(error);
}
}),
(state, next) => this.zkClient.setData(path, Buffer.from(state),
err => {
if (err) {
const error = {
message: 'could not save status data in zookeeper',
error: err,
};
return next(error);
}
return next();
}),
], e => {
if (e) {
// if zookeeper state could not be set, then BackbeatConsumer
// should reflect that state, and revert any changes
const paused = status === false;
if (paused) {
// need to re-enable
this._consumer.resume(this.site);
} else {
// need to disable
this._consumer.pause(this.site);
}
this.logger.error(`${e.message}`, {
method: 'QueueProcessor._updateZkServiceStatus',
zookeeperPath: path,
error: err,
error: e.error,
paused: this._consumer.getServiceStatus(),
});
}
});
Expand Down
35 changes: 23 additions & 12 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const MetricsProducer = require('../../../lib/MetricsProducer');
const zookeeper = require('../../../lib/clients/zookeeper');

const { zookeeperReplicationNamespace } = require('../constants');
const ZK_CRR_STATUS_PATH = '/status';
const ZK_CRR_STATE_PATH = '/state';

const zkConfig = config.zookeeper;
const kafkaConfig = config.kafka;
Expand All @@ -24,8 +24,8 @@ const log = new werelogs.Logger('Backbeat:QueueProcessor:task');
werelogs.configure({ level: config.log.logLevel,
dump: config.log.dumpLevel });

function getCRRStatusZkPath() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}`;
function getCRRStateZkPath() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}`;
}

/**
Expand All @@ -36,9 +36,10 @@ function getCRRStatusZkPath() {
* @param {Function} done - callback(error, status) where status is a boolean
* @return {undefined}
*/
function setupZkStatusNode(zkClient, site, done) {
const path = `${getCRRStatusZkPath()}/${site}`;
zkClient.create(path, Buffer.from('true'), err => {
function setupZkSiteNode(zkClient, site, done) {
const path = `${getCRRStateZkPath()}/${site}`;
const data = JSON.stringify({ paused: false });
zkClient.create(path, Buffer.from(data), err => {
if (err && err.name === 'NODE_EXISTS') {
zkClient.getData(path, (err, data) => {
if (err) {
Expand All @@ -48,8 +49,17 @@ function setupZkStatusNode(zkClient, site, done) {
error: err.message });
return done(err);
}
const paused = data.toString() === 'false';
return done(null, paused);
try {
const paused = JSON.parse(data.toString()).paused;
return done(null, paused);
} catch (e) {
log.fatal('error setting state for queue processor', {
method: 'QueueProcessor:task',
site,
error: e,
});
return done(e);
}
});
}
if (err) {
Expand All @@ -60,7 +70,8 @@ function setupZkStatusNode(zkClient, site, done) {
});
return done(err);
}
return done(null, true);
// paused is false
return done(null, false);
});
}

Expand Down Expand Up @@ -92,7 +103,7 @@ async.series([
});
zkClient.once('ready', () => {
zkClient.removeAllListeners('error');
const path = getCRRStatusZkPath();
const path = getCRRStateZkPath();
zkClient.mkdirp(path, err => {
if (err) {
log.fatal('could not create path in zookeeper', {
Expand Down Expand Up @@ -146,7 +157,7 @@ async.series([
activeQProcessors[site] = new QueueProcessor(
zkClient, kafkaConfig, sourceConfig, destConfig,
repConfig, redisConfig, metricsProducer, site);
setupZkStatusNode(zkClient, site, (err, paused) => {
setupZkSiteNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
Expand All @@ -173,7 +184,7 @@ async.series([
activeQProcessors[site] = new QueueProcessor(zkClient,
kafkaConfig, sourceConfig, destConfig, repConfig,
redisConfig, metricsProducer, site);
return setupZkStatusNode(zkClient, site, (err, paused) => {
return setupZkSiteNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
Expand Down
14 changes: 9 additions & 5 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const monitoringClient = require('../clients/monitoringHandler').client;
const INTERVAL = 300; // 5 minutes
const EXPIRY = 900; // 15 minutes

const ZK_CRR_STATUS_PATH = '/status';
const ZK_CRR_STATE_PATH = '/state';

/**
* Class representing Backbeat API endpoints and internals
Expand Down Expand Up @@ -600,14 +600,18 @@ class BackbeatAPI {
const statuses = {};
async.each(sites, (site, next) => {
const path =
`${zookeeperReplicationNamespace}${ZK_CRR_STATUS_PATH}/${site}`;
`${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/${site}`;
this._zkClient.getData(path, (err, data) => {
if (err) {
return next(err);
}
statuses[site] = data.toString() === 'true' ?
'enabled' : 'disabled';
return next();
try {
const d = JSON.parse(data.toString());
statuses[site] = d.paused ? 'disabled' : 'enabled';
return next();
} catch (e) {
return next(e);
}
});
}, error => {
if (error) {
Expand Down

0 comments on commit 0bb1482

Please sign in to comment.