Skip to content

Commit

Permalink
Working toward full async
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 27, 2024
1 parent 58ae85b commit b059914
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 38 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ lazy val core = crossProject(JVMPlatform)
"com.outr" %% "scribe-slf4j" % scribeVersion,
"org.locationtech.spatial4j" % "spatial4j" % spatial4JVersion,
"org.locationtech.jts" % "jts-core" % jtsVersion,
"com.outr" %% "rapid-core" % rapidVersion,
"org.scalatest" %%% "scalatest" % scalaTestVersion % Test
),
libraryDependencies ++= (
Expand Down
67 changes: 29 additions & 38 deletions core/src/main/scala/lightdb/store/Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import lightdb.field.Field
import lightdb.field.Field._
import lightdb.lock.LockManager
import lightdb.trigger.CollectionTriggers
import rapid.Task

import java.io.File
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -39,59 +40,53 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name
protected def toString(doc: Doc): String = JsonFormatter.Compact(doc.json(model.rw))
protected def fromString(string: String): Doc = JsonParser(string).as[Doc](model.rw)

lazy val hasSpatial: Boolean = fields.exists(_.isSpatial)
lazy val hasSpatial: Task[Boolean] = Task(fields.exists(_.isSpatial))

final def createTransaction(): Transaction[Doc] = {
val t = new Transaction[Doc]
prepareTransaction(t)
t
}

def prepareTransaction(transaction: Transaction[Doc]): Unit
def prepareTransaction(transaction: Transaction[Doc]): Task[Unit]

def releaseTransaction(transaction: Transaction[Doc]): Unit = {
def releaseTransaction(transaction: Transaction[Doc]): Task[Unit] = Task {
transaction.commit()
}

def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit
def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Unit]

def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit
def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Unit]

def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean
def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean]

def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc]
def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Option[Doc]]

def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Boolean
def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Boolean]

def count(implicit transaction: Transaction[Doc]): Int
def count(implicit transaction: Transaction[Doc]): Task[Int]

def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc]
def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc]

def jsonIterator(implicit transaction: Transaction[Doc]): Iterator[Json] = iterator.map(_.json(model.rw))
def jsonStream(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = stream.map(_.json(model.rw))

def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V]
(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]]

def aggregate(query: AggregateQuery[Doc, Model])
(implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]]
(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]]

def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int
def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int]

def truncate()(implicit transaction: Transaction[Doc]): Int
def truncate()(implicit transaction: Transaction[Doc]): Task[Int]

def verify(): Boolean = false
def verify(): Task[Boolean] = Task.pure(false)

def reIndex(): Boolean = false
def reIndex(): Task[Boolean] = Task.pure(false)

def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Doc = get(model._id, id).getOrElse {
def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Doc] = get(model._id, id).map(_.getOrElse {
throw DocNotFoundException(name, "_id", id)
}
})

def modify(id: Id[Doc],
establishLock: Boolean = true,
deleteOnNone: Boolean = false)
(f: Option[Doc] => Option[Doc])
(implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(idField, id), establishLock) { existing =>
(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = this.lock(id, get(idField, id), establishLock) { existing =>
f(existing) match {
case Some(doc) =>
upsert(doc)
Expand All @@ -108,32 +103,28 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name

def active: Int = set.size()

def apply[Return](f: Transaction[Doc] => Return): Return = {
val transaction = create()
try {
f(transaction)
} finally {
release(transaction)
}
def apply[Return](f: Transaction[Doc] => Task[Return]): Task[Return] = create().flatMap { transaction =>
f(transaction).guarantee(release(transaction))
}

def create(): Transaction[Doc] = {
def create(): Task[Transaction[Doc]] = Task {
if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name")
val transaction = createTransaction()
val transaction = new Transaction[Doc]
prepareTransaction(transaction)
set.add(transaction)
trigger.transactionStart(transaction)
transaction
}

def release(transaction: Transaction[Doc]): Unit = {
def release(transaction: Transaction[Doc]): Task[Unit] = Task {
if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name")
trigger.transactionEnd(transaction)
releaseTransaction(transaction)
transaction.close()
set.remove(transaction)
}

def releaseAll(): Int = {
def releaseAll(): Task[Int] = Task {
val list = set.iterator().asScala.toList
list.foreach { transaction =>
release(transaction)
Expand All @@ -142,7 +133,7 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name
}
}

def dispose(): Unit
def dispose(): Task[Unit]
}

object Store {
Expand Down

0 comments on commit b059914

Please sign in to comment.