fix: Break down large PouchDB.bulkDocs calls
  Large `bulkDocs` calls (i.e. with thousands of documents to save at
  once) can be a source of crashes.

  We're not sure if these crashes come from PouchDB itself or from the
  logger call made for each document, but we've found that creating
  smaller batches of documents, logging each batch at once and then
  saving it in PouchDB prevents them.

  It should also result in less I/O since we're making far fewer logger
  calls.

  To accommodate these logging changes, we update our `jq` `path` filter
  so it looks into batches for documents with the expected path.
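
Concretely, batching trades one logger call per document for one call per batch. A rough sketch of the arithmetic, assuming the batch size of 1000 introduced by this commit (illustrative numbers):

    const POUCHDB_BATCH_SIZE = 1000
    const docsCount = 2500

    // Before: one log call per document and a single huge bulkDocs call.
    const callsBefore = { log: docsCount, bulkDocs: 1 }
    // After: one log call and one bulkDocs call per batch.
    const batches = Math.ceil(docsCount / POUCHDB_BATCH_SIZE)
    const callsAfter = { log: batches, bulkDocs: batches }

    console.log(callsBefore, callsAfter)
    // => { log: 2500, bulkDocs: 1 } { log: 3, bulkDocs: 3 }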
taratatach committed Dec 10, 2024
1 parent 74abd19 commit 797af62
Showing 2 changed files with 64 additions and 23 deletions.
.jq (16 additions, 4 deletions)

@@ -145,12 +145,24 @@ def issues: clean | select(is_issue);
 def path(pattern):
   clean | select(
     (
-      try (.path // (.doc | .path) // (.change | .doc | .path) // (.idConflict | .newDoc | .path) // (.event | .path)),
-      try (.oldpath // (.was | .path) // (.idConflict | .existingDoc | .path) // (.event | .oldPath)),
+      try (
+        .path //
+        (.doc | .path) //
+        (.idConflict | .newDoc | .path) //
+        (.event | .path) //
+        (.docs | arrays | .[].path) //
+        (.batch | arrays | .[].path)
+      ),
+      try (
+        .oldpath //
+        (.was | .path) //
+        (.idConflict | .existingDoc | .path) //
+        (.event | .oldPath)
+      ),
       ""
     )
-    | strings
-    | test(pattern)
+    | strings
+    | test(pattern)
   );
 
 def shortPath:
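
With the filters from `.jq` available (for example copied into `~/.jq`, which jq loads automatically), the updated `path` filter now selects log lines whose `batch` or `docs` array contains a matching document. A hypothetical batched log line, assuming `clean` passes these fields through:

    $ echo '{"batch":[{"path":"Photos/selfie.jpg"},{"path":"Notes/todo.md"}]}' \
        | jq 'path("^Photos/")'
    {"batch":[{"path":"Photos/selfie.jpg"},{"path":"Notes/todo.md"}]}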
core/pouch/index.js (48 additions, 19 deletions)

@@ -29,6 +29,8 @@ const log = logger({
   component: 'Pouch'
 })
 
+const POUCHDB_BATCH_SIZE = 1000
+
 // Pouchdb is used to store all the metadata about files and folders.
 // These metadata can come from the local filesystem or the remote cozy instance.
 //
Expand Down Expand Up @@ -193,15 +195,23 @@ class Pouch {
// This method will completely erase an array of records from PouchDB while
// removing all their attributes.
// This method also does not care about invariants like `bulkDocs()` does.
async eraseDocuments(docs /*: Array<SavedMetadata> */) {
const docsToErase = []
docs.forEach(doc => {
const { path, _id, _rev } = _.clone(doc)
log.info('Erasing bulk record...', { path, _id, _rev })
docsToErase.push({ _id, _rev, _deleted: true })
})
async eraseDocuments(
docs /*: $ReadOnlyArray<SavedMetadata> */
) /*: Promise<Array<PouchRecord>> */ {
let results = []
for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
const batchResults = await this._eraseDocuments(
batch.map(({ _id, _rev }) => ({ _id, _rev, _deleted: true }))
)
results = results.concat(batchResults)
}
return results
}

const results = await this.db.bulkDocs(docsToErase)
async _eraseDocuments(docs /*: $ReadOnlyArray<PouchRecord> */) {
log.info('Erasing bulk records...', { docs })

const results = await this.db.bulkDocs(docs)
for (let [idx, result] of results.entries()) {
if (result.error) {
const err = new PouchError(result)
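
As an aside, the `{ _id, _rev, _deleted: true }` records built in `eraseDocuments` above use PouchDB's standard bulk-deletion format: `bulkDocs` treats any record carrying `_deleted: true` as a deletion. A minimal standalone sketch (the `demo` database and document are hypothetical):

    const PouchDB = require('pouchdb')

    async function demo() {
      const db = new PouchDB('demo')
      const { id, rev } = await db.put({ _id: 'file-1', path: 'file-1' })

      // Same record shape _eraseDocuments passes to db.bulkDocs:
      const results = await db.bulkDocs([{ _id: id, _rev: rev, _deleted: true }])
      console.log(results[0].ok) // => true once the deletion is written
    }

    demo()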
@@ -221,19 +231,26 @@ class Pouch {
   // WARNING: bulkDocs is not a transaction, some updates can be applied while
   // others do not.
   // Make sure lock is acquired before using it to avoid conflict.
-  async bulkDocs /*:: <T: Metadata|SavedMetadata> */(docs /*: Array<T> */) {
+  async bulkDocs(
+    docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */
+  ) /*: Promise<Array<SavedMetadata>> */ {
+    let results = []
+    for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
+      const batchResults = await this._bulkDocs(batch)
+      results = results.concat(batchResults)
+    }
+
+    return results
+  }
+
+  // WARNING: _bulkDocs is not a transaction, some updates can be applied while
+  // others do not.
+  // Make sure lock is acquired before using it to avoid conflict.
+  async _bulkDocs(docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */) {
+    log.info('Saving bulk metadata...', { docs })
+
     for (const doc of docs) {
       metadata.invariants(doc)
-      const { path, _id } = doc
-      const { local, remote } = doc.sides || {}
-      log.info('Saving bulk metadata...', {
-        path,
-        _id,
-        local,
-        remote,
-        _deleted: doc._deleted,
-        doc
-      })
     }
     const results = await this.db.bulkDocs(docs)
     for (let [idx, result] of results.entries()) {
@@ -680,4 +697,16 @@ const sortByPath = (docA /*: SavedMetadata */, docB /*: SavedMetadata */) => {
   return 0
 }
 
+const createBatches = /*::<T: Metadata|SavedMetadata> */ (
+  docs /*: $ReadOnlyArray<T> */,
+  batchSize /*: number */
+) /*: Array<Array<T>> */ => {
+  let batches /*: Array<Array<T>> */ = []
+  for (let i = 0; i < docs.length / batchSize; i++) {
+    const batch = docs.slice(i * batchSize, (i + 1) * batchSize)
+    batches.push(batch)
+  }
+  return batches
+}
+
 module.exports = { Pouch, byPathKey, sortByPath }
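
Finally, a quick sanity check of the new `createBatches` helper (hypothetical values; the Flow annotations are erased at runtime). The loop bound `i < docs.length / batchSize` yields `Math.ceil(docs.length / batchSize)` batches, with a shorter trailing batch and an empty result for empty input:

    createBatches([1, 2, 3, 4, 5], 2) // => [[1, 2], [3, 4], [5]]
    createBatches([1, 2], 2)          // => [[1, 2]]
    createBatches([], 2)              // => []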
