Skip to content

Commit

Permalink
fix: Break down large PouchDB.bulkDocs calls (#2362)
Browse files Browse the repository at this point in the history
Large `bulkDocs` calls (i.e. with thousands of documents to save at
once) can be the source of crashes.

We're not sure if this comes from PouchDB or the logger calls with
each document but we've figured that creating smaller batches of
documents, logging them all at once and then saving the batch in
PouchDB prevents crashes.

It should also result in less I/O since we're making a lot less logger
calls.
To accommodate for these logging changes, we update our `jq` `path`
filter so it looks into batches for documents with the expected path.
  • Loading branch information
taratatach authored Dec 10, 2024
1 parent 74abd19 commit 0e5b89b
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 25 deletions.
20 changes: 16 additions & 4 deletions .jq
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,24 @@ def issues: clean | select(is_issue);
def path(pattern):
clean | select(
(
try (.path // (.doc | .path) // (.change | .doc | .path) // (.idConflict | .newDoc | .path) // (.event | .path)),
try (.oldpath // (.was | .path) // (.idConflict | .existingDoc | .path) // (.event | .oldPath)),
try (
.path //
(.doc | .path) //
(.idConflict | .newDoc | .path) //
(.event | .path) //
(.docs | arrays | .[].path) //
(.batch | arrays | .[].path)
),
try (
.oldpath //
(.was | .path) //
(.idConflict | .existingDoc | .path) //
(.event | .oldPath)
),
""
)
| strings
| test(pattern)
| strings
| test(pattern)
);

def shortPath:
Expand Down
71 changes: 51 additions & 20 deletions core/pouch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const log = logger({
component: 'Pouch'
})

const POUCHDB_BATCH_SIZE = 1000

// Pouchdb is used to store all the metadata about files and folders.
// These metadata can come from the local filesystem or the remote cozy instance.
//
Expand Down Expand Up @@ -193,15 +195,23 @@ class Pouch {
// This method will completely erase an array of records from PouchDB while
// removing all their attributes.
// This method also does not care about invariants like `bulkDocs()` does.
async eraseDocuments(docs /*: Array<SavedMetadata> */) {
const docsToErase = []
docs.forEach(doc => {
const { path, _id, _rev } = _.clone(doc)
log.info('Erasing bulk record...', { path, _id, _rev })
docsToErase.push({ _id, _rev, _deleted: true })
})
async eraseDocuments(
docs /*: $ReadOnlyArray<SavedMetadata> */
) /*: Promise<Array<PouchRecord>> */ {
let results = []
for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
const batchResults = await this._eraseDocuments(
batch.map(({ _id, _rev }) => ({ _id, _rev, _deleted: true }))
)
results = results.concat(batchResults)
}
return results
}

const results = await this.db.bulkDocs(docsToErase)
async _eraseDocuments(docs /*: $ReadOnlyArray<PouchRecord> */) {
log.info('Erasing bulk records...', { docs })

const results = await this.db.bulkDocs(docs)
for (let [idx, result] of results.entries()) {
if (result.error) {
const err = new PouchError(result)
Expand All @@ -221,19 +231,26 @@ class Pouch {
// WARNING: bulkDocs is not a transaction, some updates can be applied while
// others do not.
// Make sure lock is acquired before using it to avoid conflict.
async bulkDocs /*:: <T: Metadata|SavedMetadata> */(docs /*: Array<T> */) {
async bulkDocs(
docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */
) /*: Promise<Array<SavedMetadata>> */ {
let results = []
for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
const batchResults = await this._bulkDocs(batch)
results = results.concat(batchResults)
}

return results
}

// WARNING: _bulkDocs is not a transaction, some updates can be applied while
// others do not.
// Make sure lock is acquired before using it to avoid conflict.
async _bulkDocs(docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */) {
log.info('Saving bulk metadata...', { docs })

for (const doc of docs) {
metadata.invariants(doc)
const { path, _id } = doc
const { local, remote } = doc.sides || {}
log.info('Saving bulk metadata...', {
path,
_id,
local,
remote,
_deleted: doc._deleted,
doc
})
}
const results = await this.db.bulkDocs(docs)
for (let [idx, result] of results.entries()) {
Expand Down Expand Up @@ -680,4 +697,18 @@ const sortByPath = (docA /*: SavedMetadata */, docB /*: SavedMetadata */) => {
return 0
}

module.exports = { Pouch, byPathKey, sortByPath }
const createBatches = /*::<T: Metadata|SavedMetadata> */ (
docs /*: $ReadOnlyArray<T> */,
batchSize /*: number */
) /*: Array<Array<T>> */ => {
if (batchSize <= 0) return [[...docs]]

let batches /*: Array<Array<T>> */ = []
for (let i = 0; i < docs.length / batchSize; i++) {
const batch = docs.slice(i * batchSize, (i + 1) * batchSize)
batches.push(batch)
}
return batches
}

module.exports = { Pouch, byPathKey, sortByPath, createBatches }
50 changes: 49 additions & 1 deletion test/unit/pouch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const _ = require('lodash')
const { REV_CONFLICT } = require('pouchdb')

const metadata = require('../../../core/metadata')
const { sortByPath } = require('../../../core/pouch')
const { sortByPath, createBatches } = require('../../../core/pouch')

const Builders = require('../../support/builders')
const configHelpers = require('../../support/helpers/config')
Expand Down Expand Up @@ -726,3 +726,51 @@ describe('Pouch', function () {
})
})
})

describe('createBatches', () => {
it('creates batches of at most the given size from the given documents', () => {
const builders = new Builders()

const docs = [
builders.metafile().build(),
builders.metafile().build(),
builders.metafile().build(),
builders.metafile().build(),
builders.metafile().build()
]

should(createBatches(docs, 1)).deepEqual([
[docs[0]],
[docs[1]],
[docs[2]],
[docs[3]],
[docs[4]]
])

should(createBatches(docs, 2)).deepEqual([
[docs[0], docs[1]],
[docs[2], docs[3]],
[docs[4]]
])

should(createBatches(docs, 3)).deepEqual([
[docs[0], docs[1], docs[2]],
[docs[3], docs[4]]
])

should(createBatches(docs, 5)).deepEqual([
[docs[0], docs[1], docs[2], docs[3], docs[4]]
])

should(createBatches(docs, 6)).deepEqual([
[docs[0], docs[1], docs[2], docs[3], docs[4]]
])

should(createBatches(docs, 0)).deepEqual([
[docs[0], docs[1], docs[2], docs[3], docs[4]]
])
should(createBatches(docs, -1)).deepEqual([
[docs[0], docs[1], docs[2], docs[3], docs[4]]
])
})
})

0 comments on commit 0e5b89b

Please sign in to comment.