diff --git a/async/src/main/scala/lightdb/async/AsyncCollection.scala b/async/src/main/scala/lightdb/async/AsyncCollection.scala index a908d13f..0ea31b2f 100644 --- a/async/src/main/scala/lightdb/async/AsyncCollection.scala +++ b/async/src/main/scala/lightdb/async/AsyncCollection.scala @@ -61,9 +61,9 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un doc.flatMap(f) } - def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false) + def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false) (f: Option[Doc] => IO[Option[Doc]]) - (implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), lock) { existing => + (implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), establishLock) { existing => f(existing).flatMap { case Some(doc) => upsert(doc).map(doc => Some(doc)) case None if deleteOnNone => delete(id).map(_ => None) @@ -72,7 +72,7 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un } def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true) - (implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, lock = lock) { + (implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, establishLock = lock) { case Some(doc) => IO.pure(Some(doc)) case None => create.map(Some.apply) }.map(_.get) diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala index efb16580..d46bf5da 100644 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncQuery.scala @@ -219,13 +219,41 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo apply(Conversion.Distance(f(collection.model), from, sort, radius)) } - def process(establishLock: Boolean = true) - (f: Doc => IO[Doc]) + /** + * Processes through each result record from the query modifying the data in the database. + * + * @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true) + * @param deleteOnNone whether to delete the record if the function returns None (defaults to true) + * @param safeModify whether to use safe modification. This results in loading the same object twice, but should never + * risk concurrent modification occurring.
(defaults to true) + * @param maxConcurrent the number of concurrent threads to process with (defaults to 1 for single-threaded) + * @param f the processing function for records + */ + def process(establishLock: Boolean = true, + deleteOnNone: Boolean = true, + safeModify: Boolean = true, + maxConcurrent: Int = 1) + (f: Doc => IO[Option[Doc]]) (implicit transaction: Transaction[Doc]): IO[Int] = stream .docs - .evalMap { doc => - asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current => - f(current.getOrElse(doc)).map(Some.apply) + .parEvalMap(maxConcurrent) { doc => + if (safeModify) { + asyncCollection.modify(doc._id, establishLock, deleteOnNone) { + case Some(doc) => f(doc) + case None => IO.pure(None) + } + } else { + asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current => + val io = current match { + case Some(doc) => f(doc) + case None => IO.pure(None) + } + io.flatTap { + case Some(modified) if !current.contains(modified) => asyncCollection.upsert(modified) + case None if deleteOnNone => asyncCollection.delete(doc._id) + case _ => IO.unit + } + } } } .compile diff --git a/benchmark/charts/Get Each Record Multi-total-time.png b/benchmark/charts/Get Each Record Multi-total-time.png index 90c01d9a..c40b8def 100644 Binary files a/benchmark/charts/Get Each Record Multi-total-time.png and b/benchmark/charts/Get Each Record Multi-total-time.png differ diff --git a/benchmark/charts/Get Each Record-total-time.png b/benchmark/charts/Get Each Record-total-time.png index b6d04887..067973c0 100644 Binary files a/benchmark/charts/Get Each Record-total-time.png and b/benchmark/charts/Get Each Record-total-time.png differ diff --git a/benchmark/charts/Insert Records-total-time.png b/benchmark/charts/Insert Records-total-time.png index d818bf94..d6fa7aef 100644 Binary files a/benchmark/charts/Insert Records-total-time.png and b/benchmark/charts/Insert Records-total-time.png differ diff --git a/benchmark/charts/Search All Records Multi-total-time.png b/benchmark/charts/Search All Records Multi-total-time.png index 907944b0..3f4c08f8 100644 Binary files a/benchmark/charts/Search All Records Multi-total-time.png and b/benchmark/charts/Search All Records Multi-total-time.png differ diff --git a/benchmark/charts/Search All Records-total-time.png b/benchmark/charts/Search All Records-total-time.png index 8c9ef534..0571e76b 100644 Binary files a/benchmark/charts/Search All Records-total-time.png and b/benchmark/charts/Search All Records-total-time.png differ diff --git a/benchmark/charts/Search Each Record Multi-total-time.png b/benchmark/charts/Search Each Record Multi-total-time.png index 4c695a3a..8342dfae 100644 Binary files a/benchmark/charts/Search Each Record Multi-total-time.png and b/benchmark/charts/Search Each Record Multi-total-time.png differ diff --git a/benchmark/charts/Search Each Record-total-time.png b/benchmark/charts/Search Each Record-total-time.png index f1bac625..b5fb880a 100644 Binary files a/benchmark/charts/Search Each Record-total-time.png and b/benchmark/charts/Search Each Record-total-time.png differ diff --git a/benchmark/charts/Stream Records Multi-total-time.png b/benchmark/charts/Stream Records Multi-total-time.png index 3357cd60..117c63ed 100644 Binary files a/benchmark/charts/Stream Records Multi-total-time.png and b/benchmark/charts/Stream Records Multi-total-time.png differ diff --git a/benchmark/charts/Stream Records-total-time.png b/benchmark/charts/Stream Records-total-time.png index 11306cca..2b85de5a 100644 Binary
files a/benchmark/charts/Stream Records-total-time.png and b/benchmark/charts/Stream Records-total-time.png differ diff --git a/benchmark/src/main/scala/benchmark/bench/Runner.scala b/benchmark/src/main/scala/benchmark/bench/Runner.scala index ebf89cb8..4a632eb9 100644 --- a/benchmark/src/main/scala/benchmark/bench/Runner.scala +++ b/benchmark/src/main/scala/benchmark/bench/Runner.scala @@ -7,6 +7,7 @@ import lightdb.h2.H2Store import lightdb.halodb.HaloDBStore import lightdb.lucene.LuceneStore import lightdb.postgresql.PostgreSQLStoreManager +import lightdb.rocksdb.RocksDBStore import lightdb.sql.SQLiteStore import lightdb.sql.connect.{HikariConnectionManager, SQLConfig} import lightdb.store.{MapStore, StoreMode} @@ -29,6 +30,7 @@ object Runner { "LightDB-HaloDB-SQLite" -> LightDBBench(SplitStoreManager(HaloDBStore, SQLiteStore)), "LightDB-Lucene" -> LightDBBench(LuceneStore), "LightDB-HaloDB-Lucene" -> LightDBBench(SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes)), + "LightDB-RocksDB-Lucene" -> LightDBBench(SplitStoreManager(RocksDBStore, LuceneStore, searchingMode = StoreMode.Indexes)), "LightDB-H2" -> LightDBBench(H2Store), "LightDB-HaloDB-H2" -> LightDBBench(SplitStoreManager(HaloDBStore, H2Store, searchingMode = StoreMode.Indexes)), // "LightDB-PostgreSQL" -> LightDBBench(PostgreSQLStoreManager(HikariConnectionManager(SQLConfig( diff --git a/benchmark/src/main/scala/benchmark/bench/impl/LightDBBench.scala b/benchmark/src/main/scala/benchmark/bench/impl/LightDBBench.scala index 71806638..f32b17a5 100644 --- a/benchmark/src/main/scala/benchmark/bench/impl/LightDBBench.scala +++ b/benchmark/src/main/scala/benchmark/bench/impl/LightDBBench.scala @@ -15,7 +15,7 @@ import java.sql.ResultSet import scala.language.implicitConversions case class LightDBBench(storeManager: StoreManager) extends Bench { bench => - override def name: String = s"LightDB ${storeManager.getClass.getSimpleName.replace("$", "")}" + override def name: String = s"LightDB ${storeManager.name}" override def init(): Unit = DB.init() diff --git a/benchmark/src/main/scala/benchmark/bench/impl/MongoDBBench.scala b/benchmark/src/main/scala/benchmark/bench/impl/MongoDBBench.scala index a42fbb9f..cc19d35b 100644 --- a/benchmark/src/main/scala/benchmark/bench/impl/MongoDBBench.scala +++ b/benchmark/src/main/scala/benchmark/bench/impl/MongoDBBench.scala @@ -46,8 +46,7 @@ object MongoDBBench extends Bench { override protected def getEachRecord(idIterator: Iterator[String]): Unit = { idIterator.foreach { id => - val list = people.find(Filters.eq("id", id)).iterator().asScala.toList - val document = list.head + val document = people.find(Filters.eq("id", id)).first() val p = P( id = document.getString("id"), name = document.getString("name"), @@ -56,16 +55,12 @@ object MongoDBBench extends Bench { if (p.id != id) { scribe.warn(s"${p.id} was not $id") } - if (list.size > 1) { - scribe.warn(s"More than one result for $id") - } } } override protected def searchEachRecord(ageIterator: Iterator[Int]): Unit = { ageIterator.foreach { age => - val list = people.find(Filters.eq("age", age)).iterator().asScala.toList - val document = list.head + val document = people.find(Filters.eq("age", age)).first() val p = P( id = document.getString("id"), name = document.getString("name"), @@ -74,9 +69,6 @@ object MongoDBBench extends Bench { if (p.age != age) { scribe.warn(s"${p.age} was not $age") } - if (list.size > 1) { - scribe.warn(s"More than one result for $age") - } } } diff --git a/build.sbt b/build.sbt index 
6aa057fc..cc38ca08 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com" name := projectName ThisBuild / organization := org -ThisBuild / version := "0.14.6-SNAPSHOT2" +ThisBuild / version := "0.15.0-SNAPSHOT" ThisBuild / scalaVersion := scala213 ThisBuild / crossScalaVersions := allScalaVersions ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation") diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index 0ff60c8d..4caae674 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -24,7 +24,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: countTotal: Boolean = false, scoreDocs: Boolean = false, minDocScore: Option[Double] = None, - facets: List[FacetQuery[Doc]] = Nil) { query => + facets: List[FacetQuery[Doc]] = Nil) { + query => def scored: Query[Doc, Model] = copy(scoreDocs = true) def minDocScore(min: Double): Query[Doc, Model] = copy( @@ -124,10 +125,10 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: } def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = { + from: Geo.Point, + sort: Boolean = true, + radius: Option[Distance] = None) + (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = { val field = f(collection.model) var q = Query.this if (sort) { @@ -140,16 +141,34 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: } } - def process(establishLock: Boolean = true) - (f: Doc => Doc) - (implicit transaction: Transaction[Doc]): Unit = if (establishLock) { - search.docs.iterator.foreach { doc => + /** + * Processes through each result record from the query modifying the data in the database. + * + * @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true) + * @param deleteOnNone whether to delete the record if the function returns None (defaults to true) + * @param safeModify whether to use safe modification. This results in loading the same object twice, but should never + * risk concurrent modification occurring.
(defaults to true) + * @param f the processing function for records + */ + def process(establishLock: Boolean = true, + deleteOnNone: Boolean = true, + safeModify: Boolean = true) + (f: Doc => Option[Doc]) + (implicit transaction: Transaction[Doc]): Unit = search.docs.iterator.foreach { doc => + if (safeModify) { + collection.modify(doc._id, establishLock, deleteOnNone) { existing => + existing.flatMap(f) + } + } else { collection.lock(doc._id, Some(doc)) { current => - Some(f(current.getOrElse(doc))) + val result = f(current.getOrElse(doc)) + result match { + case Some(modified) => if (!current.contains(modified)) collection.upsert(modified) + case None => if (deleteOnNone) collection.delete(doc._id) + } + result } } - } else { - search.docs.iterator.foreach(f) } def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = search.docs.iterator @@ -167,7 +186,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: from: Geo.Point, sort: Boolean, radius: Option[Distance]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = { + (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = { search(Conversion.Distance(field, from, sort, radius)) } diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index 365582c4..e68075a5 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -224,10 +224,10 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def list()(implicit transaction: Transaction[Doc]): List[Doc] = iterator.toList def modify(id: Id[Doc], - lock: Boolean = true, + establishLock: Boolean = true, deleteOnNone: Boolean = false) (f: Option[Doc] => Option[Doc]) - (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), lock) { existing => + (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), establishLock) { existing => f(existing) match { case Some(doc) => upsert(doc) @@ -239,8 +239,8 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S } } - def getOrCreate(id: Id[Doc], create: => Doc, lock: Boolean = true) - (implicit transaction: Transaction[Doc]): Doc = modify(id, lock = lock) { + def getOrCreate(id: Id[Doc], create: => Doc, establishLock: Boolean = true) + (implicit transaction: Transaction[Doc]): Doc = modify(id, establishLock = establishLock) { case Some(doc) => Some(doc) case None => Some(create) }.get diff --git a/core/src/main/scala/lightdb/lock/Lock.scala b/core/src/main/scala/lightdb/lock/Lock.scala index f79e772f..c015e180 100644 --- a/core/src/main/scala/lightdb/lock/Lock.scala +++ b/core/src/main/scala/lightdb/lock/Lock.scala @@ -1,8 +1,8 @@ package lightdb.lock -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.Semaphore -class Lock[V](value: => Option[V], val lock: ReentrantLock) { +class Lock[V](value: => Option[V], val lock: Semaphore = new Semaphore(1, true)) { private lazy val v: Option[V] = value def apply(): Option[V] = v diff --git a/core/src/main/scala/lightdb/lock/LockManager.scala b/core/src/main/scala/lightdb/lock/LockManager.scala index 529f6034..d6713762 100644 --- a/core/src/main/scala/lightdb/lock/LockManager.scala +++ b/core/src/main/scala/lightdb/lock/LockManager.scala @@ -24,10 +24,10 @@ class LockManager[K, V] { // Attempts to acquire a lock for a given K and
V. def acquire(key: K, value: => Option[V]): Option[V] = { // Get or create the Lock object with the ReentrantLock. - val lock = locks.computeIfAbsent(key, _ => new Lock(value, new ReentrantLock)) + val lock = locks.computeIfAbsent(key, _ => new Lock(value)) // Acquire the underlying ReentrantLock. - lock.lock.lock() + lock.lock.acquire() lock() // Return the associated value after acquiring the lock. } @@ -36,9 +36,9 @@ class LockManager[K, V] { val v: Option[V] = newValue locks.compute(key, (_, existingLock) => { // Update the value associated with the lock. - existingLock.lock.unlock() + existingLock.lock.release() - if (!existingLock.lock.hasQueuedThreads) { + if (existingLock.lock.availablePermits() > 0) { // No other threads are waiting, so remove the lock entry. null } else { diff --git a/core/src/main/scala/lightdb/store/StoreManager.scala b/core/src/main/scala/lightdb/store/StoreManager.scala index 080a9561..7b81f393 100644 --- a/core/src/main/scala/lightdb/store/StoreManager.scala +++ b/core/src/main/scala/lightdb/store/StoreManager.scala @@ -4,6 +4,8 @@ import lightdb.LightDB import lightdb.doc.{Document, DocumentModel} trait StoreManager { + lazy val name: String = getClass.getSimpleName.replace("$", "") + def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, name: String, storeMode: StoreMode): Store[Doc, Model] diff --git a/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala b/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala index 2ef71e9e..85f08154 100644 --- a/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala +++ b/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala @@ -7,9 +7,11 @@ import lightdb.store.{Store, StoreManager, StoreMode} case class SplitStoreManager(storage: StoreManager, searching: StoreManager, searchingMode: StoreMode = StoreMode.All) extends StoreManager { + override lazy val name: String = s"Split($storage, $searching)" + override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, - name: String, - storeMode: StoreMode): Store[Doc, Model] = SplitStore( + name: String, + storeMode: StoreMode): Store[Doc, Model] = SplitStore( storage = storage.create[Doc, Model](db, name, StoreMode.All), searching = searching.create[Doc, Model](db, name, searchingMode), storeMode = storeMode diff --git a/run_benchmarks.sh b/run_benchmarks.sh index bf1fcda4..1fbb29bb 100755 --- a/run_benchmarks.sh +++ b/run_benchmarks.sh @@ -4,7 +4,9 @@ set -e #declare -a arr=("SQLite" "H2" "Derby" "LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2") #declare -a arr=("LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2") -declare -a arr=("LightDB-Lucene" "LightDB-HaloDB-Lucene") +#declare -a arr=("LightDB-Lucene" "LightDB-HaloDB-Lucene") +#declare -a arr=("SQLite" "PostgreSQL" "H2" "Derby" "MongoDB" "LightDB-SQLite" "LightDB-Map-SQLite" "LightDB-HaloDB-SQLite" "LightDB-Lucene" "LightDB-HaloDB-Lucene" "LightDB-RocksDB-Lucene" "LightDB-H2" "LightDB-HaloDB-H2") +declare -a arr=("LightDB-H2" "LightDB-HaloDB-H2") for i in "${arr[@]}" do
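The reworked synchronous Query.process above now takes a Doc => Option[Doc] function instead of Doc => Doc: returning Some upserts the modified document, while returning None deletes the record when deleteOnNone is left at its default of true. A minimal usage sketch, assuming a hypothetical Person document with a name field and an already-built query: Query[Person, Person.type] inside an open transaction (none of these names come from this diff):

  // Sketch only: Person, person.name and the surrounding transaction are illustrative.
  query.process() { person =>
    if (person.name.nonEmpty) Some(person.copy(name = person.name.trim)) // modified copy is upserted
    else None // record is deleted, since deleteOnNone defaults to true
  }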
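AsyncQuery.process mirrors the same contract, adds maxConcurrent to fan records out through parEvalMap, and returns an IO[Int] (presumably the number of records processed). A hedged sketch, again with a hypothetical asyncQuery: AsyncQuery[Person, Person.type], cats.effect.IO, and a transaction in scope:

  // Sketch only: the document type and its fields are illustrative.
  val processed: IO[Int] = asyncQuery.process(maxConcurrent = 4) { person =>
    IO.pure(Some(person.copy(name = person.name.trim)))
  }
  // Returning IO.pure(None) would delete the record instead; with maxConcurrent > 1,
  // records are handled concurrently, so no processing order should be assumed.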
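Callers of Collection.modify, AsyncCollection.modify and Collection.getOrCreate that passed the old lock parameter by name must switch to establishLock; positional call sites are unaffected. A hypothetical before/after at a call site:

  // Before: collection.modify(id, lock = false) { ... }
  collection.modify(id, establishLock = false) {
    case Some(doc) => Some(doc) // returning Some always upserts
    case None => None
  }

The related move from ReentrantLock to a fair Semaphore(1) in Lock and LockManager presumably allows the permit to be released from a thread other than the one that acquired it, whereas a ReentrantLock may only be unlocked by its owning thread; that distinction matters once process can run the modification effect across threads.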