Skip to content

Commit

Permalink
Complete re-write of locking mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Nov 9, 2024
1 parent 1e6ea1e commit 66c4c10
Show file tree
Hide file tree
Showing 22 changed files with 93 additions and 46 deletions.
6 changes: 3 additions & 3 deletions async/src/main/scala/lightdb/async/AsyncCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un
doc.flatMap(f)
}

def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false)
def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false)
(f: Option[Doc] => IO[Option[Doc]])
(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), lock) { existing =>
(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), establishLock) { existing =>
f(existing).flatMap {
case Some(doc) => upsert(doc).map(doc => Some(doc))
case None if deleteOnNone => delete(id).map(_ => None)
Expand All @@ -72,7 +72,7 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un
}

def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true)
(implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, lock = lock) {
(implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, establishLock = lock) {
case Some(doc) => IO.pure(Some(doc))
case None => create.map(Some.apply)
}.map(_.get)
Expand Down
38 changes: 33 additions & 5 deletions async/src/main/scala/lightdb/async/AsyncQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,41 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo
apply(Conversion.Distance(f(collection.model), from, sort, radius))
}

def process(establishLock: Boolean = true)
(f: Doc => IO[Doc])
/**
* Processes through each result record from the query modifying the data in the database.
*
* @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true)
* @param deleteOnNone whether to delete the record if the function returns None (defaults to true)
* @param safeModify whether to use safe modification. This results in loading the same object twice, but should never
* risk concurrent modification occurring. (defaults to true)
* @param maxConcurrent the number of concurrent threads to process with (defaults to 1 for single-threaded)
* @param f the processing function for records
*/
def process(establishLock: Boolean = true,
deleteOnNone: Boolean = true,
safeModify: Boolean = true,
maxConcurrent: Int = 1)
(f: Doc => IO[Option[Doc]])
(implicit transaction: Transaction[Doc]): IO[Int] = stream
.docs
.evalMap { doc =>
asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current =>
f(current.getOrElse(doc)).map(Some.apply)
.parEvalMap(maxConcurrent) { doc =>
if (safeModify) {
asyncCollection.modify(doc._id, establishLock, deleteOnNone) {
case Some(doc) => f(doc)
case None => IO.pure(None)
}
} else {
asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current =>
val io = current match {
case Some(doc) => f(doc)
case None => IO.pure(None)
}
io.flatTap {
case Some(modified) if !current.contains(modified) => asyncCollection.upsert(modified)
case None if deleteOnNone => asyncCollection.delete(doc._id)
case _ => IO.unit
}
}
}
}
.compile
Expand Down
Binary file modified benchmark/charts/Get Each Record Multi-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Get Each Record-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Insert Records-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Search All Records Multi-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Search All Records-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Search Each Record Multi-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Search Each Record-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Stream Records Multi-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified benchmark/charts/Stream Records-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions benchmark/src/main/scala/benchmark/bench/Runner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import lightdb.h2.H2Store
import lightdb.halodb.HaloDBStore
import lightdb.lucene.LuceneStore
import lightdb.postgresql.PostgreSQLStoreManager
import lightdb.rocksdb.RocksDBStore
import lightdb.sql.SQLiteStore
import lightdb.sql.connect.{HikariConnectionManager, SQLConfig}
import lightdb.store.{MapStore, StoreMode}
Expand All @@ -29,6 +30,7 @@ object Runner {
"LightDB-HaloDB-SQLite" -> LightDBBench(SplitStoreManager(HaloDBStore, SQLiteStore)),
"LightDB-Lucene" -> LightDBBench(LuceneStore),
"LightDB-HaloDB-Lucene" -> LightDBBench(SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes)),
"LightDB-RocksDB-Lucene" -> LightDBBench(SplitStoreManager(RocksDBStore, LuceneStore, searchingMode = StoreMode.Indexes)),
"LightDB-H2" -> LightDBBench(H2Store),
"LightDB-HaloDB-H2" -> LightDBBench(SplitStoreManager(HaloDBStore, H2Store, searchingMode = StoreMode.Indexes)),
// "LightDB-PostgreSQL" -> LightDBBench(PostgreSQLStoreManager(HikariConnectionManager(SQLConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.sql.ResultSet
import scala.language.implicitConversions

case class LightDBBench(storeManager: StoreManager) extends Bench { bench =>
override def name: String = s"LightDB ${storeManager.getClass.getSimpleName.replace("$", "")}"
override def name: String = s"LightDB ${storeManager.name}"

override def init(): Unit = DB.init()

Expand Down
12 changes: 2 additions & 10 deletions benchmark/src/main/scala/benchmark/bench/impl/MongoDBBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ object MongoDBBench extends Bench {

override protected def getEachRecord(idIterator: Iterator[String]): Unit = {
idIterator.foreach { id =>
val list = people.find(Filters.eq("id", id)).iterator().asScala.toList
val document = list.head
val document = people.find(Filters.eq("id", id)).first()
val p = P(
id = document.getString("id"),
name = document.getString("name"),
Expand All @@ -56,16 +55,12 @@ object MongoDBBench extends Bench {
if (p.id != id) {
scribe.warn(s"${p.id} was not $id")
}
if (list.size > 1) {
scribe.warn(s"More than one result for $id")
}
}
}

override protected def searchEachRecord(ageIterator: Iterator[Int]): Unit = {
ageIterator.foreach { age =>
val list = people.find(Filters.eq("age", age)).iterator().asScala.toList
val document = list.head
val document = people.find(Filters.eq("age", age)).first()
val p = P(
id = document.getString("id"),
name = document.getString("name"),
Expand All @@ -74,9 +69,6 @@ object MongoDBBench extends Bench {
if (p.age != age) {
scribe.warn(s"${p.age} was not $age")
}
if (list.size > 1) {
scribe.warn(s"More than one result for $age")
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com"

name := projectName
ThisBuild / organization := org
ThisBuild / version := "0.14.6-SNAPSHOT2"
ThisBuild / version := "0.15.0-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := allScalaVersions
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
Expand Down
45 changes: 32 additions & 13 deletions core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
countTotal: Boolean = false,
scoreDocs: Boolean = false,
minDocScore: Option[Double] = None,
facets: List[FacetQuery[Doc]] = Nil) { query =>
facets: List[FacetQuery[Doc]] = Nil) {
query =>
def scored: Query[Doc, Model] = copy(scoreDocs = true)

def minDocScore(min: Double): Query[Doc, Model] = copy(
Expand Down Expand Up @@ -124,10 +125,10 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
}

def distance[G <: Geo](f: Model => Field[Doc, List[G]],
from: Geo.Point,
sort: Boolean = true,
radius: Option[Distance] = None)
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = {
from: Geo.Point,
sort: Boolean = true,
radius: Option[Distance] = None)
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = {
val field = f(collection.model)
var q = Query.this
if (sort) {
Expand All @@ -140,16 +141,34 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
}
}

def process(establishLock: Boolean = true)
(f: Doc => Doc)
(implicit transaction: Transaction[Doc]): Unit = if (establishLock) {
search.docs.iterator.foreach { doc =>
/**
* Processes through each result record from the query modifying the data in the database.
*
* @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true)
* @param deleteOnNone whether to delete the record if the function returns None (defaults to true)
* @param safeModify whether to use safe modification. This results in loading the same object twice, but should never
* risk concurrent modification occurring. (defaults to true)
* @param f the processing function for records
*/
def process(establishLock: Boolean = true,
deleteOnNone: Boolean = true,
safeModify: Boolean = true)
(f: Doc => Option[Doc])
(implicit transaction: Transaction[Doc]): Unit = search.docs.iterator.foreach { doc =>
if (safeModify) {
collection.modify(doc._id, establishLock, deleteOnNone) { existing =>
existing.flatMap(f)
}
} else {
collection.lock(doc._id, Some(doc)) { current =>
Some(f(current.getOrElse(doc)))
val result = f(current.getOrElse(doc))
result match {
case Some(modified) => if (!current.contains(modified)) collection.upsert(modified)
case None => if (deleteOnNone) collection.delete(doc._id)
}
result
}
}
} else {
search.docs.iterator.foreach(f)
}

def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = search.docs.iterator
Expand All @@ -167,7 +186,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
from: Geo.Point,
sort: Boolean,
radius: Option[Distance])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = {
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = {
search(Conversion.Distance(field, from, sort, radius))
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/lightdb/collection/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
def list()(implicit transaction: Transaction[Doc]): List[Doc] = iterator.toList

def modify(id: Id[Doc],
lock: Boolean = true,
establishLock: Boolean = true,
deleteOnNone: Boolean = false)
(f: Option[Doc] => Option[Doc])
(implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), lock) { existing =>
(implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), establishLock) { existing =>
f(existing) match {
case Some(doc) =>
upsert(doc)
Expand All @@ -239,8 +239,8 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
}
}

def getOrCreate(id: Id[Doc], create: => Doc, lock: Boolean = true)
(implicit transaction: Transaction[Doc]): Doc = modify(id, lock = lock) {
def getOrCreate(id: Id[Doc], create: => Doc, establishLock: Boolean = true)
(implicit transaction: Transaction[Doc]): Doc = modify(id, establishLock = establishLock) {
case Some(doc) => Some(doc)
case None => Some(create)
}.get
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/lightdb/lock/Lock.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package lightdb.lock

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.Semaphore

class Lock[V](value: => Option[V], val lock: ReentrantLock) {
class Lock[V](value: => Option[V], val lock: Semaphore = new Semaphore(1, true)) {
private lazy val v: Option[V] = value

def apply(): Option[V] = v
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/lightdb/lock/LockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ class LockManager[K, V] {
// Attempts to acquire a lock for a given K and V.
def acquire(key: K, value: => Option[V]): Option[V] = {
// Get or create the Lock object with the ReentrantLock.
val lock = locks.computeIfAbsent(key, _ => new Lock(value, new ReentrantLock))
val lock = locks.computeIfAbsent(key, _ => new Lock(value))

// Acquire the underlying ReentrantLock.
lock.lock.lock()
lock.lock.acquire()
lock() // Return the associated value after acquiring the lock.
}

Expand All @@ -36,9 +36,9 @@ class LockManager[K, V] {
val v: Option[V] = newValue
locks.compute(key, (_, existingLock) => {
// Update the value associated with the lock.
existingLock.lock.unlock()
existingLock.lock.release()

if (!existingLock.lock.hasQueuedThreads) {
if (existingLock.lock.availablePermits() > 0) {
// No other threads are waiting, so remove the lock entry.
null
} else {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/lightdb/store/StoreManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import lightdb.LightDB
import lightdb.doc.{Document, DocumentModel}

trait StoreManager {
lazy val name: String = getClass.getSimpleName.replace("$", "")

def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB,
name: String,
storeMode: StoreMode): Store[Doc, Model]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import lightdb.store.{Store, StoreManager, StoreMode}
case class SplitStoreManager(storage: StoreManager,
searching: StoreManager,
searchingMode: StoreMode = StoreMode.All) extends StoreManager {
override lazy val name: String = s"Split($storage, $searching)"

override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB,
name: String,
storeMode: StoreMode): Store[Doc, Model] = SplitStore(
name: String,
storeMode: StoreMode): Store[Doc, Model] = SplitStore(
storage = storage.create[Doc, Model](db, name, StoreMode.All),
searching = searching.create[Doc, Model](db, name, searchingMode),
storeMode = storeMode
Expand Down
4 changes: 3 additions & 1 deletion run_benchmarks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ set -e

#declare -a arr=("SQLite" "H2" "Derby" "LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2")
#declare -a arr=("LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2")
declare -a arr=("LightDB-Lucene" "LightDB-HaloDB-Lucene")
#declare -a arr=("LightDB-Lucene" "LightDB-HaloDB-Lucene")
#declare -a arr=("SQLite" "PostgreSQL" "H2" "Derby" "MongoDB" "LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-RocksDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2")
declare -a arr=("LightDB-H2" "LightDB-HaloDB-H2")

for i in "${arr[@]}"
do
Expand Down

0 comments on commit 66c4c10

Please sign in to comment.