From 41075c954a24483083f4332040925c344c9597d6 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Tue, 23 Apr 2024 11:55:35 -0500 Subject: [PATCH] Added atomic support to database --- .../scala/spec/SimpleHaloAndSQLiteSpec.scala | 26 +++++ build.sbt | 2 + core/src/main/scala/lightdb/Collection.scala | 96 ++++++++++++++----- .../src/main/scala/lightdb/IndexedLinks.scala | 6 +- .../main/scala/lightdb/RecordDocument.scala | 2 - .../lightdb/RecordDocumentCollection.scala | 9 +- core/src/main/scala/lightdb/Store.scala | 20 ++-- .../main/scala/lightdb/sqlite/SQLData.scala | 6 ++ .../scala/lightdb/sqlite/SQLiteSupport.scala | 5 +- 9 files changed, 130 insertions(+), 42 deletions(-) create mode 100644 sqlite/src/main/scala/lightdb/sqlite/SQLData.scala diff --git a/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala b/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala index a46b7c5c..bdb5e5cf 100644 --- a/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala +++ b/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala @@ -150,6 +150,32 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche } } } + "verify the number of records" in { + Person.index.count().map { size => + size should be(2) + } + } + "modify John" in { + Person.modify(id1) { + case Some(john) => IO(Some(john.copy(name = "Johnny Doe"))) + case None => throw new RuntimeException("John not found!") + }.map { person => + person.get.name should be("Johnny Doe") + } + } + "commit modified data" in { + Person.commit() + } + "verify the number of records has not changed after modify" in { + Person.index.count().map { size => + size should be(2) + } + } + "verify John was modified" in { + Person(id1).map { person => + person.name should be("Johnny Doe") + } + } "delete John" in { Person.delete(id1).map { deleted => deleted should not be empty diff --git a/build.sbt b/build.sbt index 69428087..ce9512dc 100644 --- a/build.sbt +++ b/build.sbt @@ -54,6 +54,7 @@ val fs2Version: String = "3.10.2" val scribeVersion: String = "3.13.2" val luceneVersion: String = "9.10.0" val sqliteVersion: String = "3.45.3.0" +val keysemaphoreVersion: String = "0.3.0-M1" val scalaTestVersion: String = "3.2.18" val catsEffectTestingVersion: String = "1.5.0" @@ -77,6 +78,7 @@ lazy val core = project.in(file("core")) "org.typelevel" %% "fabric-io" % fabricVersion, "co.fs2" %% "fs2-core" % fs2Version, "com.outr" %% "scribe-slf4j" % scribeVersion, + "io.chrisdavenport" %% "keysemaphore" % keysemaphoreVersion, "org.scalatest" %% "scalatest" % scalaTestVersion % Test, "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test ), diff --git a/core/src/main/scala/lightdb/Collection.scala b/core/src/main/scala/lightdb/Collection.scala index 66010490..d11c8e3e 100644 --- a/core/src/main/scala/lightdb/Collection.scala +++ b/core/src/main/scala/lightdb/Collection.scala @@ -1,30 +1,49 @@ package lightdb import cats.effect.IO +import cats.effect.unsafe.implicits.global import cats.implicits.{catsSyntaxApplicativeByName, toTraverseOps} +import fabric.Json import fabric.rw.RW +import io.chrisdavenport.keysemaphore.KeySemaphore import lightdb.index._ +sealed trait DocLock[D <: Document[D]] extends Any + +object DocLock { + case class Set[D <: Document[D]](id: Id[D]) extends DocLock[D] + class Empty[D <: Document[D]] extends DocLock[D] +} + abstract class Collection[D <: Document[D]](val collectionName: String, protected[lightdb] val db: LightDB, - val autoCommit: Boolean = false) { + val autoCommit: Boolean = false, + val atomic: Boolean = true) { type Field[F] = IndexedField[F, D] implicit val rw: RW[D] protected lazy val store: Store = db.createStoreInternal(collectionName) + // Id-level locking + private lazy val sem = KeySemaphore.of[IO, Id[D]](_ => 1L).unsafeRunSync() + private var _indexedLinks = List.empty[IndexedLinks[_, D]] def idStream: fs2.Stream[IO, Id[D]] = store.keyStream - def stream: fs2.Stream[IO, D] = store.streamJson[D] + def stream: fs2.Stream[IO, D] = store.streamJsonDocs[D] /** - * Called before set + * Called before preSetJson and before the data is set to the database */ protected def preSet(doc: D): IO[D] = IO.pure(doc) + /** + * Called after preSet and before the data is set to the database + */ + protected def preSetJson(json: Json): IO[Json] = IO.pure(json) + /** * Called after set */ @@ -42,34 +61,65 @@ abstract class Collection[D <: Document[D]](val collectionName: String, _ <- commit().whenA(autoCommit) } yield () - def set(doc: D): IO[D] = preSet(doc) - .flatMap(store.putJson(_)(rw)) - .flatMap { doc => - postSet(doc).map(_ => doc) - } - def modify(id: Id[D])(f: Option[D] => IO[Option[D]]): IO[Option[D]] = get(id).flatMap { option => - f(option).flatMap { - case Some(doc) => set(doc).map(Some.apply) - case None => IO.pure(None) - } + def set(doc: D)(implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[D] = withLock(doc._id) { _ => + for { + modified <- preSet(doc) + json <- preSetJson(rw.read(doc)) + _ <- store.putJson(doc._id, json) + _ <- postSet(doc) + } yield modified } - def delete(id: Id[D]): IO[Option[D]] = for { - modifiedId <- preDelete(id) - deleted <- get(modifiedId).flatMap { - case Some(d) => store.delete(id).map(_ => Some(d)) - case None => IO.pure(None) + + def withLock[Return](id: Id[D])(f: DocLock[D] => IO[Return]) + (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Return] = { + if (atomic && existingLock.isInstanceOf[DocLock.Empty[_]]) { + val lock: DocLock[D] = existingLock match { + case DocLock.Set(currentId) => + assert(currentId == id, s"Different Id used for lock! Existing: $currentId, New: $id") + existingLock + case _ => DocLock.Set[D](id) + } + val s = sem(id) + s + .acquire + .flatMap(_ => f(lock)) + .guarantee(s.release) + } else { + f(existingLock) } - _ <- deleted match { - case Some(doc) => postDelete(doc) - case None => IO.unit + } + + def modify(id: Id[D]) + (f: Option[D] => IO[Option[D]]) + (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Option[D]] = withLock(id) { implicit lock => + get(id).flatMap { option => + f(option).flatMap { + case Some(doc) => set(doc)(lock).map(Some.apply) + case None => IO.pure(None) + } } - } yield deleted + } + def delete(id: Id[D]) + (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Option[D]] = withLock(id) { implicit lock => + for { + modifiedId <- preDelete(id) + deleted <- get(modifiedId).flatMap { + case Some(d) => store.delete(id).map(_ => Some(d)) + case None => IO.pure(None) + } + _ <- deleted match { + case Some(doc) => postDelete(doc) + case None => IO.unit + } + } yield deleted + } + def truncate(): IO[Unit] = for { _ <- store.truncate() _ <- _indexedLinks.map(_.store.truncate()).sequence } yield () - def get(id: Id[D]): IO[Option[D]] = store.getJson(id) + def get(id: Id[D]): IO[Option[D]] = store.getJsonDoc(id) def apply(id: Id[D]): IO[D] = get(id) .map(_.getOrElse(throw new RuntimeException(s"$id not found in $collectionName"))) diff --git a/core/src/main/scala/lightdb/IndexedLinks.scala b/core/src/main/scala/lightdb/IndexedLinks.scala index b93d0f43..4feac601 100644 --- a/core/src/main/scala/lightdb/IndexedLinks.scala +++ b/core/src/main/scala/lightdb/IndexedLinks.scala @@ -42,7 +42,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String, val id = Id[IndexedLink[D]](key) IndexedLink(_id = id, links = List(doc._id)) } - _ <- store.putJson(updated) + _ <- store.putJsonDoc(updated) } yield () } @@ -61,7 +61,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String, case None => None } _ <- updated match { - case Some(l) => store.putJson(l) + case Some(l) => store.putJsonDoc(l) case None => IO.unit } } yield () @@ -70,7 +70,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String, protected[lightdb] def link(value: V): IO[Option[IndexedLink[D]]] = { val key = createKey(value) val id = Id[IndexedLink[D]](key) - store.getJson(id) + store.getJsonDoc(id) } def queryIds(value: V): fs2.Stream[IO, Id[D]] = { diff --git a/core/src/main/scala/lightdb/RecordDocument.scala b/core/src/main/scala/lightdb/RecordDocument.scala index 48af64ff..84cffbfe 100644 --- a/core/src/main/scala/lightdb/RecordDocument.scala +++ b/core/src/main/scala/lightdb/RecordDocument.scala @@ -3,6 +3,4 @@ package lightdb trait RecordDocument[D <: RecordDocument[D]] extends Document[D] { def created: Long def modified: Long - - def modify(): D } \ No newline at end of file diff --git a/core/src/main/scala/lightdb/RecordDocumentCollection.scala b/core/src/main/scala/lightdb/RecordDocumentCollection.scala index d1fb1fb5..234bfc62 100644 --- a/core/src/main/scala/lightdb/RecordDocumentCollection.scala +++ b/core/src/main/scala/lightdb/RecordDocumentCollection.scala @@ -1,7 +1,12 @@ package lightdb import cats.effect.IO +import fabric.Json abstract class RecordDocumentCollection[D <: RecordDocument[D]](collectionName: String, db: LightDB) extends Collection[D](collectionName, db) { - override protected def preSet(doc: D): IO[D] = super.preSet(doc.modify()) -} + override protected def preSetJson(json: Json): IO[Json] = IO { + json.modify("modified") { _ => + System.currentTimeMillis() + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/lightdb/Store.scala b/core/src/main/scala/lightdb/Store.scala index 6899686f..cfbcabc3 100644 --- a/core/src/main/scala/lightdb/Store.scala +++ b/core/src/main/scala/lightdb/Store.scala @@ -1,6 +1,7 @@ package lightdb import cats.effect.IO +import fabric.Json import fabric.io.{JsonFormatter, JsonParser} import fabric.rw._ @@ -21,27 +22,28 @@ trait Store { def dispose(): IO[Unit] - def streamJson[D: RW]: fs2.Stream[IO, D] = stream[D].map { + def streamJsonDocs[D: RW]: fs2.Stream[IO, D] = stream[D].map { case (_, bytes) => val jsonString = bytes.string val json = JsonParser(jsonString) json.as[D] } - def getJson[D: RW](id: Id[D]): IO[Option[D]] = get(id) + def getJsonDoc[D: RW](id: Id[D]): IO[Option[D]] = get(id) .map(_.map { bytes => val jsonString = bytes.string val json = JsonParser(jsonString) json.as[D] }) - def putJson[D <: Document[D]](doc: D) - (implicit rw: RW[D]): IO[D] = IO { - val json = doc.json - JsonFormatter.Compact(json) - }.flatMap { jsonString => - put(doc._id, jsonString.getBytes).map(_ => doc) - } + def putJsonDoc[D <: Document[D]](doc: D) + (implicit rw: RW[D]): IO[D] = IO(doc.json) + .flatMap(json => putJson(doc._id, json).map(_ => doc)) + + def putJson[D <: Document[D]](id: Id[D], json: Json): IO[Unit] = IO(JsonFormatter.Compact(json)) + .flatMap { jsonString => + put(id, jsonString.getBytes).map(_ => ()) + } def truncate(): IO[Unit] = keyStream[Any] .evalMap { id => diff --git a/sqlite/src/main/scala/lightdb/sqlite/SQLData.scala b/sqlite/src/main/scala/lightdb/sqlite/SQLData.scala new file mode 100644 index 00000000..6dc3f319 --- /dev/null +++ b/sqlite/src/main/scala/lightdb/sqlite/SQLData.scala @@ -0,0 +1,6 @@ +package lightdb.sqlite + +import cats.effect.IO +import lightdb.{Document, Id} + +case class SQLData[D <: Document[D]](ids: List[Id[D]], lookup: Option[Id[D] => IO[D]]) diff --git a/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala b/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala index 69309b43..7c66a577 100644 --- a/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala +++ b/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala @@ -9,6 +9,7 @@ import lightdb.util.FlushingBacklog import java.nio.file.Path import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types} +// TODO: Solve for uncommitted records being deleted leading to them being recreated at commit trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] { private lazy val path: Path = db.directory.resolve(collectionName).resolve("sqlite.db") // TODO: Should each collection have a connection? @@ -149,6 +150,4 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] { override def dispose(): IO[Unit] = super.dispose().map { _ => connection.close() } -} - -case class SQLData[D <: Document[D]](ids: List[Id[D]], lookup: Option[Id[D] => IO[D]]) \ No newline at end of file +} \ No newline at end of file