Skip to content

Commit

Permalink
(#127) - avoid lastSeq doc growing out of control
Browse files Browse the repository at this point in the history
  • Loading branch information
nolanlawson authored and calvinmetcalf committed Apr 22, 2014
1 parent 4ec3a2f commit b362637
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 53 deletions.
1 change: 1 addition & 0 deletions create-view.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module.exports = function (opts) {
return upsert(sourceDB, '_local/mrviews', diffFunction).then(function () {
return sourceDB.registerDependentDatabase(depDbName).then(function (res) {
var db = res.db;
db.auto_compaction = true;
var view = new View(depDbName, db, sourceDB, mapFun, reduceFun);
return view.db.get('_local/lastSeq').then(null, function (err) {
if (err.name === 'not_found') {
Expand Down
121 changes: 68 additions & 53 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ function tryCode(db, fun, args) {
}
}

function sortByKeyThenValue(x, y) {
var keyCompare = collate(x.key, y.key);
return keyCompare !== 0 ? keyCompare : collate(x.value, y.value);
}

function sliceResults(results, limit, skip) {
skip = skip || 0;
if (typeof limit === 'number') {
Expand Down Expand Up @@ -234,49 +239,59 @@ function defaultsTo(value) {
throw reason;
};
}
function saveKeyValues(view, indexableKeysToKeyValues, docId, seq) {
function saveKeyValues(view, docIdsToEmits, seq) {
return view.db.get('_local/lastSeq')
.then(null, defaultsTo({_id: '_local/lastSeq', seq: 0}))
.then(function (lastSeqDoc) {
return view.db.get('_local/doc_' + docId)
.then(null, defaultsTo({_id : '_local/doc_' + docId, keys : []}))
.then(function (metaDoc) {
return view.db.allDocs({keys : metaDoc.keys, include_docs : true}).then(function (res) {
var kvDocs = res.rows.map(function (row) {
return row.doc;
}).filter(function (row) {
return row;
});

var oldKeysMap = {};
kvDocs.forEach(function (kvDoc) {
oldKeysMap[kvDoc._id] = true;
kvDoc._deleted = !indexableKeysToKeyValues[kvDoc._id];
if (!kvDoc._deleted) {
kvDoc.value = indexableKeysToKeyValues[kvDoc._id];
}
});

var newKeys = Object.keys(indexableKeysToKeyValues);
newKeys.forEach(function (key) {
if (!oldKeysMap[key]) {
// new doc
kvDocs.push({
_id : key,
value : indexableKeysToKeyValues[key]
return Promise.all(Object.keys(docIdsToEmits).map(function (docId) {
return view.db.get('_local/doc_' + docId)
.then(null, defaultsTo({_id : '_local/doc_' + docId, keys : []}))
.then(function (metaDoc) {
return view.db.allDocs({keys : metaDoc.keys, include_docs : true}).then(function (res) {
var kvDocs = res.rows.map(function (row) {
return row.doc;
}).filter(function (row) {
return row;
});

var indexableKeysToKeyValues = docIdsToEmits[docId];
var oldKeysMap = {};
kvDocs.forEach(function (kvDoc) {
oldKeysMap[kvDoc._id] = true;
kvDoc._deleted = !indexableKeysToKeyValues[kvDoc._id];
if (!kvDoc._deleted) {
kvDoc.value = indexableKeysToKeyValues[kvDoc._id];
}
});

var newKeys = Object.keys(indexableKeysToKeyValues);
newKeys.forEach(function (key) {
if (!oldKeysMap[key]) {
// new doc
kvDocs.push({
_id : key,
value : indexableKeysToKeyValues[key]
});
}
});
metaDoc.keys = utils.uniq(newKeys.concat(metaDoc.keys));
kvDocs.push(metaDoc);

return kvDocs;
});
}
});
})).then(function (listOfDocsToPersist) {
var docsToPersist = [];
listOfDocsToPersist.forEach(function (docList) {
docsToPersist = docsToPersist.concat(docList);
});
metaDoc.keys = utils.uniq(newKeys.concat(metaDoc.keys));
kvDocs.push(metaDoc);

lastSeqDoc.seq = seq;
kvDocs.push(lastSeqDoc);
docsToPersist.push(lastSeqDoc);

return view.db.bulkDocs({docs : kvDocs});
return view.db.bulkDocs({docs : docsToPersist});
});
});
});
}

var updateView = utils.sequentialize(mainQueue, function (view) {
Expand Down Expand Up @@ -305,26 +320,9 @@ var updateView = utils.sequentialize(mainQueue, function (view) {

var currentSeq = view.seq || 0;

function processChange(currentDoc, seq) {
function processChange(docIdsToEmits, seq) {
return function () {
mapResults = [];
doc = currentDoc;

if (!doc._deleted) {
tryCode(view.sourceDB, mapFun, [doc]);
}
mapResults.sort(function (x, y) {
var keyCompare = collate(x.key, y.key);
return keyCompare !== 0 ? keyCompare : collate(x.value, y.value);
});
var indexableKeysToKeyValues = {};
mapResults.forEach(function (o, pos) {
indexableKeysToKeyValues[toIndexableString([o.key, o.id, pos])] = o;
});
return saveKeyValues(view, indexableKeysToKeyValues, doc._id, seq)
.then(function () {
currentSeq = Math.max(currentSeq, seq);
});
return saveKeyValues(view, docIdsToEmits, seq);
};
}
var queue = new TaskQueue();
Expand All @@ -350,12 +348,29 @@ var updateView = utils.sequentialize(mainQueue, function (view) {
if (!results.length) {
return complete();
}
var docIdsToEmits = {};
for (var i = 0, l = results.length; i < l; i++) {
var change = results[i];
if (change.doc._id[0] !== '_') {
queue.add(processChange(change.doc, change.seq));
mapResults = [];
doc = change.doc;

if (!doc._deleted) {
tryCode(view.sourceDB, mapFun, [doc]);
}
mapResults.sort(sortByKeyThenValue);

var indexableKeysToKeyValues = {};
for (var j = 0, jl = mapResults.length; j < jl; j++) {
var obj = mapResults[j];
var indexableKey = toIndexableString([obj.key, obj.id, j]);
indexableKeysToKeyValues[indexableKey] = obj;
}
docIdsToEmits[change.doc._id] = indexableKeysToKeyValues;
}
}
queue.add(processChange(docIdsToEmits, response.last_seq));
currentSeq = response.last_seq;
if (results.length < CHANGES_BATCH_SIZE) {
return complete();
}
Expand Down

0 comments on commit b362637

Please sign in to comment.