Skip to content

Commit

Permalink
added minimum key depth paramter resolves terascope#551
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Sep 18, 2017
1 parent 331a889 commit 1208996
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/ops-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ size | The limit to the number of docs pulled in a chunk, if the number of docs
full_response | If set to true, it will return the native response from elasticsearch with all meta-data included. If set to false it will return an array of the actual documents, no meta data included | Boolean | optional, defaults to false
key_type | Used to specify the key type of the \_ids of the documents being queryed | String | optional, defaults to elasticsearch id generator (base64url)
key_range | if provided, slicer will only recurse on these given keys | Array | optional
starting_key_depth | if provided, slicer will only produce keys with minimum length determined by this setting | Number | optional
fields | Used to restrict what is returned from elasticsearch. If used, only these fields on the documents are returned | Array | optional |
query | specify any valid lucene query for elasticsearch to use in filtering| String | optional |

Expand Down
10 changes: 5 additions & 5 deletions integration-tests/spec/cases/data/id-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ module.exports = function() {
});

it('should support reindexing by hex id', function(done) {
var job_spec = misc.newJob('id')
job_spec.name = 'reindex by hex id'
job_spec.operations[0].key_type = 'hexadecimal'
job_spec.operations[0].index = 'example-logs-10000-hex'
job_spec.operations[1].index = "test-hexadecimal-logs"
var job_spec = misc.newJob('id');
job_spec.name = 'reindex by hex id';
job_spec.operations[0].key_type = 'hexadecimal';
job_spec.operations[0].index = 'example-logs-10000-hex';
job_spec.operations[1].index = "test-hexadecimal-logs";

teraslice.jobs.submit(job_spec)
.then(function(job) {
Expand Down
16 changes: 16 additions & 0 deletions lib/readers/id_reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ function schema() {
}
}
},
starting_key_depth: {
doc: 'if provided, slicer will only produce keys with minimum length determined by this setting',
default: null,
format: function(val) {
if (val) {
if (isNaN(val)) {
throw new Error('starting_key_depth parameter for id_reader must be a number')
}
else {
if (val <= 0) {
throw new Error('starting_key_depth parameter for id_reader must be greater than zero')
}
}
}
}
},
query: {
doc: 'You may place a lucene query here, and the slicer will use it when making slices',
default: "",
Expand Down
31 changes: 30 additions & 1 deletion lib/readers/id_slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
var keyRange = opConfig.key_range;
var baseKeyArray = getKeyArray(opConfig);
var keyArray = opConfig.key_range ? opConfig.key_range : baseKeyArray.slice();
var starting_key_depth = opConfig.starting_key_depth;
var elasticsearch = require('elasticsearch_api')(client, logger, opConfig);
var numOfRetries = job.max_retries;
var retry = {};
Expand Down Expand Up @@ -125,6 +126,25 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
return
}

function* recurseDepth(baseArray, str) {
for (let key of baseArray) {
let newStr = str + key;

if (newStr.length >= starting_key_depth) {
let resp = yield newStr;

if (!resp) {
yield* recurse(baseArray, newStr)
}
}
else {
yield* recurse(baseArray, newStr)
}

}
return
}

function* generateKeys(baseArray, keyArray) {
for (let startKey of keyArray) {
let processKey = yield startKey;
Expand All @@ -137,6 +157,14 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
return null
}

function* generateKeyDepth(baseArray, keyArray) {
for (let startKey of keyArray) {
yield *recurseDepth(baseArray, startKey)
}

return null
}

function divideKeyArray(keyArray, num) {
let results = [];
let len = num;
Expand All @@ -155,7 +183,8 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
}

function keyGenerator(baseArray, keyArray, retryKey, range) {
let gen = generateKeys(baseArray, keyArray);
//if there is a starting depth, use the key depth generator, if not use default generator
let gen = starting_key_depth ? generateKeyDepth(baseArray, keyArray) : generateKeys(baseArray, keyArray);
let closePath = false;

if (retryKey) {
Expand Down
48 changes: 43 additions & 5 deletions spec/readers/id_slicer-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('id_reader', function() {
}
}
},
getEventEmitter: function(){
getEventEmitter: function() {
return eventEmitter;
}
},
Expand Down Expand Up @@ -171,6 +171,44 @@ describe('id_reader', function() {

});

it('it produces values starting at a specific depth', function(done) {
var retryData = [];
var job1 = {
jobConfig: {
slicers: 1,
operations: [{
_op: 'id_reader',
type: 'events-',
key_type: 'hexadecimal',
key_range: ['a', 'b', 'c', 'd'],
starting_key_depth: 3,
size: 200
}]
}
};

var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger);

Promise.resolve(slicer)
.then(function(slicers) {
return Promise.resolve(slicers[0]())
.then(function(results) {
expect(results).toEqual({count: 100, key: 'events-#a00*'});
return Promise.resolve(slicers[0]())
})
.then(function(results) {
expect(results).toEqual({count: 100, key: 'events-#a01*'});

return Promise.resolve(slicers[0]())
})
.then(function(results) {
expect(results).toEqual({count: 100, key: 'events-#a02*'});
done();
});
})

});

it('key range gets divided up by number of slicers', function(done) {
var retryData = [];
var job1 = {
Expand Down Expand Up @@ -254,11 +292,11 @@ describe('id_reader', function() {
})

})

});

it('can return to previous position', function(done) {
var retryData = [{lastSlice: {key:'events-#a6*'}}];
var retryData = [{lastSlice: {key: 'events-#a6*'}}];
var job1 = {
jobConfig: {
slicers: 1,
Expand All @@ -273,7 +311,7 @@ describe('id_reader', function() {
};

var slicer = id_reader.newSlicer(context, job1, retryData, slicerAnalytics, logger);

Promise.resolve(slicer)
.then(function(slicers) {
return Promise.resolve(slicers[0]())
Expand All @@ -299,6 +337,6 @@ describe('id_reader', function() {
})
})
});


});

0 comments on commit 1208996

Please sign in to comment.