From e99f418de668d0292efa48ed92549ec357091d14 Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Fri, 15 Sep 2017 14:13:03 -0700 Subject: [PATCH 1/2] fixed returns on retries to not immediately complete resolves #549 --- .../elasticsearch_date_range/slicer.js | 32 +++---------------- lib/readers/id_slicer.js | 25 ++------------- lib/utils/error_utils.js | 25 ++++++++++++++- 3 files changed, 32 insertions(+), 50 deletions(-) diff --git a/lib/readers/elasticsearch_date_range/slicer.js b/lib/readers/elasticsearch_date_range/slicer.js index b0091810f84..94f8c8cbe70 100644 --- a/lib/readers/elasticsearch_date_range/slicer.js +++ b/lib/readers/elasticsearch_date_range/slicer.js @@ -8,14 +8,16 @@ var dateOptions = require('./../../utils/date_utils').dateOptions; var dateFormatMS = require('./../../utils/date_utils').dateFormat; var dateFormatS = require('./../../utils/date_utils').dateFormatSeconds; var parseError = require('../../utils/error_utils').parseError; +var retryModule = require('../../utils/error_utils').retryModule; function newSlicer(context, opConfig, job, retryData, logger, client) { var events = context.foundation.getEventEmitter(); var jobConfig = job.jobConfig; var isPersistent = jobConfig.lifecycle === 'persistent'; var slicers = []; - var numOfRetries = job.max_retries; var time_resolution = dateOptions(opConfig.time_resolution); + var retryError = retryModule(logger, job.max_retries); + var dateFormat = time_resolution === 'ms' ? 
dateFormatMS : dateFormatS; @@ -293,26 +295,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) { return getIdData(idSubslicer) } - function retryError(retry, dateObj, err, fn, msg) { - var errMessage = parseError(err); - logger.error('error while getting next slice', errMessage); - var startKey = dateObj.start.format(dateFormat); - - if (!retry[startKey]) { - retry[startKey] = 1; - fn(msg) - } - else { - retry[startKey] += 1; - if (retry[startKey] > numOfRetries) { - return Promise.reject(`max_retries met for slice, start: ${startKey}`, errMessage); - } - else { - fn(msg) - } - } - } - function nextChunk(opConfig, client, jobConfig, dates, slicer_id, retryData) { var shouldDivideByID = opConfig.subslice_by_key; var threshold = opConfig.subslice_key_threshold; @@ -329,7 +311,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) { dateParams.end = moment(dateParams.start.format(dateFormat)).add(dateParams.interval[0], dateParams.interval[1]); logger.debug('all date configurations for date slicer', dateParams); //used to keep track of retried queries - var retry = {}; return function sliceDate(msg) { if (dateParams.start.isSameOrAfter(dateParams.limit)) { @@ -368,7 +349,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) { } }) .catch(function(err) { - return retryError(retry, dateParams, err, sliceDate, msg) + return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg) }) } }; @@ -436,9 +417,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) { logger.debug('all date configurations for date slicer', dateParams); - //used to keep track of retried queries - var retry = {}; - //set a timer to add the next set it should process setInterval(function() { //keep a list of next batches in cases current batch is still running @@ -495,7 +473,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) { } } .catch(function(err) { - return retryError(retry, 
dateParams, err, sliceDate, msg) + return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg) }) ); } diff --git a/lib/readers/id_slicer.js b/lib/readers/id_slicer.js index 31f00e1891c..4572c947c71 100644 --- a/lib/readers/id_slicer.js +++ b/lib/readers/id_slicer.js @@ -2,6 +2,7 @@ var _ = require('lodash'); var parseError = require('../utils/error_utils').parseError; +var retryModule = require('../utils/error_utils').retryModule; var base64url = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', @@ -18,27 +19,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) { var baseKeyArray = getKeyArray(opConfig); var keyArray = opConfig.key_range ? opConfig.key_range : baseKeyArray.slice(); var elasticsearch = require('elasticsearch_api')(client, logger, opConfig); - var numOfRetries = job.max_retries; - var retry = {}; - - function retryKey(key, err, fn, msg) { - var errMessage = parseError(err); - logger.error('error while getting next slice', errMessage); - - if (!retry[key]) { - retry[key] = 1; - fn(msg) - } - else { - retry[key] += 1; - if (retry[key] > numOfRetries) { - return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage); - } - else { - fn(msg) - } - } - } + var retryError = retryModule(logger, job.max_retries); function getCountForKey(query) { return elasticsearch.search(query); @@ -106,7 +87,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) { }) .catch(function(err) { - return retryKey(key, err, getKeySlice, query) + return retryError(key, err, getKeySlice, query) }) } diff --git a/lib/utils/error_utils.js b/lib/utils/error_utils.js index cb98850a65a..3322060766c 100644 --- a/lib/utils/error_utils.js +++ b/lib/utils/error_utils.js @@ -17,6 +17,29 @@ function 
parseError(err) { return err.response ? err.response : err; } +function retryModule(logger, numOfRetries) { + let retry = {}; + return function(key, err, fn, msg) { + var errMessage = parseError(err); + logger.error('error while getting next slice', errMessage); + + if (!retry[key]) { + retry[key] = 1; + return fn(msg) + } + else { + retry[key] += 1; + if (retry[key] > numOfRetries) { + return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage); + } + else { + return fn(msg) + } + } + } +} + module.exports = { - parseError: parseError + parseError: parseError, + retryModule: retryModule }; \ No newline at end of file From f0ddf4bcb33ddbcddf4a398904aac0988cc8846f Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Mon, 18 Sep 2017 09:38:56 -0700 Subject: [PATCH 2/2] added test for retry change --- spec/readers/id_slicer-spec.js | 67 +++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/spec/readers/id_slicer-spec.js b/spec/readers/id_slicer-spec.js index 5e3ad51120e..40347e3643a 100644 --- a/spec/readers/id_slicer-spec.js +++ b/spec/readers/id_slicer-spec.js @@ -8,6 +8,7 @@ var eventEmitter = new events.EventEmitter(); describe('id_reader', function() { var clientData; + var makeSearchFailure = false; beforeEach(function() { clientData = [{hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}, {hits: {total: 100}}]; @@ -19,20 +20,26 @@ describe('id_reader', function() { return { client: { search: function() { + var metaData = {_shards: {failed: 0}}; + if (makeSearchFailure) { + metaData._shards.failed = 1; + metaData._shards.failures = [{reason: {type: 'some Error'}}]; + makeSearchFailure = false; + } + if (clientData.length > 1) { var data = clientData.shift(); - return Promise.resolve( - Object.assign({}, data, {_shards: {failed: 0}})) + Object.assign({}, data, metaData)) } else { - return Promise.resolve(Object.assign({}, clientData[0], {_shards: 
{failed: 0}})); + return Promise.resolve(Object.assign({}, clientData[0], metaData)); } } } } }, - getEventEmitter: function(){ + getEventEmitter: function() { return eventEmitter; } }, @@ -101,7 +108,6 @@ describe('id_reader', function() { }).toThrowError(errorStr3); }); - // context, job, retryData, slicerAnalytics, logger it('can create multiple slicers', function(done) { var retryData = []; @@ -171,6 +177,49 @@ describe('id_reader', function() { }); + it('it produces values even with an initial search error', function(done) { + var retryData = []; + var job1 = { + jobConfig: { + slicers: 1, + operations: [{ + _op: 'id_reader', + type: 'events-', + key_type: 'hexadecimal', + key_range: ['a', 'b'], + size: 200 + }] + } + }; + + var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger); + + Promise.resolve(slicer) + .then(function(slicers) { + makeSearchFailure = true; + return Promise.resolve(slicers[0]()) + .then(function(results) { + expect(results).toBeDefined(); + expect(results).toEqual({count: 100, key: 'events-#a*'}); + makeSearchFailure = false; + return Promise.resolve(slicers[0]()) + }) + .then(function(results) { + expect(results).toEqual({count: 100, key: 'events-#b*'}); + + return Promise.resolve(slicers[0]()) + }) + .then(function(results) { + expect(results).toEqual(null); + done(); + }) + .catch(function(err) { + fail(err) + }); + }) + + }); + it('key range gets divided up by number of slicers', function(done) { var retryData = []; var job1 = { @@ -254,11 +303,11 @@ describe('id_reader', function() { }) }) - + }); it('can return to previous position', function(done) { - var retryData = [{lastSlice: {key:'events-#a6*'}}]; + var retryData = [{lastSlice: {key: 'events-#a6*'}}]; var job1 = { jobConfig: { slicers: 1, @@ -273,7 +322,7 @@ describe('id_reader', function() { }; var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger); - + Promise.resolve(slicer) .then(function(slicers) { return 
Promise.resolve(slicers[0]()) @@ -299,6 +348,6 @@ describe('id_reader', function() { }) }) }); - + });