Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

_addOperation calls are serialized #85

Merged
merged 3 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"orbit-db-io": "~0.2.0",
"p-each-series": "^1.0.0",
"p-map": "^3.0.0",
"p-queue": "^6.2.1",
"readable-stream": "~2.3.5"
},
"standard": {
Expand Down
34 changes: 21 additions & 13 deletions src/Store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const path = require('path')
const EventEmitter = require('events').EventEmitter
const Readable = require('readable-stream')
const mapSeries = require('p-each-series')
const { default: PQueue } = require('p-queue')
const Log = require('ipfs-log')
const Entry = Log.Entry
const Index = require('./Index')
Expand Down Expand Up @@ -66,6 +67,9 @@ class Store {
// Create the operations log
this._oplog = new Log(this._ipfs, this.identity, { logId: this.id, access: this.access, sortFn: this.options.sortFn })

// _addOperation queue
this._opqueue = new PQueue({ concurrency: 1 })

// Create the index
this._index = new this.options.Index(this.address.root)

Expand Down Expand Up @@ -166,6 +170,8 @@ class Store {
await this.options.onClose(this)
}

await this._opqueue.onEmpty()

// Replicator teardown logic
this._replicator.stop()

Expand Down Expand Up @@ -480,20 +486,22 @@ class Store {
}

async _addOperation (data, { onProgressCallback, pin = false } = {}) {
if (this._oplog) {
// check local cache?
if (this.options.syncLocal) {
await this.syncLocal()
}
return this._opqueue.add(async () => {
if (this._oplog) {
// check local cache?
if (this.options.syncLocal) {
await this.syncLocal()
}

const entry = await this._oplog.append(data, this.options.referenceCount, pin)
this._recalculateReplicationStatus(this.replicationStatus.progress + 1, entry.clock.time)
await this._cache.set(this.localHeadsPath, [entry])
await this._updateIndex()
this.events.emit('write', this.address.toString(), entry, this._oplog.heads)
if (onProgressCallback) onProgressCallback(entry)
return entry.hash
}
const entry = await this._oplog.append(data, this.options.referenceCount, pin)
this._recalculateReplicationStatus(this.replicationStatus.progress + 1, entry.clock.time)
await this._cache.set(this.localHeadsPath, [entry])
await this._updateIndex()
this.events.emit('write', this.address.toString(), entry, this._oplog.heads)
if (onProgressCallback) onProgressCallback(entry)
return entry.hash
}
})
}

_addOperationBatch (data, batchOperation, lastOperation, onProgressCallback) {
Expand Down