diff --git a/build.sbt b/build.sbt index e7b5c9c..9c52b2c 100644 --- a/build.sbt +++ b/build.sbt @@ -104,6 +104,7 @@ lazy val core = crossProject(JVMPlatform) "com.outr" %% "scribe-slf4j" % scribeVersion, "org.locationtech.spatial4j" % "spatial4j" % spatial4JVersion, "org.locationtech.jts" % "jts-core" % jtsVersion, + "com.outr" %% "rapid-core" % rapidVersion, "org.scalatest" %%% "scalatest" % scalaTestVersion % Test ), libraryDependencies ++= ( diff --git a/core/src/main/scala/lightdb/store/Store.scala b/core/src/main/scala/lightdb/store/Store.scala index 82cf1e1..30b2a06 100644 --- a/core/src/main/scala/lightdb/store/Store.scala +++ b/core/src/main/scala/lightdb/store/Store.scala @@ -14,6 +14,7 @@ import lightdb.field.Field import lightdb.field.Field._ import lightdb.lock.LockManager import lightdb.trigger.CollectionTriggers +import rapid.Task import java.io.File import java.util.concurrent.ConcurrentHashMap @@ -39,59 +40,53 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name protected def toString(doc: Doc): String = JsonFormatter.Compact(doc.json(model.rw)) protected def fromString(string: String): Doc = JsonParser(string).as[Doc](model.rw) - lazy val hasSpatial: Boolean = fields.exists(_.isSpatial) + lazy val hasSpatial: Task[Boolean] = Task(fields.exists(_.isSpatial)) - final def createTransaction(): Transaction[Doc] = { - val t = new Transaction[Doc] - prepareTransaction(t) - t - } - - def prepareTransaction(transaction: Transaction[Doc]): Unit + def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] - def releaseTransaction(transaction: Transaction[Doc]): Unit = { + def releaseTransaction(transaction: Transaction[Doc]): Task[Unit] = Task { transaction.commit() } - def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit + def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Unit] - def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit + def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Unit] - def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean + def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] - def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] + def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Option[Doc]] - def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Boolean + def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Boolean] - def count(implicit transaction: Transaction[Doc]): Int + def count(implicit transaction: Transaction[Doc]): Task[Int] - def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] + def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] - def jsonIterator(implicit transaction: Transaction[Doc]): Iterator[Json] = iterator.map(_.json(model.rw)) + def jsonStream(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = stream.map(_.json(model.rw)) def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] + (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] def aggregate(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] - def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int + def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] - def truncate()(implicit transaction: Transaction[Doc]): Int + def truncate()(implicit transaction: Transaction[Doc]): Task[Int] - def verify(): Boolean = false + def verify(): Task[Boolean] = Task.pure(false) - def reIndex(): Boolean = false + def reIndex(): Task[Boolean] = Task.pure(false) - def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Doc = get(model._id, id).getOrElse { + def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Doc] = get(model._id, id).map(_.getOrElse { throw DocNotFoundException(name, "_id", id) - } + }) def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false) (f: Option[Doc] => Option[Doc]) - (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(idField, id), establishLock) { existing => + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = this.lock(id, get(idField, id), establishLock) { existing => f(existing) match { case Some(doc) => upsert(doc) @@ -108,24 +103,20 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name def active: Int = set.size() - def apply[Return](f: Transaction[Doc] => Return): Return = { - val transaction = create() - try { - f(transaction) - } finally { - release(transaction) - } + def apply[Return](f: Transaction[Doc] => Task[Return]): Task[Return] = create().flatMap { transaction => + f(transaction).guarantee(release(transaction)) } - def create(): Transaction[Doc] = { + def create(): Task[Transaction[Doc]] = Task { if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name") - val transaction = createTransaction() + val transaction = new Transaction[Doc] + prepareTransaction(transaction) set.add(transaction) trigger.transactionStart(transaction) transaction } - def release(transaction: Transaction[Doc]): Unit = { + def release(transaction: Transaction[Doc]): Task[Unit] = Task { if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name") trigger.transactionEnd(transaction) releaseTransaction(transaction) @@ -133,7 +124,7 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name set.remove(transaction) } - def releaseAll(): Int = { + def releaseAll(): Task[Int] = Task { val list = set.iterator().asScala.toList list.foreach { transaction => release(transaction) @@ -142,7 +133,7 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name } } - def dispose(): Unit + def dispose(): Task[Unit] } object Store {