Skip to content

Commit

Permalink
ft: ZENKO-583 add status check in processor, api
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 28, 2018
1 parent 161f197 commit 2d3a30f
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 44 deletions.
3 changes: 2 additions & 1 deletion extensions/replication/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
const testIsOn = process.env.TEST_SWITCH === '1';

const constants = {
zookeeperReplicationNamespace: '/backbeat/replication',
zookeeperReplicationNamespace:
testIsOn ? '/backbeattest' : '/backbeat/replication',
proxyVaultPath: '/_/backbeat/vault',
proxyIAMPath: '/_/backbeat/iam',
metricsExtension: 'crr',
Expand Down
106 changes: 98 additions & 8 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'; // eslint-disable-line

const async = require('async');
const http = require('http');
const { EventEmitter } = require('events');
const Redis = require('ioredis');
Expand All @@ -21,6 +22,9 @@ const EchoBucket = require('../tasks/EchoBucket');
const ObjectQueueEntry = require('../utils/ObjectQueueEntry');
const BucketQueueEntry = require('../utils/BucketQueueEntry');

const { zookeeperReplicationNamespace } = require('../constants');
const ZK_CRR_STATE_PATH = '/state';

const {
proxyVaultPath,
proxyIAMPath,
Expand All @@ -37,9 +41,9 @@ class QueueProcessor extends EventEmitter {
* entries to a target S3 endpoint.
*
* @constructor
* @param {Object} zkConfig - zookeeper configuration object
* @param {node-zookeeper-client.Client} zkClient - zookeeper client
* @param {Object} kafkaConfig - kafka configuration object
* @param {string} kafkaConfig.hosts - list of kafka brokers
* @param {String} kafkaConfig.hosts - list of kafka brokers
* as "host:port[,host:port...]"
* @param {Object} sourceConfig - source S3 configuration
* @param {Object} sourceConfig.s3 - s3 endpoint configuration object
Expand All @@ -59,9 +63,10 @@ class QueueProcessor extends EventEmitter {
* @param {MetricsProducer} mProducer - instance of metrics producer
* @param {String} site - site name
*/
constructor(zkConfig, kafkaConfig, sourceConfig, destConfig, repConfig,
constructor(zkClient, kafkaConfig, sourceConfig, destConfig, repConfig,
redisConfig, mProducer, site) {
super();
this.zkClient = zkClient;
this.kafkaConfig = kafkaConfig;
this.sourceConfig = sourceConfig;
this.destConfig = destConfig;
Expand Down Expand Up @@ -203,6 +208,12 @@ class QueueProcessor extends EventEmitter {
this.accountCredsCache = {};
}

/**
* Setup the Redis Subscriber which listens for actions from other processes
* (i.e. BackbeatAPI for pause/resume)
* @param {object} redisConfig - redis configs
* @return {undefined}
*/
_setupRedis(redisConfig) {
// redis pub/sub for pause/resume
const redis = new Redis(redisConfig);
Expand Down Expand Up @@ -242,17 +253,93 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
_pauseService() {
this._consumer.pause(this.site);
this.logger.info(`paused replication for location: ${this.site}`);
const enabled = this._consumer.getServiceStatus();
if (enabled) {
this._consumer.pause(this.site);
this._updateZkServiceStatus();
this.logger.info(`paused replication for location: ${this.site}`);
}
}

/**
* Resume replication consumers
* @return {undefined}
*/
_resumeService() {
this._consumer.resume(this.site);
this.logger.info(`resumed replication for location: ${this.site}`);
const enabled = this._consumer.getServiceStatus();
if (!enabled) {
this._consumer.resume(this.site);
this._updateZkServiceStatus();
this.logger.info(`resumed replication for location: ${this.site}`);
}
}

_getZkSiteNode() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/` +
`${this.site}`;
}

    /**
     * Update Kafka consumer status in zookeeper for this site-defined
     * QueueProcessor
     * @return {undefined}
     */
    _updateZkServiceStatus() {
        const path = this._getZkSiteNode();
        // the consumer holds the authoritative new status; zookeeper is
        // updated to mirror it (paused === !enabled)
        const enabled = this._consumer.getServiceStatus();
        async.waterfall([
            // step 1: read the current persisted state and patch its
            // `paused` field
            next => this.zkClient.getData(path, (err, data) => {
                if (err) {
                    // wrap the zookeeper error in a plain object carrying
                    // a human-readable message for the final handler
                    const error = {
                        message: 'could not get state from zookeeper',
                        error: err.message,
                    };
                    return next(error);
                }
                try {
                    const state = JSON.parse(data.toString());
                    // set revised status
                    state.paused = !enabled;
                    return next(null, JSON.stringify(state));
                } catch (e) {
                    // JSON.parse threw: stored node data is corrupt
                    const error = {
                        message: 'could not parse status data from zookeeper',
                        error: e,
                    };
                    return next(error);
                }
            }),
            // step 2: write the revised state back to the same node
            (state, next) => this.zkClient.setData(path, Buffer.from(state),
                err => {
                    if (err) {
                        const error = {
                            message: 'could not save status data in zookeeper',
                            error: err,
                        };
                        return next(error);
                    }
                    return next();
                }),
        ], e => {
            if (e) {
                // if zookeeper state could not be set, then BackbeatConsumer
                // should reflect that state, and revert any changes
                const paused = !enabled;
                if (paused) {
                    // need to re-enable
                    this._consumer.resume(this.site);
                } else {
                    // need to disable
                    this._consumer.pause(this.site);
                }
                // log with the post-revert consumer status for debugging
                this.logger.error(`${e.message}`, {
                    method: 'QueueProcessor._updateZkServiceStatus',
                    zookeeperPath: path,
                    error: e.error,
                    paused: this._consumer.getServiceStatus(),
                });
            }
        });
    }

getStateVars() {
Expand Down Expand Up @@ -282,6 +369,7 @@ class QueueProcessor extends EventEmitter {
* @param {boolean} [options.disableConsumer] - true to disable
* startup of consumer (for testing: one has to call
* processQueueEntry() explicitly)
* @param {boolean} [options.paused] - if true, kafka consumer is paused
* @return {undefined}
*/
start(options) {
Expand Down Expand Up @@ -314,7 +402,9 @@ class QueueProcessor extends EventEmitter {
});
this._consumer.on('ready', () => {
consumerReady = true;
this._consumer.subscribe();
const paused = options && options.paused;
this._consumer.subscribe(paused);

this.logger.info('queue processor is ready to consume ' +
'replication entries');
this.emit('ready');
Expand Down
145 changes: 131 additions & 14 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'; // eslint-disable-line

const async = require('async');
const werelogs = require('werelogs');

const QueueProcessor = require('./QueueProcessor');
Expand All @@ -8,9 +9,13 @@ const { initManagement } = require('../../../lib/management/index');
const { applyBucketReplicationWorkflows } = require('../management');
const { HealthProbeServer } = require('arsenal').network.probe;

const zkConfig = config.zookeeper;
const MetricsProducer = require('../../../lib/MetricsProducer');
const zookeeper = require('../../../lib/clients/zookeeper');

const { zookeeperReplicationNamespace } = require('../constants');
const ZK_CRR_STATE_PATH = '/state';

const zkConfig = config.zookeeper;
const kafkaConfig = config.kafka;
const repConfig = config.extensions.replication;
const sourceConfig = repConfig.source;
Expand All @@ -28,17 +33,107 @@ const healthServer = new HealthProbeServer({
port: config.healthcheckServer.port,
});

const activeQProcessors = {};
/**
 * Build the base zookeeper path under which per-site CRR state nodes live
 * @return {String} zookeeper base path for CRR state
 */
function getCRRStateZkPath() {
    return zookeeperReplicationNamespace + ZK_CRR_STATE_PATH;
}

/**
 * On startup and when replication sites change, create necessary zookeeper
 * status node to save persistent state.
 * @param {node-zookeeper-client.Client} zkClient - zookeeper client
 * @param {String} site - replication site name
 * @param {Function} done - callback(error, status) where status is a boolean
 * @return {undefined}
 */
function setupZkSiteNode(zkClient, site, done) {
    const path = `${getCRRStateZkPath()}/${site}`;
    const data = JSON.stringify({ paused: false });
    zkClient.create(path, Buffer.from(data), err => {
        if (err && err.name === 'NODE_EXISTS') {
            // node already exists: read back the persisted pause state so a
            // restart keeps the last pause/resume choice
            // (renamed callback args to avoid shadowing outer err/data)
            return zkClient.getData(path, (readErr, readData) => {
                if (readErr) {
                    log.fatal('could not check site status in zookeeper',
                        { method: 'QueueProcessor:task',
                          zookeeperPath: path,
                          error: readErr.message });
                    return done(readErr);
                }
                try {
                    // stored node data is a JSON object: { paused: bool }
                    const paused = JSON.parse(readData.toString()).paused;
                    return done(null, paused);
                } catch (e) {
                    // JSON.parse threw: stored node data is corrupt
                    log.fatal('error setting state for queue processor', {
                        method: 'QueueProcessor:task',
                        site,
                        error: e,
                    });
                    return done(e);
                }
            });
        }
        if (err) {
            log.fatal('could not setup zookeeper node', {
                method: 'QueueProcessor:task',
                zookeeperPath: path,
                error: err.message,
            });
            return done(err);
        }
        // node freshly created above with { paused: false }
        return done(null, false);
    });
}

const metricsProducer = new MetricsProducer(kafkaConfig, mConfig);
metricsProducer.setupProducer(err => {
if (err) {
log.error('error starting metrics producer for queue processor', {
error: err,
method: 'MetricsProducer::setupProducer',
let zkClient;
async.series([
done => metricsProducer.setupProducer(err => {
if (err) {
log.fatal('error starting metrics producer for queue ' +
'processor', {
error: err,
method: 'MetricsProducer::setupProducer',
});
}
return done(err);
}),
done => {
const { connectionString, autoCreateNamespace } = zkConfig;
log.info('opening zookeeper connection for replication processors');
zkClient = zookeeper.createClient(connectionString, {
autoCreateNamespace,
});
return undefined;
zkClient.connect();
zkClient.once('error', err => {
log.fatal('error connecting to zookeeper', {
error: err.message,
});
return done(err);
});
zkClient.once('ready', () => {
zkClient.removeAllListeners('error');
const path = getCRRStateZkPath();
zkClient.mkdirp(path, err => {
if (err) {
log.fatal('could not create path in zookeeper', {
method: 'QueueProcessor:task',
zookeeperPath: path,
error: err.message,
});
return done(err);
}
return done();
});
});
},
], err => {
if (err) {
// error occurred at startup trying to start internal clients,
// fail immediately
process.exit(1);
}
const activeQProcessors = {};

function initAndStart() {
initManagement({
serviceName: 'replication',
Expand All @@ -65,29 +160,51 @@ metricsProducer.setupProducer(err => {
const updatedSites = destConfig.bootstrapList.map(i => i.site);
const allSites = [...new Set(activeSites.concat(updatedSites))];

allSites.forEach(site => {
async.each(allSites, (site, next) => {
if (updatedSites.includes(site)) {
if (!activeSites.includes(site)) {
activeQProcessors[site] = new QueueProcessor(
zkConfig, kafkaConfig, sourceConfig, destConfig,
zkClient, kafkaConfig, sourceConfig, destConfig,
repConfig, redisConfig, metricsProducer, site);
activeQProcessors[site].start();
setupZkSiteNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
activeQProcessors[site].start({ paused });
return next();
});
}
} else {
// this site is no longer in bootstrapList
activeQProcessors[site].stop(() => {});
delete activeQProcessors[site];
next();
}
}, err => {
if (err) {
process.exit(1);
}
});
});

// Start QueueProcessor for each site
const siteNames = bootstrapList.map(i => i.site);
siteNames.forEach(site => {
activeQProcessors[site] = new QueueProcessor(zkConfig,
async.each(siteNames, (site, next) => {
activeQProcessors[site] = new QueueProcessor(zkClient,
kafkaConfig, sourceConfig, destConfig, repConfig,
redisConfig, metricsProducer, site);
activeQProcessors[site].start();
return setupZkSiteNode(zkClient, site, (err, paused) => {
if (err) {
return next(err);
}
activeQProcessors[site].start({ paused });
return next();
});
}, err => {
if (err) {
// already logged error in prior function calls
process.exit(1);
}
});
healthServer.onReadyCheck(() => {
let passed = true;
Expand Down
Loading

0 comments on commit 2d3a30f

Please sign in to comment.