Skip to content

Commit

Permalink
Re-writing locking support to provide better safety
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Nov 7, 2024
1 parent 2ad0eda commit 012860e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 88 deletions.
36 changes: 24 additions & 12 deletions async/src/main/scala/lightdb/async/AsyncCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,32 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un
def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Doc] =
IO.blocking(underlying(id))

def withLock(id: Id[Doc], doc: IO[Option[Doc]], establishLock: Boolean = true)
(f: Option[Doc] => IO[Option[Doc]]): IO[Option[Doc]] = if (establishLock) {
doc.map(d => if (establishLock) underlying.lock.acquire(id, d) else d).flatMap { existing =>
f(existing)
.attempt
.flatMap {
case Left(err) =>
if (establishLock) underlying.lock.release(id, existing)
IO.raiseError(err)
case Right(modified) =>
if (establishLock) underlying.lock.release(id, modified)
IO.pure(modified)
}
}
} else {
doc.flatMap(f)
}

def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false)
(f: Option[Doc] => IO[Option[Doc]])
(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = {
if (lock) transaction.lock(id)
get(id)
.flatMap(f)
.flatMap {
case Some(doc) => upsert(doc).map(doc => Some(doc))
case None if deleteOnNone => delete(id).map(_ => None)
case None => IO.pure(None)
}
.guarantee(IO {
if (lock) transaction.unlock(id)
})
(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), lock) { existing =>
f(existing).flatMap {
case Some(doc) => upsert(doc).map(doc => Some(doc))
case None if deleteOnNone => delete(id).map(_ => None)
case None => IO.pure(None)
}
}

def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true)
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/scala/lightdb/collection/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import lightdb.trigger.CollectionTriggers
import lightdb.util.Initializable
import lightdb._
import lightdb.field.Field._
import lightdb.lock.LockManager

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.IteratorHasAsScala
Expand All @@ -20,6 +21,8 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
loadStore: () => Store[Doc, Model],
maxInsertBatch: Int = 1_000_000,
cacheQueries: Boolean = Collection.DefaultCacheQueries) extends Initializable { collection =>
lazy val lock: LockManager[Id[Doc], Doc] = new LockManager

object trigger extends CollectionTriggers[Doc]

lazy val store: Store[Doc, Model] = loadStore()
Expand Down Expand Up @@ -220,15 +223,17 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S

def list()(implicit transaction: Transaction[Doc]): List[Doc] = iterator.toList

def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false)
def modify(id: Id[Doc],
lock: Boolean = true,
deleteOnNone: Boolean = false)
(f: Option[Doc] => Option[Doc])
(implicit transaction: Transaction[Doc]): Option[Doc] = transaction.mayLock(id, lock) {
f(get(_ => model._id -> id)) match {
(implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), lock) { existing =>
f(existing) match {
case Some(doc) =>
upsert(doc)
Some(doc)
case None if deleteOnNone =>
delete(_ => model._id -> id)
delete(id)
None
case None => None
}
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/lightdb/lock/Lock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package lightdb.lock

import java.util.concurrent.locks.ReentrantLock

class Lock[V](value: => Option[V], val lock: ReentrantLock) {
private lazy val v: Option[V] = value

def apply(): Option[V] = v
}
51 changes: 51 additions & 0 deletions core/src/main/scala/lightdb/lock/LockManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package lightdb.lock

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

class LockManager[K, V] {
private val locks = new ConcurrentHashMap[K, Lock[V]]()

def apply(key: K, value: => Option[V], establishLock: Boolean = true)
(f: Option[V] => Option[V]): Option[V] = if (establishLock) {
val v = acquire(key, value)
try {
val modified = f(v)
release(key, modified)
} catch {
case t: Throwable =>
release(key, v)
throw t
}
} else {
f(value)
}

// Attempts to acquire a lock for a given K and V.
def acquire(key: K, value: => Option[V]): Option[V] = {
// Get or create the Lock object with the ReentrantLock.
val lock = locks.computeIfAbsent(key, _ => new Lock(value, new ReentrantLock))

// Acquire the underlying ReentrantLock.
lock.lock.lock()
lock() // Return the associated value after acquiring the lock.
}

// Releases the lock for the given K and supplies the latest V.
def release(key: K, newValue: => Option[V]): Option[V] = {
val v: Option[V] = newValue
locks.compute(key, (_, existingLock) => {
// Update the value associated with the lock.
existingLock.lock.unlock()

if (!existingLock.lock.hasQueuedThreads) {
// No other threads are waiting, so remove the lock entry.
null
} else {
// Other threads are waiting, so update the value but keep the lock.
new Lock(v, existingLock.lock)
}
})
v
}
}
72 changes: 0 additions & 72 deletions core/src/main/scala/lightdb/transaction/Transaction.scala
Original file line number Diff line number Diff line change
@@ -1,49 +1,9 @@
package lightdb.transaction

import lightdb.Id
import lightdb.doc.Document
import lightdb.feature.FeatureSupport

import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.concurrent.duration.{DurationInt, FiniteDuration}

final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[TransactionKey] { transaction =>
private var locks = Set.empty[Id[Doc]]

def lock(id: Id[Doc], delay: FiniteDuration = 100.millis): Unit = {
Transaction.lock(id, this, delay)
transaction.synchronized {
locks += id
}
}

def unlock(id: Id[Doc]): Unit = {
Transaction.unlock(id, this)
transaction.synchronized {
locks -= id
}
}

def withLock[Return](id: Id[Doc], delay: FiniteDuration = 100.millis)
(f: => Return): Return = {
lock(id, delay)
try {
f
} finally {
unlock(id)
}
}

def mayLock[Return](id: Id[Doc],
establishLock: Boolean = true,
delay: FiniteDuration = 100.millis)
(f: => Return): Return = if (establishLock) {
withLock(id, delay)(f)
} else {
f
}

def commit(): Unit = {
features.foreach {
case f: TransactionFeature => f.commit()
Expand All @@ -63,37 +23,5 @@ final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[Transaction
case f: TransactionFeature => f.close()
case _ => // Ignore
}
locks.foreach(unlock)
}
}

object Transaction {
private lazy val locks = new ConcurrentHashMap[Id[_], Transaction[_]]

@tailrec
private def lock[Doc <: Document[Doc]](id: Id[Doc],
transaction: Transaction[Doc],
delay: FiniteDuration): Unit = {
val existingTransaction = locks
.compute(id, (_, currentTransaction) => {
if (currentTransaction == null || currentTransaction == transaction) {
transaction
} else {
currentTransaction
}
})
if (existingTransaction != transaction) {
Thread.sleep(delay.toMillis)
lock[Doc](id, transaction, delay)
}
}

private def unlock[Doc <: Document[Doc]](id: Id[Doc], transaction: Transaction[Doc]): Unit = locks
.compute(id, (_, currentTransaction) => {
if (currentTransaction == transaction) {
null
} else {
currentTransaction
}
})
}

0 comments on commit 012860e

Please sign in to comment.