Skip to content

Commit

Permalink
Preliminary get/set Store support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 16, 2024
1 parent bce8947 commit 43ce43a
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 87 deletions.
6 changes: 5 additions & 1 deletion next/src/main/scala/lightdb/LightDB.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lightdb

import cats.effect.IO
import cats.implicits.catsSyntaxParallelSequence1
import lightdb.collection.Collection
import lightdb.document.{Document, DocumentModel}
import lightdb.store.StoreManager
Expand All @@ -19,7 +20,10 @@ trait LightDB extends Initializable {
c
}

override protected def initialize(): IO[Unit] = ???
override protected def initialize(): IO[Unit] = for {
_ <- collections.map(_.init).parSequence
_ <- IO.unit // TODO: Database upgrades and initialization
} yield ()

def dispose(): IO[Unit] = IO.unit
}
87 changes: 83 additions & 4 deletions next/src/main/scala/lightdb/collection/Collection.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,91 @@
package lightdb.collection

import cats.effect.IO
import lightdb.LightDB
import lightdb.document.{Document, DocumentModel}
import lightdb.{Id, LightDB}
import lightdb.document.{Document, DocumentListener, DocumentModel}
import lightdb.transaction.Transaction
import lightdb.util.Initializable
import cats.implicits._

case class Collection[D <: Document[D]](name: String,
model: DocumentModel[D],
db: LightDB) extends Initializable {
override protected def initialize(): IO[Unit] = model.init(this)
db: LightDB) extends Initializable { collection =>
private implicit class ListIO[R](list: List[IO[R]]) {
def ioSeq: IO[Unit] = if (model.parallel) {
list.parSequence.map(_ => ())
} else {
list.sequence.map(_ => ())
}
}

private def recurseOption(doc: D,
invoke: (DocumentListener[D], D) => IO[Option[D]],
listeners: List[DocumentListener[D]] = model.listener()): IO[Option[D]] = listeners.headOption match {
case Some(l) => invoke(l, doc).flatMap {
case Some(v) => recurseOption(v, invoke, listeners.tail)
case None => IO.pure(None)
}
case None => IO.pure(Some(doc))
}

override protected def initialize(): IO[Unit] = for {
_ <- IO(model.collection = this)
_ <- db.storeManager[D](name).map(store => model.store = store)
_ <- model.listener().map(_.init(this)).ioSeq.map(_ => model._initialized.set(true))
} yield ()

object transaction {
def apply[Return](f: Transaction[D] => IO[Return]): IO[Return] = create()
.flatMap { transaction =>
f(transaction).guarantee(release(transaction))
}

private def create(): IO[Transaction[D]] = for {
transaction <- IO(Transaction[D](collection))
_ <- model.listener().map(l => l.transactionStart(transaction)).ioSeq
} yield transaction

private def release(transaction: Transaction[D]): IO[Unit] =
model.listener().map(l => l.transactionEnd(transaction)).ioSeq
}

def apply(id: Id[D])(implicit transaction: Transaction[D]): IO[D] = model.store(id)

def get(id: Id[D])(implicit transaction: Transaction[D]): IO[Option[D]] = model.store.get(id)

final def set(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = {
recurseOption(doc, (l, d) => l.preSet(d, transaction)).flatMap {
case Some(d) => for {
_ <- model.store.set(d)
_ <- model.listener().map(l => l.postSet(d, transaction)).ioSeq
} yield Some(d)
case None => IO.pure(None)
}
}

def stream(implicit transaction: Transaction[D]): fs2.Stream[IO, D] = model.store.stream

def count(implicit transaction: Transaction[D]): IO[Int] = model.store.count

def idStream(implicit transaction: Transaction[D]): fs2.Stream[IO, Id[D]] = model.store.idStream

final def delete(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = {
recurseOption(doc, (l, d) => l.preDelete(d, transaction)).flatMap {
case Some(d) => for {
_ <- model.store.delete(d._id)
_ <- model.listener().map(l => l.postDelete(d, transaction)).ioSeq
} yield Some(d)
case None => IO.pure(None)
}
}

def truncate()(implicit transaction: Transaction[D]): IO[Unit] = model.listener()
.map(l => l.truncate(transaction))
.ioSeq
.map(_ => ())

def dispose(): IO[Unit] = model.listener()
.map(l => l.dispose())
.ioSeq
.map(_ => ())
}
6 changes: 4 additions & 2 deletions next/src/main/scala/lightdb/document/DocumentListener.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package lightdb.document

import cats.effect.IO
import fabric.Json
import lightdb.{Id, LightDB}
import lightdb.collection.Collection
import lightdb.transaction.Transaction
import moduload.Priority
Expand All @@ -12,6 +10,10 @@ trait DocumentListener[D <: Document[D]] {

def init(collection: Collection[D]): IO[Unit] = IO.unit

def transactionStart(transaction: Transaction[D]): IO[Unit] = IO.unit

def transactionEnd(transaction: Transaction[D]): IO[Unit] = IO.unit

def preSet(doc: D, transaction: Transaction[D]): IO[Option[D]] = IO.pure(Some(doc))

def postSet(doc: D, transaction: Transaction[D]): IO[Unit] = IO.unit
Expand Down
79 changes: 4 additions & 75 deletions next/src/main/scala/lightdb/document/DocumentModel.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package lightdb.document

import cats.effect.IO
import cats.implicits.{catsSyntaxParallelSequence1, toTraverseOps}
import lightdb.util.Unique
import lightdb.Id
import lightdb.collection.Collection
import lightdb.store.Store
import lightdb.transaction.Transaction

import java.util.concurrent.atomic.AtomicBoolean

trait DocumentModel[D <: Document[D]] {
private val _initialized = new AtomicBoolean(false)
private var collection: Collection[D] = _
private var store: Store[D] = _
private[lightdb] val _initialized = new AtomicBoolean(false)
private[lightdb] var collection: Collection[D] = _
private[lightdb] var store: Store[D] = _

final def initialized: Boolean = _initialized.get()

protected def parallel: Boolean = true
def parallel: Boolean = true

object listener {
private var list = List.empty[DocumentListener[D]]
Expand All @@ -34,74 +32,5 @@ trait DocumentModel[D <: Document[D]] {
def apply(): List[DocumentListener[D]] = list
}

private implicit class ListIO[R](list: List[IO[R]]) {
def ioSeq: IO[Unit] = if (parallel) {
list.parSequence.map(_ => ())
} else {
list.sequence.map(_ => ())
}
}

private def recurseOption(doc: D,
invoke: (DocumentListener[D], D) => IO[Option[D]],
listeners: List[DocumentListener[D]] = listener()): IO[Option[D]] = listeners.headOption match {
case Some(l) => invoke(l, doc).flatMap {
case Some(v) => recurseOption(v, invoke, listeners.tail)
case None => IO.pure(None)
}
case None => IO.pure(Some(doc))
}

private[lightdb] def init(collection: Collection[D]): IO[Unit] = {
this.collection = collection
for {
_ <- collection.db.storeManager[D](collection.name).map(store => this.store = store)
_ <- listener()
.map(_.init(collection))
.ioSeq
.map(_ => _initialized.set(true))
} yield ()
}

def id(value: String = Unique()): Id[D] = Id(value)

def apply(id: Id[D])(implicit transaction: Transaction[D]): IO[D] = store(id)

def get(id: Id[D])(implicit transaction: Transaction[D]): IO[Option[D]] = store.get(id)

final def set(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = {
recurseOption(doc, (l, d) => l.preSet(d, transaction)).flatMap {
case Some(d) => for {
_ <- store.set(d)
_ <- listener().map(l => l.postSet(d, transaction)).ioSeq
} yield Some(d)
case None => IO.pure(None)
}
}

def stream(implicit transaction: Transaction[D]): fs2.Stream[IO, D] = store.stream

def count(implicit transaction: Transaction[D]): IO[Int] = store.count

def idStream(implicit transaction: Transaction[D]): fs2.Stream[IO, Id[D]] = store.idStream

final def delete(doc: D)(implicit transaction: Transaction[D]): IO[Option[D]] = {
recurseOption(doc, (l, d) => l.preDelete(d, transaction)).flatMap {
case Some(d) => for {
_ <- store.delete(d._id)
_ <- listener().map(l => l.postDelete(d, transaction)).ioSeq
} yield Some(d)
case None => IO.pure(None)
}
}

def truncate()(implicit transaction: Transaction[D]): IO[Unit] = listener()
.map(l => l.truncate(transaction))
.ioSeq
.map(_ => ())

def dispose(): IO[Unit] = listener()
.map(l => l.dispose())
.ioSeq
.map(_ => ())
}
3 changes: 2 additions & 1 deletion next/src/main/scala/lightdb/transaction/Transaction.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package lightdb.transaction

import lightdb.collection.Collection
import lightdb.document.Document

trait Transaction[D <: Document[D]]
case class Transaction[D <: Document[D]](collection: Collection[D])
12 changes: 8 additions & 4 deletions next/src/test/scala/spec/BasicsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ class BasicsSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
DB.init
}
"insert the first record" in {
DB.people.set(amy)
DB.people.transaction { implicit transaction =>
DB.people.set(amy).map(o => o should not be None)
}
}
"retrieve the first record by id" in {
DB.people(amy._id).map { p =>
p should be(amy)
DB.people.transaction { implicit transaction =>
DB.people(amy._id).map { p =>
p should be(amy)
}
}
}
"dispose the database" in {
Expand All @@ -30,7 +34,7 @@ class BasicsSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
}

object DB extends LightDB {
val people: Collection[Person] = collection(Person)
val people: Collection[Person] = collection("people", Person)

override def storeManager: StoreManager = AtomicMapStore
}
Expand Down

0 comments on commit 43ce43a

Please sign in to comment.