Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #85 from tabcat/queue_op
Browse files Browse the repository at this point in the history
_addOperation calls are serialized
  • Loading branch information
aphelionz authored Apr 3, 2020
2 parents 6f83802 + 4e3c306 commit 4baba75
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
8 changes: 2 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"orbit-db-io": "~0.2.0",
"p-each-series": "^1.0.0",
"p-map": "^3.0.0",
"p-queue": "^6.2.1",
"readable-stream": "~2.3.5"
},
"standard": {
Expand Down
33 changes: 21 additions & 12 deletions src/Store.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const EventEmitter = require('events').EventEmitter
const Readable = require('readable-stream')
const toStream = require('it-to-stream')
const mapSeries = require('p-each-series')
const { default: PQueue } = require('p-queue')
const Log = require('ipfs-log')
const Entry = Log.Entry
const Index = require('./Index')
Expand Down Expand Up @@ -67,6 +68,9 @@ class Store {
// Create the operations log
this._oplog = new Log(this._ipfs, this.identity, { logId: this.id, access: this.access, sortFn: this.options.sortFn })

// _addOperation queue
this._opqueue = new PQueue({ concurrency: 1 })

// Create the index
this._index = new this.options.Index(this.address.root)

Expand Down Expand Up @@ -167,6 +171,8 @@ class Store {
await this.options.onClose(this)
}

await this._opqueue.onEmpty()

// Replicator teardown logic
this._replicator.stop()

Expand Down Expand Up @@ -500,20 +506,23 @@ class Store {
}

async _addOperation (data, { onProgressCallback, pin = false } = {}) {
if (this._oplog) {
// check local cache?
if (this.options.syncLocal) {
await this.syncLocal()
}
async function addOperation () {
if (this._oplog) {
// check local cache?
if (this.options.syncLocal) {
await this.syncLocal()
}

const entry = await this._oplog.append(data, this.options.referenceCount, pin)
this._recalculateReplicationStatus(this.replicationStatus.progress + 1, entry.clock.time)
await this._cache.set(this.localHeadsPath, [entry])
await this._updateIndex()
this.events.emit('write', this.address.toString(), entry, this._oplog.heads)
if (onProgressCallback) onProgressCallback(entry)
return entry.hash
const entry = await this._oplog.append(data, this.options.referenceCount, pin)
this._recalculateReplicationStatus(this.replicationStatus.progress + 1, entry.clock.time)
await this._cache.set(this.localHeadsPath, [entry])
await this._updateIndex()
this.events.emit('write', this.address.toString(), entry, this._oplog.heads)
if (onProgressCallback) onProgressCallback(entry)
return entry.hash
}
}
return this._opqueue.add(addOperation.bind(this))
}

_addOperationBatch (data, batchOperation, lastOperation, onProgressCallback) {
Expand Down

0 comments on commit 4baba75

Please sign in to comment.