From 012860e10c2af092ee149901f655e6e45f7fd2d2 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Thu, 7 Nov 2024 12:52:23 -0600 Subject: [PATCH] Re-writing locking support to provide better safety --- .../scala/lightdb/async/AsyncCollection.scala | 36 ++++++---- .../scala/lightdb/collection/Collection.scala | 13 ++-- core/src/main/scala/lightdb/lock/Lock.scala | 9 +++ .../main/scala/lightdb/lock/LockManager.scala | 51 +++++++++++++ .../lightdb/transaction/Transaction.scala | 72 ------------------- 5 files changed, 93 insertions(+), 88 deletions(-) create mode 100644 core/src/main/scala/lightdb/lock/Lock.scala create mode 100644 core/src/main/scala/lightdb/lock/LockManager.scala diff --git a/async/src/main/scala/lightdb/async/AsyncCollection.scala b/async/src/main/scala/lightdb/async/AsyncCollection.scala index f8554197..f7b6bcff 100644 --- a/async/src/main/scala/lightdb/async/AsyncCollection.scala +++ b/async/src/main/scala/lightdb/async/AsyncCollection.scala @@ -43,20 +43,32 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Doc] = IO.blocking(underlying(id)) + def withLock(id: Id[Doc], doc: IO[Option[Doc]], establishLock: Boolean = true) + (f: Option[Doc] => IO[Option[Doc]]): IO[Option[Doc]] = if (establishLock) { + doc.map(d => if (establishLock) underlying.lock.acquire(id, d) else d).flatMap { existing => + f(existing) + .attempt + .flatMap { + case Left(err) => + if (establishLock) underlying.lock.release(id, existing) + IO.raiseError(err) + case Right(modified) => + if (establishLock) underlying.lock.release(id, modified) + IO.pure(modified) + } + } + } else { + doc.flatMap(f) + } + def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false) (f: Option[Doc] => IO[Option[Doc]]) - (implicit transaction: Transaction[Doc]): IO[Option[Doc]] = { - if (lock) transaction.lock(id) - get(id) - .flatMap(f) - .flatMap { - case Some(doc) => upsert(doc).map(doc => Some(doc)) - case None if deleteOnNone => delete(id).map(_ => None) - case None => IO.pure(None) - } - .guarantee(IO { - if (lock) transaction.unlock(id) - }) + (implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), lock) { existing => + f(existing).flatMap { + case Some(doc) => upsert(doc).map(doc => Some(doc)) + case None if deleteOnNone => delete(id).map(_ => None) + case None => IO.pure(None) + } } def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true) diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index d5d3025c..365582c4 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -11,6 +11,7 @@ import lightdb.trigger.CollectionTriggers import lightdb.util.Initializable import lightdb._ import lightdb.field.Field._ +import lightdb.lock.LockManager import java.util.concurrent.ConcurrentHashMap import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -20,6 +21,8 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S loadStore: () => Store[Doc, Model], maxInsertBatch: Int = 1_000_000, cacheQueries: Boolean = Collection.DefaultCacheQueries) extends Initializable { collection => + lazy val lock: LockManager[Id[Doc], Doc] = new LockManager + object trigger extends CollectionTriggers[Doc] lazy val store: Store[Doc, Model] = loadStore() @@ -220,15 +223,17 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def list()(implicit transaction: Transaction[Doc]): List[Doc] = iterator.toList - def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false) + def modify(id: Id[Doc], + lock: Boolean = true, + deleteOnNone: Boolean = false) (f: Option[Doc] => Option[Doc]) - (implicit transaction: Transaction[Doc]): Option[Doc] = transaction.mayLock(id, lock) { - f(get(_ => model._id -> id)) match { + (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), lock) { existing => + f(existing) match { case Some(doc) => upsert(doc) Some(doc) case None if deleteOnNone => - delete(_ => model._id -> id) + delete(id) None case None => None } diff --git a/core/src/main/scala/lightdb/lock/Lock.scala b/core/src/main/scala/lightdb/lock/Lock.scala new file mode 100644 index 00000000..f79e772f --- /dev/null +++ b/core/src/main/scala/lightdb/lock/Lock.scala @@ -0,0 +1,9 @@ +package lightdb.lock + +import java.util.concurrent.locks.ReentrantLock + +class Lock[V](value: => Option[V], val lock: ReentrantLock) { + private lazy val v: Option[V] = value + + def apply(): Option[V] = v +} \ No newline at end of file diff --git a/core/src/main/scala/lightdb/lock/LockManager.scala b/core/src/main/scala/lightdb/lock/LockManager.scala new file mode 100644 index 00000000..529f6034 --- /dev/null +++ b/core/src/main/scala/lightdb/lock/LockManager.scala @@ -0,0 +1,51 @@ +package lightdb.lock + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock + +class LockManager[K, V] { + private val locks = new ConcurrentHashMap[K, Lock[V]]() + + def apply(key: K, value: => Option[V], establishLock: Boolean = true) + (f: Option[V] => Option[V]): Option[V] = if (establishLock) { + val v = acquire(key, value) + try { + val modified = f(v) + release(key, modified) + } catch { + case t: Throwable => + release(key, v) + throw t + } + } else { + f(value) + } + + // Attempts to acquire a lock for a given K and V. + def acquire(key: K, value: => Option[V]): Option[V] = { + // Get or create the Lock object with the ReentrantLock. + val lock = locks.computeIfAbsent(key, _ => new Lock(value, new ReentrantLock)) + + // Acquire the underlying ReentrantLock. + lock.lock.lock() + lock() // Return the associated value after acquiring the lock. + } + + // Releases the lock for the given K and supplies the latest V. + def release(key: K, newValue: => Option[V]): Option[V] = { + val v: Option[V] = newValue + locks.compute(key, (_, existingLock) => { + // Update the value associated with the lock. + existingLock.lock.unlock() + + if (!existingLock.lock.hasQueuedThreads) { + // No other threads are waiting, so remove the lock entry. + null + } else { + // Other threads are waiting, so update the value but keep the lock. + new Lock(v, existingLock.lock) + } + }) + v + } +} diff --git a/core/src/main/scala/lightdb/transaction/Transaction.scala b/core/src/main/scala/lightdb/transaction/Transaction.scala index 6c3ed944..d3657ec1 100644 --- a/core/src/main/scala/lightdb/transaction/Transaction.scala +++ b/core/src/main/scala/lightdb/transaction/Transaction.scala @@ -1,49 +1,9 @@ package lightdb.transaction -import lightdb.Id import lightdb.doc.Document import lightdb.feature.FeatureSupport -import java.util.concurrent.ConcurrentHashMap -import scala.annotation.tailrec -import scala.concurrent.duration.{DurationInt, FiniteDuration} - final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[TransactionKey] { transaction => - private var locks = Set.empty[Id[Doc]] - - def lock(id: Id[Doc], delay: FiniteDuration = 100.millis): Unit = { - Transaction.lock(id, this, delay) - transaction.synchronized { - locks += id - } - } - - def unlock(id: Id[Doc]): Unit = { - Transaction.unlock(id, this) - transaction.synchronized { - locks -= id - } - } - - def withLock[Return](id: Id[Doc], delay: FiniteDuration = 100.millis) - (f: => Return): Return = { - lock(id, delay) - try { - f - } finally { - unlock(id) - } - } - - def mayLock[Return](id: Id[Doc], - establishLock: Boolean = true, - delay: FiniteDuration = 100.millis) - (f: => Return): Return = if (establishLock) { - withLock(id, delay)(f) - } else { - f - } - def commit(): Unit = { features.foreach { case f: TransactionFeature => f.commit() @@ -63,37 +23,5 @@ final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[Transaction case f: TransactionFeature => f.close() case _ => // Ignore } - locks.foreach(unlock) } -} - -object Transaction { - private lazy val locks = new ConcurrentHashMap[Id[_], Transaction[_]] - - @tailrec - private def lock[Doc <: Document[Doc]](id: Id[Doc], - transaction: Transaction[Doc], - delay: FiniteDuration): Unit = { - val existingTransaction = locks - .compute(id, (_, currentTransaction) => { - if (currentTransaction == null || currentTransaction == transaction) { - transaction - } else { - currentTransaction - } - }) - if (existingTransaction != transaction) { - Thread.sleep(delay.toMillis) - lock[Doc](id, transaction, delay) - } - } - - private def unlock[Doc <: Document[Doc]](id: Id[Doc], transaction: Transaction[Doc]): Unit = locks - .compute(id, (_, currentTransaction) => { - if (currentTransaction == transaction) { - null - } else { - currentTransaction - } - }) } \ No newline at end of file