Skip to content

Commit

Permalink
ft: ZENKO-235 add pause/resume api routes, tests
Browse files Browse the repository at this point in the history
  • Loading branch information
philipyoo committed Jun 27, 2018
1 parent 49cad88 commit f288387
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 16 deletions.
58 changes: 56 additions & 2 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line strict

const async = require('async');
const Redis = require('ioredis');
const zookeeper = require('node-zookeeper-client');

const { errors } = require('arsenal');
Expand Down Expand Up @@ -62,6 +63,10 @@ class BackbeatAPI {
this._internalStart = Date.now();
}

// Redis instance for publishing messages to BackbeatConsumers
this._redisPublisher = new Redis(this._redisConfig);

// Redis instance is used for getting/setting keys
this._redisClient = new RedisClient(this._redisConfig, this._logger);
// Redis expiry increased by an additional interval so we can reference
// the immediate older data for average throughput calculation
Expand Down Expand Up @@ -167,11 +172,14 @@ class BackbeatAPI {
addKeys.key = key;
addKeys.versionId = versionId;
}
if (bbRequest.getHTTPMethod() === 'GET' && type === 'all' &&
marker) {
if (marker) {
// this is optional, so doesn't matter if set or not
addKeys.marker = marker;
}
} else {
// currently only pause/resume
filteredRoutes = filteredRoutes.filter(r =>
rDetails.status === r.type);
}
}
// if rDetails has a type property
Expand Down Expand Up @@ -527,6 +535,52 @@ class BackbeatAPI {
return { reqBody };
}

/**
* Pause CRR operations for given site(s)
* @param {Object} details - The route details
* @param {String} body - The POST request body string
* @param {Function} cb - The callback to call
* @return {undefined}
*/
pauseCRRService(details, body, cb) {
let sites;
if (details.site === 'all') {
sites = details.extensions.crr.filter(s => s !== 'all');
} else {
sites = [details.site];
}
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'pauseService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info(`replication service paused for locations: ${sites}`);
return cb(null, {});
}

/**
* Resume CRR operations for given site(s)
* @param {Object} details - The route details
* @param {String} body - The POST request body string
* @param {Function} cb - The callback to call
* @return {undefined}
*/
resumeCRRService(details, body, cb) {
let sites;
if (details.site === 'all') {
sites = details.extensions.crr.filter(s => s !== 'all');
} else {
sites = [details.site];
}
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({ action: 'resumeService' });
this._redisPublisher.publish(channel, message);
});
this._logger.info(`replication service resumed for locations ${sites}`);
return cb(null, {});
}

/**
* Setup internals
* @param {function} cb - callback(error)
Expand Down
26 changes: 17 additions & 9 deletions lib/api/BackbeatRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ class BackbeatRequest {
}

/**
* Parse the route details for any of the retry routes.
* Parse the route details for any of the crr routes (retry, pause, resume).
* @param {Array} parts - The route schema split by '/'
* @param {String} query - The query string.
* @return {undefined}
*/
_parseRetryRoutes(parts, query) {
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.bucket = parts[2];
this._routeDetails.key = parts[3];
this._routeDetails.versionId = parts[4];
this._routeDetails.marker = querystring.parse(query).marker;
_parseCRRRoutes(parts, query) {
if (parts[1] && parts[1] === 'failed') {
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.bucket = parts[2];
this._routeDetails.key = parts[3];
this._routeDetails.versionId = parts[4];
this._routeDetails.marker = querystring.parse(query).marker;
} else {
// for now: pause/resume
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.site = parts[2] || 'all';
}
}

/**
Expand Down Expand Up @@ -81,8 +88,9 @@ class BackbeatRequest {
const { pathname, query } = url.parse(this._route);
const parts = pathname ? pathname.split('/') : [];

// crr retry/pause/resume routes
if (parts[0] === 'crr') {
this._parseRetryRoutes(parts, query);
this._parseCRRRoutes(parts, query);
} else if (parts[0] === 'metrics') {
this._parseMetricsRoutes(parts);
} else if (parts[0] === 'monitoring') {
Expand Down
137 changes: 133 additions & 4 deletions tests/functional/api/BackbeatServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,13 @@ describe('Backbeat Server', () => {
done();
});

after(() => {
after(done => {
redis.keys('*:test:bb:*').then(keys => {
const pipeline = redis.pipeline();
keys.forEach(key => {
pipeline.del(key);
});
return pipeline.exec();
pipeline.exec(done);
});
});

Expand Down Expand Up @@ -467,13 +467,13 @@ describe('Backbeat Server', () => {
});

describe('No metrics data in Redis', () => {
before(() => {
before(done => {
redis.keys('*:test:bb:*').then(keys => {
const pipeline = redis.pipeline();
keys.forEach(key => {
pipeline.del(key);
});
return pipeline.exec();
pipeline.exec(done);
});
});

Expand Down Expand Up @@ -917,4 +917,133 @@ describe('Backbeat Server', () => {
});
});
});

describe('CRR Pause/Resume service routes', () => {
let redis1;
let redis2;
let cache1 = [];
let cache2 = [];
let channel1;
let channel2;

const emptyBody = '';
const crrConfigs = config.extensions.replication;
const crrTopic = crrConfigs.topic;
const crrSites = crrConfigs.destination.bootstrapList.map(i =>
i.site);

before(() => {
redis1 = new Redis();
redis2 = new Redis();

channel1 = `${crrTopic}-${crrSites[0]}`;
redis1.subscribe(channel1, err => assert.ifError(err));
redis1.on('message', (channel, message) => {
cache1.push({ channel, message });
});

channel2 = `${crrTopic}-${crrSites[1]}`;
redis2.subscribe(channel2, err => assert.ifError(err));
redis2.on('message', (channel, message) => {
cache2.push({ channel, message });
});
});

afterEach(() => {
cache1 = [];
cache2 = [];
});

const validRequests = [
{ path: '/_/crr/pause', method: 'POST' },
{ path: '/_/crr/resume', method: 'POST' },
{ path: `/_/crr/resume/${crrSites[0]}`, method: 'POST' },
];
validRequests.forEach(entry => {
it(`should get a 200 response for route: ${entry.path}`, done => {
const options = Object.assign({}, defaultOptions, {
method: entry.method,
path: entry.path,
});
const req = http.request(options, res => {
assert.equal(res.statusCode, 200);
done();
});
req.end();
});
});

const invalidRequests = [
{ path: '/_/crr/pause/invalid-site', method: 'POST' },
{ path: '/_/crr/resume/invalid-site', method: 'POST' },
{ path: '/_/crr/resume/all', method: 'GET' },
];
invalidRequests.forEach(entry => {
it(`should get a 404 response for route: ${entry.path}`, done => {
const options = Object.assign({}, defaultOptions, {
method: entry.method,
path: entry.path,
});
const req = http.request(options, res => {
assert.equal(res.statusCode, 404);
assert.equal(res.statusMessage, 'Not Found');
assert.equal(cache1.length, 0);
assert.equal(cache2.length, 0);
done();
});
req.end();
});
});

it('should receive a pause request on all site channels from route ' +
'/_/crr/pause', done => {
const options = Object.assign({}, defaultOptions, {
method: 'POST',
path: '/_/crr/pause',
});
makePOSTRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
assert.strictEqual(cache1.length, 1);
assert.strictEqual(cache2.length, 1);

assert.deepStrictEqual(cache1[0].channel, channel1);
assert.deepStrictEqual(cache2[0].channel, channel2);

const message1 = JSON.parse(cache1[0].message);
const message2 = JSON.parse(cache2[0].message);
const expected = { action: 'pauseService' };
assert.deepStrictEqual(message1, expected);
assert.deepStrictEqual(message2, expected);
done();
}, 1000);
});
});

it('should receive a resume request on all site channels from route ' +
'/_/crr/resume', done => {
const options = Object.assign({}, defaultOptions, {
method: 'POST',
path: '/_/crr/resume',
});
makePOSTRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
assert.strictEqual(cache1.length, 1);
assert.strictEqual(cache2.length, 1);
assert.deepStrictEqual(cache1[0].channel, channel1);
assert.deepStrictEqual(cache2[0].channel, channel2);

const message1 = JSON.parse(cache1[0].message);
const message2 = JSON.parse(cache2[0].message);
const expected = { action: 'resumeService' };
assert.deepStrictEqual(message1, expected);
assert.deepStrictEqual(message2, expected);
done();
}, 1000);
});
});
});
});
5 changes: 5 additions & 0 deletions tests/unit/api/BackbeatAPI.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ describe('BackbeatAPI', () => {
// invalid params but will default to getting all buckets
{ url: '/_/crr/failed/mybucket', method: 'GET' },
{ url: '/_/crr/failed', method: 'POST' },
{ url: '/_/crr/pause', method: 'POST' },
{ url: '/_/crr/resume', method: 'POST' },
].forEach(request => {
const req = new BackbeatRequest(request);
const routeError = bbapi.findValidRoute(req);
Expand All @@ -47,9 +49,12 @@ describe('BackbeatAPI', () => {
{ url: '/_/metrics/crr/all/backlo', method: 'GET' },
{ url: '/_/metrics/crr/all/completionss', method: 'GET' },
{ url: '/_/invalid/crr/all', method: 'GET' },
{ url: '/_/metrics/pause/all', method: 'GET' },
// // invalid http verb
{ url: '/_/healthcheck', method: 'POST' },
{ url: '/_/monitoring/metrics', method: 'POST' },
{ url: '/_/crr/pause', method: 'GET' },
{ url: '/_/crr/resume', method: 'GET' },
].forEach(request => {
const req = new BackbeatRequest(request);
const routeError = bbapi.findValidRoute(req);
Expand Down
23 changes: 22 additions & 1 deletion tests/unit/api/BackbeatRequest.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,34 @@ describe('BackbeatRequest helper class', () => {
'details', () => {
const req = new BackbeatRequest({
url: '/_/monitoring/metrics',
method: 'GET',
method: 'POST',
});
const details = req.getRouteDetails();

assert.strictEqual(details.category, 'monitoring');
assert.strictEqual(details.type, 'metrics');
});

it('should parse crr pause/resume routes and store internally as ' +
'route details', () => {
const req = new BackbeatRequest({
url: '/_/crr/pause/mysite',
method: 'POST',
});
const details = req.getRouteDetails();

assert.strictEqual(details.extension, 'crr');
assert.strictEqual(details.status, 'pause');
assert.strictEqual(details.site, 'mysite');

const req2 = new BackbeatRequest({
url: '/_/crr/pause',
method: 'POST',
});
const details2 = req2.getRouteDetails();
// should default to 'all' if none specified
assert.strictEqual(details2.site, 'all');
});
});

it('should set route without prefix if valid route has valid prefix',
Expand Down

0 comments on commit f288387

Please sign in to comment.