From 43ce43a7f297579e3413dbdfe955fd65345fa055 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sun, 16 Jun 2024 08:14:47 -0500 Subject: [PATCH] Preliminary get/set Store support --- next/src/main/scala/lightdb/LightDB.scala | 6 +- .../scala/lightdb/collection/Collection.scala | 87 ++++++++++++++++++- .../lightdb/document/DocumentListener.scala | 6 +- .../lightdb/document/DocumentModel.scala | 79 +---------------- .../lightdb/transaction/Transaction.scala | 3 +- next/src/test/scala/spec/BasicsSpec.scala | 12 ++- 6 files changed, 106 insertions(+), 87 deletions(-) diff --git a/next/src/main/scala/lightdb/LightDB.scala b/next/src/main/scala/lightdb/LightDB.scala index 9ef22c9e..6b7ba728 100644 --- a/next/src/main/scala/lightdb/LightDB.scala +++ b/next/src/main/scala/lightdb/LightDB.scala @@ -1,6 +1,7 @@ package lightdb import cats.effect.IO +import cats.implicits.catsSyntaxParallelSequence1 import lightdb.collection.Collection import lightdb.document.{Document, DocumentModel} import lightdb.store.StoreManager @@ -19,7 +20,10 @@ trait LightDB extends Initializable { c } - override protected def initialize(): IO[Unit] = ??? + override protected def initialize(): IO[Unit] = for { + _ <- collections.map(_.init).parSequence + _ <- IO.unit // TODO: Database upgrades and initialization + } yield () def dispose(): IO[Unit] = IO.unit } \ No newline at end of file diff --git a/next/src/main/scala/lightdb/collection/Collection.scala b/next/src/main/scala/lightdb/collection/Collection.scala index 23d35996..bb77ca20 100644 --- a/next/src/main/scala/lightdb/collection/Collection.scala +++ b/next/src/main/scala/lightdb/collection/Collection.scala @@ -1,12 +1,91 @@ package lightdb.collection import cats.effect.IO -import lightdb.LightDB -import lightdb.document.{Document, DocumentModel} +import lightdb.{Id, LightDB} +import lightdb.document.{Document, DocumentListener, DocumentModel} +import lightdb.transaction.Transaction import lightdb.util.Initializable +import cats.implicits._ case class Collection[D <: Document[D]](name: String, model: DocumentModel[D], - db: LightDB) extends Initializable { - override protected def initialize(): IO[Unit] = model.init(this) + db: LightDB) extends Initializable { collection => + private implicit class ListIO[R](list: List[IO[R]]) { + def ioSeq: IO[Unit] = if (model.parallel) { + list.parSequence.map(_ => ()) + } else { + list.sequence.map(_ => ()) + } + } + + private def recurseOption(doc: D, + invoke: (DocumentListener[D], D) => IO[Option[D]], + listeners: List[DocumentListener[D]] = model.listener()): IO[Option[D]] = listeners.headOption match { + case Some(l) => invoke(l, doc).flatMap { + case Some(v) => recurseOption(v, invoke, listeners.tail) + case None => IO.pure(None) + } + case None => IO.pure(Some(doc)) + } + + override protected def initialize(): IO[Unit] = for { + _ <- IO(model.collection = this) + _ <- db.storeManager[D](name).map(store => model.store = store) + _ <- model.listener().map(_.init(this)).ioSeq.map(_ => model._initialized.set(true)) + } yield () + + object transaction { + def apply[Return](f: Transaction[D] => IO[Return]): IO[Return] = create() + .flatMap { transaction => + f(transaction).guarantee(release(transaction)) + } + + private def create(): IO[Transaction[D]] = for { + transaction <- IO(Transaction[D](collection)) + _ <- model.listener().map(l => l.transactionStart(transaction)).ioSeq + } yield transaction + + private def release(transaction: Transaction[D]): IO[Unit] = + model.listener().map(l => l.transactionEnd(transaction)).ioSeq + } + + def apply(id: Id[D])(implicit transaction: Transaction[D]): IO[D] = model.store(id) + + def get(id: Id[D])(implicit transaction: Transaction[D]): IO[Option[D]] = model.store.get(id) + + final def set(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = { + recurseOption(doc, (l, d) => l.preSet(d, transaction)).flatMap { + case Some(d) => for { + _ <- model.store.set(d) + _ <- model.listener().map(l => l.postSet(d, transaction)).ioSeq + } yield Some(d) + case None => IO.pure(None) + } + } + + def stream(implicit transaction: Transaction[D]): fs2.Stream[IO, D] = model.store.stream + + def count(implicit transaction: Transaction[D]): IO[Int] = model.store.count + + def idStream(implicit transaction: Transaction[D]): fs2.Stream[IO, Id[D]] = model.store.idStream + + final def delete(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = { + recurseOption(doc, (l, d) => l.preDelete(d, transaction)).flatMap { + case Some(d) => for { + _ <- model.store.delete(d._id) + _ <- model.listener().map(l => l.postDelete(d, transaction)).ioSeq + } yield Some(d) + case None => IO.pure(None) + } + } + + def truncate()(implicit transaction: Transaction[D]): IO[Unit] = model.listener() + .map(l => l.truncate(transaction)) + .ioSeq + .map(_ => ()) + + def dispose(): IO[Unit] = model.listener() + .map(l => l.dispose()) + .ioSeq + .map(_ => ()) } \ No newline at end of file diff --git a/next/src/main/scala/lightdb/document/DocumentListener.scala b/next/src/main/scala/lightdb/document/DocumentListener.scala index 6d58b16c..3e5c51c9 100644 --- a/next/src/main/scala/lightdb/document/DocumentListener.scala +++ b/next/src/main/scala/lightdb/document/DocumentListener.scala @@ -1,8 +1,6 @@ package lightdb.document import cats.effect.IO -import fabric.Json -import lightdb.{Id, LightDB} import lightdb.collection.Collection import lightdb.transaction.Transaction import moduload.Priority @@ -12,6 +10,10 @@ trait DocumentListener[D <: Document[D]] { def init(collection: Collection[D]): IO[Unit] = IO.unit + def transactionStart(transaction: Transaction[D]): IO[Unit] = IO.unit + + def transactionEnd(transaction: Transaction[D]): IO[Unit] = IO.unit + def preSet(doc: D, transaction: Transaction[D]): IO[Option[D]] = IO.pure(Some(doc)) def postSet(doc: D, transaction: Transaction[D]): IO[Unit] = IO.unit diff --git a/next/src/main/scala/lightdb/document/DocumentModel.scala b/next/src/main/scala/lightdb/document/DocumentModel.scala index 0ed6c0b2..43ac3758 100644 --- a/next/src/main/scala/lightdb/document/DocumentModel.scala +++ b/next/src/main/scala/lightdb/document/DocumentModel.scala @@ -1,23 +1,21 @@ package lightdb.document import cats.effect.IO -import cats.implicits.{catsSyntaxParallelSequence1, toTraverseOps} import lightdb.util.Unique import lightdb.Id import lightdb.collection.Collection import lightdb.store.Store -import lightdb.transaction.Transaction import java.util.concurrent.atomic.AtomicBoolean trait DocumentModel[D <: Document[D]] { - private val _initialized = new AtomicBoolean(false) - private var collection: Collection[D] = _ - private var store: Store[D] = _ + private[lightdb] val _initialized = new AtomicBoolean(false) + private[lightdb] var collection: Collection[D] = _ + private[lightdb] var store: Store[D] = _ final def initialized: Boolean = _initialized.get() - protected def parallel: Boolean = true + def parallel: Boolean = true object listener { private var list = List.empty[DocumentListener[D]] @@ -34,74 +32,5 @@ trait DocumentModel[D <: Document[D]] { def apply(): List[DocumentListener[D]] = list } - private implicit class ListIO[R](list: List[IO[R]]) { - def ioSeq: IO[Unit] = if (parallel) { - list.parSequence.map(_ => ()) - } else { - list.sequence.map(_ => ()) - } - } - - private def recurseOption(doc: D, - invoke: (DocumentListener[D], D) => IO[Option[D]], - listeners: List[DocumentListener[D]] = listener()): IO[Option[D]] = listeners.headOption match { - case Some(l) => invoke(l, doc).flatMap { - case Some(v) => recurseOption(v, invoke, listeners.tail) - case None => IO.pure(None) - } - case None => IO.pure(Some(doc)) - } - - private[lightdb] def init(collection: Collection[D]): IO[Unit] = { - this.collection = collection - for { - _ <- collection.db.storeManager[D](collection.name).map(store => this.store = store) - _ <- listener() - .map(_.init(collection)) - .ioSeq - .map(_ => _initialized.set(true)) - } yield () - } - def id(value: String = Unique()): Id[D] = Id(value) - - def apply(id: Id[D])(implicit transaction: Transaction[D]): IO[D] = store(id) - - def get(id: Id[D])(implicit transaction: Transaction[D]): IO[Option[D]] = store.get(id) - - final def set(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = { - recurseOption(doc, (l, d) => l.preSet(d, transaction)).flatMap { - case Some(d) => for { - _ <- store.set(d) - _ <- listener().map(l => l.postSet(d, transaction)).ioSeq - } yield Some(d) - case None => IO.pure(None) - } - } - - def stream(implicit transaction: Transaction[D]): fs2.Stream[IO, D] = store.stream - - def count(implicit transaction: Transaction[D]): IO[Int] = store.count - - def idStream(implicit transaction: Transaction[D]): fs2.Stream[IO, Id[D]] = store.idStream - - final def delete(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = { - recurseOption(doc, (l, d) => l.preDelete(d, transaction)).flatMap { - case Some(d) => for { - _ <- store.delete(d._id) - _ <- listener().map(l => l.postDelete(d, transaction)).ioSeq - } yield Some(d) - case None => IO.pure(None) - } - } - - def truncate()(implicit transaction: Transaction[D]): IO[Unit] = listener() - .map(l => l.truncate(transaction)) - .ioSeq - .map(_ => ()) - - def dispose(): IO[Unit] = listener() - .map(l => l.dispose()) - .ioSeq - .map(_ => ()) } \ No newline at end of file diff --git a/next/src/main/scala/lightdb/transaction/Transaction.scala b/next/src/main/scala/lightdb/transaction/Transaction.scala index b0bb0642..512d7bf9 100644 --- a/next/src/main/scala/lightdb/transaction/Transaction.scala +++ b/next/src/main/scala/lightdb/transaction/Transaction.scala @@ -1,5 +1,6 @@ package lightdb.transaction +import lightdb.collection.Collection import lightdb.document.Document -trait Transaction[D <: Document[D]] \ No newline at end of file +case class Transaction[D <: Document[D]](collection: Collection[D]) \ No newline at end of file diff --git a/next/src/test/scala/spec/BasicsSpec.scala b/next/src/test/scala/spec/BasicsSpec.scala index a6814c56..812d8e6e 100644 --- a/next/src/test/scala/spec/BasicsSpec.scala +++ b/next/src/test/scala/spec/BasicsSpec.scala @@ -17,11 +17,15 @@ class BasicsSpec extends AsyncWordSpec with AsyncIOSpec with Matchers { DB.init } "insert the first record" in { - DB.people.set(amy) + DB.people.transaction { implicit transaction => + DB.people.set(amy).map(o => o should not be None) + } } "retrieve the first record by id" in { - DB.people(amy._id).map { p => - p should be(amy) + DB.people.transaction { implicit transaction => + DB.people(amy._id).map { p => + p should be(amy) + } } } "dispose the database" in { @@ -30,7 +34,7 @@ class BasicsSpec extends AsyncWordSpec with AsyncIOSpec with Matchers { } object DB extends LightDB { - val people: Collection[Person] = collection(Person) + val people: Collection[Person] = collection("people", Person) override def storeManager: StoreManager = AtomicMapStore }