Skip to content

Commit

Permalink
Merge pull request #550 from jsnoble/id_slicer_fix
Browse files Browse the repository at this point in the history
fixed returns on retries to not immediatly complete resolves #549
  • Loading branch information
godber authored Sep 18, 2017
2 parents 9085b8e + f0ddf4b commit 67b9571
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 59 deletions.
32 changes: 5 additions & 27 deletions lib/readers/elasticsearch_date_range/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ var dateOptions = require('./../../utils/date_utils').dateOptions;
var dateFormatMS = require('./../../utils/date_utils').dateFormat;
var dateFormatS = require('./../../utils/date_utils').dateFormatSeconds;
var parseError = require('../../utils/error_utils').parseError;
var retryModule = require('../../utils/error_utils').retryModule;

function newSlicer(context, opConfig, job, retryData, logger, client) {
var events = context.foundation.getEventEmitter();
var jobConfig = job.jobConfig;
var isPersistent = jobConfig.lifecycle === 'persistent';
var slicers = [];
var numOfRetries = job.max_retries;
var time_resolution = dateOptions(opConfig.time_resolution);
var retryError = retryModule(logger, job.max_retries);


var dateFormat = time_resolution === 'ms' ? dateFormatMS : dateFormatS;

Expand Down Expand Up @@ -293,26 +295,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
return getIdData(idSubslicer)
}

function retryError(retry, dateObj, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);
var startKey = dateObj.start.format(dateFormat);

if (!retry[startKey]) {
retry[startKey] = 1;
fn(msg)
}
else {
retry[startKey] += 1;
if (retry[startKey] > numOfRetries) {
return Promise.reject(`max_retries met for slice, start: ${startKey}`, errMessage);
}
else {
fn(msg)
}
}
}

function nextChunk(opConfig, client, jobConfig, dates, slicer_id, retryData) {
var shouldDivideByID = opConfig.subslice_by_key;
var threshold = opConfig.subslice_key_threshold;
Expand All @@ -329,7 +311,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
dateParams.end = moment(dateParams.start.format(dateFormat)).add(dateParams.interval[0], dateParams.interval[1]);
logger.debug('all date configurations for date slicer', dateParams);
//used to keep track of retried queries
var retry = {};

return function sliceDate(msg) {
if (dateParams.start.isSameOrAfter(dateParams.limit)) {
Expand Down Expand Up @@ -368,7 +349,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
}
})
.catch(function(err) {
return retryError(retry, dateParams, err, sliceDate, msg)
return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg)
})
}
};
Expand Down Expand Up @@ -436,9 +417,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {

logger.debug('all date configurations for date slicer', dateParams);

//used to keep track of retried queries
var retry = {};

//set a timer to add the next set it should process
setInterval(function() {
//keep a list of next batches in cases current batch is still running
Expand Down Expand Up @@ -495,7 +473,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
}
}
.catch(function(err) {
return retryError(retry, dateParams, err, sliceDate, msg)
return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg)
})
);
}
Expand Down
25 changes: 3 additions & 22 deletions lib/readers/id_slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

var _ = require('lodash');
var parseError = require('../utils/error_utils').parseError;
var retryModule = require('../utils/error_utils').retryModule;

var base64url = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
Expand All @@ -18,27 +19,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
var baseKeyArray = getKeyArray(opConfig);
var keyArray = opConfig.key_range ? opConfig.key_range : baseKeyArray.slice();
var elasticsearch = require('elasticsearch_api')(client, logger, opConfig);
var numOfRetries = job.max_retries;
var retry = {};

function retryKey(key, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);

if (!retry[key]) {
retry[key] = 1;
fn(msg)
}
else {
retry[key] += 1;
if (retry[key] > numOfRetries) {
return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage);
}
else {
fn(msg)
}
}
}
var retryError = retryModule(logger, job.max_retries);

function getCountForKey(query) {
return elasticsearch.search(query);
Expand Down Expand Up @@ -106,7 +87,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {

})
.catch(function(err) {
return retryKey(key, err, getKeySlice, query)
return retryError(key, err, getKeySlice, query)
})
}

Expand Down
25 changes: 24 additions & 1 deletion lib/utils/error_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@ function parseError(err) {
return err.response ? err.response : err;
}

function retryModule(logger, numOfRetries) {
let retry = {};
return function(key, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);

if (!retry[key]) {
retry[key] = 1;
return fn(msg)
}
else {
retry[key] += 1;
if (retry[key] > numOfRetries) {
return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage);
}
else {
return fn(msg)
}
}
}
}

module.exports = {
parseError: parseError
parseError: parseError,
retryModule: retryModule
};
67 changes: 58 additions & 9 deletions spec/readers/id_slicer-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var eventEmitter = new events.EventEmitter();
describe('id_reader', function() {

var clientData;
var makeSearchFailure = false;

beforeEach(function() {
clientData = [{hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}];
Expand All @@ -19,20 +20,26 @@ describe('id_reader', function() {
return {
client: {
search: function() {
var metaData = {_shards: {failed: 0}};
if (makeSearchFailure) {
metaData._shards.failed = 1;
metaData._shards.failures = [{reason: {type: 'some Error'}}];
makeSearchFailure = false;
}

if (clientData.length > 1) {
var data = clientData.shift();

return Promise.resolve(
Object.assign({}, data, {_shards: {failed: 0}}))
Object.assign({}, data, metaData))
}
else {
return Promise.resolve(Object.assign({}, clientData[0], {_shards: {failed: 0}}));
return Promise.resolve(Object.assign({}, clientData[0], metaData));
}
}
}
}
},
getEventEmitter: function(){
getEventEmitter: function() {
return eventEmitter;
}
},
Expand Down Expand Up @@ -101,7 +108,6 @@ describe('id_reader', function() {
}).toThrowError(errorStr3);

});
// context, job, retryData, slicerAnalytics, logger

it('can create multiple slicers', function(done) {
var retryData = [];
Expand Down Expand Up @@ -171,6 +177,49 @@ describe('id_reader', function() {

});

it('it produces values even with an initial search error', function(done) {
var retryData = [];
var job1 = {
jobConfig: {
slicers: 1,
operations: [{
_op: 'id_reader',
type: 'events-',
key_type: 'hexadecimal',
key_range: ['a', 'b'],
size: 200
}]
}
};

var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger);

Promise.resolve(slicer)
.then(function(slicers) {
makeSearchFailure = true;
return Promise.resolve(slicers[0]())
.then(function(results) {
expect(results).toBeDefined();
expect(results).toEqual({count: 100, key: 'events-#a*'});
makeSearchFailure = false;
return Promise.resolve(slicers[0]())
})
.then(function(results) {
expect(results).toEqual({count: 100, key: 'events-#b*'});

return Promise.resolve(slicers[0]())
})
.then(function(results) {
expect(results).toEqual(null);
done();
})
.catch(function(err) {
fail(err)
});
})

});

it('key range gets divided up by number of slicers', function(done) {
var retryData = [];
var job1 = {
Expand Down Expand Up @@ -254,11 +303,11 @@ describe('id_reader', function() {
})

})

});

it('can return to previous position', function(done) {
var retryData = [{lastSlice: {key:'events-#a6*'}}];
var retryData = [{lastSlice: {key: 'events-#a6*'}}];
var job1 = {
jobConfig: {
slicers: 1,
Expand All @@ -273,7 +322,7 @@ describe('id_reader', function() {
};

var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger);

Promise.resolve(slicer)
.then(function(slicers) {
return Promise.resolve(slicers[0]())
Expand All @@ -299,6 +348,6 @@ describe('id_reader', function() {
})
})
});


});

0 comments on commit 67b9571

Please sign in to comment.