Added atomic support to database
darkfrog26 committed Apr 23, 2024
1 parent 8d1e438 commit 41075c9
Showing 9 changed files with 130 additions and 42 deletions.
26 changes: 26 additions & 0 deletions all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala
@@ -150,6 +150,32 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
         }
       }
     }
+    "verify the number of records" in {
+      Person.index.count().map { size =>
+        size should be(2)
+      }
+    }
+    "modify John" in {
+      Person.modify(id1) {
+        case Some(john) => IO(Some(john.copy(name = "Johnny Doe")))
+        case None => throw new RuntimeException("John not found!")
+      }.map { person =>
+        person.get.name should be("Johnny Doe")
+      }
+    }
+    "commit modified data" in {
+      Person.commit()
+    }
+    "verify the number of records has not changed after modify" in {
+      Person.index.count().map { size =>
+        size should be(2)
+      }
+    }
+    "verify John was modified" in {
+      Person(id1).map { person =>
+        person.name should be("Johnny Doe")
+      }
+    }
     "delete John" in {
       Person.delete(id1).map { deleted =>
         deleted should not be empty
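Taken together, the new tests walk a modify-then-commit cycle: mutate under the collection's id lock, commit, then confirm the record count is unchanged and the edit stuck. A condensed sketch of the same flow outside the spec, reusing the suite's Person collection and id1 fixture (a sketch, not part of the commit):

import cats.effect.IO

val modifyCommitVerify: IO[Unit] = for {
  _ <- Person.modify(id1) {
    case Some(john) => IO(Some(john.copy(name = "Johnny Doe")))
    case None => IO.raiseError(new RuntimeException("John not found!"))
  }
  _ <- Person.commit()            // flush the pending change
  count <- Person.index.count()   // modify must not create a second record
  john <- Person(id1)             // apply() raises if the id is missing
} yield assert(count == 2 && john.name == "Johnny Doe")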
2 changes: 2 additions & 0 deletions build.sbt
@@ -54,6 +54,7 @@ val fs2Version: String = "3.10.2"
 val scribeVersion: String = "3.13.2"
 val luceneVersion: String = "9.10.0"
 val sqliteVersion: String = "3.45.3.0"
+val keysemaphoreVersion: String = "0.3.0-M1"
 
 val scalaTestVersion: String = "3.2.18"
 val catsEffectTestingVersion: String = "1.5.0"
@@ -77,6 +78,7 @@ lazy val core = project.in(file("core"))
"org.typelevel" %% "fabric-io" % fabricVersion,
"co.fs2" %% "fs2-core" % fs2Version,
"com.outr" %% "scribe-slf4j" % scribeVersion,
"io.chrisdavenport" %% "keysemaphore" % keysemaphoreVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
),
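The new keysemaphore dependency backs the id-level locks added to Collection below: KeySemaphore.of takes a key-to-permit-count function and yields a factory of per-key semaphores. A minimal sketch of the pattern the commit relies on (assuming the 0.3.x API surface implied by the import in Collection.scala):

import cats.effect.IO
import io.chrisdavenport.keysemaphore.KeySemaphore

// One permit per key: effects against the same key run one at a time,
// while effects against different keys proceed independently.
val demo: IO[Unit] = KeySemaphore.of[IO, String](_ => 1L).flatMap { sem =>
  val s = sem("doc-1")
  s.acquire.flatMap(_ => IO(println("exclusive for doc-1"))).guarantee(s.release)
}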
96 changes: 73 additions & 23 deletions core/src/main/scala/lightdb/Collection.scala
@@ -1,30 +1,49 @@
 package lightdb
 
 import cats.effect.IO
+import cats.effect.unsafe.implicits.global
 import cats.implicits.{catsSyntaxApplicativeByName, toTraverseOps}
+import fabric.Json
 import fabric.rw.RW
+import io.chrisdavenport.keysemaphore.KeySemaphore
 import lightdb.index._
 
+sealed trait DocLock[D <: Document[D]] extends Any
+
+object DocLock {
+  case class Set[D <: Document[D]](id: Id[D]) extends DocLock[D]
+  class Empty[D <: Document[D]] extends DocLock[D]
+}
+
 abstract class Collection[D <: Document[D]](val collectionName: String,
                                             protected[lightdb] val db: LightDB,
-                                            val autoCommit: Boolean = false) {
+                                            val autoCommit: Boolean = false,
+                                            val atomic: Boolean = true) {
   type Field[F] = IndexedField[F, D]
 
   implicit val rw: RW[D]
 
   protected lazy val store: Store = db.createStoreInternal(collectionName)
 
+  // Id-level locking
+  private lazy val sem = KeySemaphore.of[IO, Id[D]](_ => 1L).unsafeRunSync()
+
   private var _indexedLinks = List.empty[IndexedLinks[_, D]]
 
   def idStream: fs2.Stream[IO, Id[D]] = store.keyStream
 
-  def stream: fs2.Stream[IO, D] = store.streamJson[D]
+  def stream: fs2.Stream[IO, D] = store.streamJsonDocs[D]
 
   /**
-   * Called before set
+   * Called before preSetJson and before the data is set to the database
    */
   protected def preSet(doc: D): IO[D] = IO.pure(doc)
 
+  /**
+   * Called after preSet and before the data is set to the database
+   */
+  protected def preSetJson(json: Json): IO[Json] = IO.pure(json)
+
   /**
    * Called after set
    */
@@ -42,34 +61,65 @@ abstract class Collection[D <: Document[D]](val collectionName: String,
     _ <- commit().whenA(autoCommit)
   } yield ()
 
-  def set(doc: D): IO[D] = preSet(doc)
-    .flatMap(store.putJson(_)(rw))
-    .flatMap { doc =>
-      postSet(doc).map(_ => doc)
-    }
-  def modify(id: Id[D])(f: Option[D] => IO[Option[D]]): IO[Option[D]] = get(id).flatMap { option =>
-    f(option).flatMap {
-      case Some(doc) => set(doc).map(Some.apply)
-      case None => IO.pure(None)
-    }
-  }
+  def set(doc: D)(implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[D] = withLock(doc._id) { _ =>
+    for {
+      modified <- preSet(doc)
+      json <- preSetJson(rw.read(doc))
+      _ <- store.putJson(doc._id, json)
+      _ <- postSet(doc)
+    } yield modified
+  }
 
-  def delete(id: Id[D]): IO[Option[D]] = for {
-    modifiedId <- preDelete(id)
-    deleted <- get(modifiedId).flatMap {
-      case Some(d) => store.delete(id).map(_ => Some(d))
-      case None => IO.pure(None)
-    }
-    _ <- deleted match {
-      case Some(doc) => postDelete(doc)
-      case None => IO.unit
-    }
-  } yield deleted
+  def withLock[Return](id: Id[D])(f: DocLock[D] => IO[Return])
+                      (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Return] = {
+    if (atomic && existingLock.isInstanceOf[DocLock.Empty[_]]) {
+      val lock: DocLock[D] = existingLock match {
+        case DocLock.Set(currentId) =>
+          assert(currentId == id, s"Different Id used for lock! Existing: $currentId, New: $id")
+          existingLock
+        case _ => DocLock.Set[D](id)
+      }
+      val s = sem(id)
+      s
+        .acquire
+        .flatMap(_ => f(lock))
+        .guarantee(s.release)
+    } else {
+      f(existingLock)
+    }
+  }
+
+  def modify(id: Id[D])
+            (f: Option[D] => IO[Option[D]])
+            (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Option[D]] = withLock(id) { implicit lock =>
+    get(id).flatMap { option =>
+      f(option).flatMap {
+        case Some(doc) => set(doc)(lock).map(Some.apply)
+        case None => IO.pure(None)
+      }
+    }
+  }
+
+  def delete(id: Id[D])
+            (implicit existingLock: DocLock[D] = new DocLock.Empty[D]): IO[Option[D]] = withLock(id) { implicit lock =>
+    for {
+      modifiedId <- preDelete(id)
+      deleted <- get(modifiedId).flatMap {
+        case Some(d) => store.delete(id).map(_ => Some(d))
+        case None => IO.pure(None)
+      }
+      _ <- deleted match {
+        case Some(doc) => postDelete(doc)
+        case None => IO.unit
+      }
+    } yield deleted
+  }
 
   def truncate(): IO[Unit] = for {
     _ <- store.truncate()
     _ <- _indexedLinks.map(_.store.truncate()).sequence
   } yield ()
 
-  def get(id: Id[D]): IO[Option[D]] = store.getJson(id)
+  def get(id: Id[D]): IO[Option[D]] = store.getJsonDoc(id)
   def apply(id: Id[D]): IO[D] = get(id)
     .map(_.getOrElse(throw new RuntimeException(s"$id not found in $collectionName")))

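A note on the locking model above: sem hands out one single-permit semaphore per Id, and DocLock is the token recording whether the current call chain already holds that permit. withLock only acquires when atomic is true and the caller passed an Empty lock; modify and delete acquire once and hand the resulting DocLock.Set to nested calls such as set, which then pass through instead of deadlocking on their own id. A sketch of the same pattern from user code, assuming a Person collection like the spec's (not part of the commit):

import cats.effect.IO

// The outer withLock takes the id's permit; handing `lock` to set makes
// set's inner withLock a pass-through rather than a self-deadlock.
def rename(id: Id[Person], name: String): IO[Option[Person]] =
  Person.withLock(id) { implicit lock =>
    Person.get(id).flatMap {
      case Some(p) => Person.set(p.copy(name = name))(lock).map(Some(_))
      case None => IO.pure(None)
    }
  }

Because each id has exactly one permit, concurrent writers to the same document serialize instead of losing updates. A small race sketch (the age field on Person is assumed for illustration):

import cats.implicits._

// With atomic = true all ten increments land; with atomic = false,
// read-modify-write cycles could interleave and clobber each other.
def increment(id: Id[Person]): IO[Option[Person]] =
  Person.modify(id) {
    case Some(p) => IO(Some(p.copy(age = p.age + 1)))
    case None => IO.pure(None)
  }

val race: IO[Unit] = List.fill(10)(id1).parTraverse(increment).void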
6 changes: 3 additions & 3 deletions core/src/main/scala/lightdb/IndexedLinks.scala
@@ -42,7 +42,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String,
           val id = Id[IndexedLink[D]](key)
           IndexedLink(_id = id, links = List(doc._id))
       }
-      _ <- store.putJson(updated)
+      _ <- store.putJsonDoc(updated)
     } yield ()
   }

@@ -61,7 +61,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String,
         case None => None
       }
       _ <- updated match {
-        case Some(l) => store.putJson(l)
+        case Some(l) => store.putJsonDoc(l)
         case None => IO.unit
       }
     } yield ()
@@ -70,7 +70,7 @@ case class IndexedLinks[V, D <: Document[D]](name: String,
   protected[lightdb] def link(value: V): IO[Option[IndexedLink[D]]] = {
     val key = createKey(value)
     val id = Id[IndexedLink[D]](key)
-    store.getJson(id)
+    store.getJsonDoc(id)
   }
 
   def queryIds(value: V): fs2.Stream[IO, Id[D]] = {
2 changes: 0 additions & 2 deletions core/src/main/scala/lightdb/RecordDocument.scala
@@ -3,6 +3,4 @@ package lightdb
 trait RecordDocument[D <: RecordDocument[D]] extends Document[D] {
   def created: Long
   def modified: Long
-
-  def modify(): D
 }
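With modify() removed, RecordDocument is reduced to data plus timestamps; keeping modified current moves into the collection layer below. A hypothetical implementation of the slimmed contract (field names are illustrative; RW derivation and collection wiring omitted):

import lightdb.{Id, RecordDocument}

case class Note(_id: Id[Note],
                text: String,
                created: Long,
                modified: Long) extends RecordDocument[Note]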
9 changes: 7 additions & 2 deletions core/src/main/scala/lightdb/RecordDocumentCollection.scala
@@ -1,7 +1,12 @@
 package lightdb
 
 import cats.effect.IO
+import fabric.Json
 
 abstract class RecordDocumentCollection[D <: RecordDocument[D]](collectionName: String, db: LightDB) extends Collection[D](collectionName, db) {
-  override protected def preSet(doc: D): IO[D] = super.preSet(doc.modify())
+  override protected def preSetJson(json: Json): IO[Json] = IO {
+    json.modify("modified") { _ =>
+      System.currentTimeMillis()
+    }
+  }
 }
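The override stamps modified at the JSON layer, so no per-document copy logic is needed. The same fabric call in isolation, as a sketch that assumes fabric's obj helper and its implicit numeric conversions:

import fabric._

val json: Json = obj("name" -> "John", "modified" -> 0L)

// Rewrites only the "modified" member; every other field passes through.
val stamped: Json = json.modify("modified") { _ =>
  System.currentTimeMillis()
}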
20 changes: 11 additions & 9 deletions core/src/main/scala/lightdb/Store.scala
@@ -1,6 +1,7 @@
 package lightdb
 
 import cats.effect.IO
+import fabric.Json
 import fabric.io.{JsonFormatter, JsonParser}
 import fabric.rw._
 
@@ -21,27 +22,28 @@ trait Store {

   def dispose(): IO[Unit]
 
-  def streamJson[D: RW]: fs2.Stream[IO, D] = stream[D].map {
+  def streamJsonDocs[D: RW]: fs2.Stream[IO, D] = stream[D].map {
     case (_, bytes) =>
       val jsonString = bytes.string
       val json = JsonParser(jsonString)
       json.as[D]
   }
 
-  def getJson[D: RW](id: Id[D]): IO[Option[D]] = get(id)
+  def getJsonDoc[D: RW](id: Id[D]): IO[Option[D]] = get(id)
     .map(_.map { bytes =>
       val jsonString = bytes.string
       val json = JsonParser(jsonString)
       json.as[D]
     })
 
-  def putJson[D <: Document[D]](doc: D)
-             (implicit rw: RW[D]): IO[D] = IO {
-    val json = doc.json
-    JsonFormatter.Compact(json)
-  }.flatMap { jsonString =>
-    put(doc._id, jsonString.getBytes).map(_ => doc)
-  }
+  def putJsonDoc[D <: Document[D]](doc: D)
+                (implicit rw: RW[D]): IO[D] = IO(doc.json)
+    .flatMap(json => putJson(doc._id, json).map(_ => doc))
+
+  def putJson[D <: Document[D]](id: Id[D], json: Json): IO[Unit] = IO(JsonFormatter.Compact(json))
+    .flatMap { jsonString =>
+      put(id, jsonString.getBytes).map(_ => ())
+    }
 
   def truncate(): IO[Unit] = keyStream[Any]
     .evalMap { id =>
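The putJson split separates serialization from persistence: putJsonDoc renders a document to Json, while putJson writes raw Json under an explicit id. That seam is what lets Collection.set run preSetJson between the two steps. A hypothetical helper built on the same seam, mirroring set's composition:

import cats.effect.IO
import fabric.rw._
import lightdb.{Document, Store}

// Hypothetical: rewrite a document's JSON before persisting it,
// the same two-step path Collection.set now takes.
def putStamped[D <: Document[D]](store: Store, doc: D)(implicit rw: RW[D]): IO[Unit] =
  IO(doc.json)
    .map(_.modify("modified")(_ => System.currentTimeMillis()))
    .flatMap(json => store.putJson(doc._id, json))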
6 changes: 6 additions & 0 deletions sqlite/src/main/scala/lightdb/sqlite/SQLData.scala
@@ -0,0 +1,6 @@
+package lightdb.sqlite
+
+import cats.effect.IO
+import lightdb.{Document, Id}
+
+case class SQLData[D <: Document[D]](ids: List[Id[D]], lookup: Option[Id[D] => IO[D]])
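SQLData, now in its own file, carries a query result: ids are the matching keys, and lookup, when present, resolves an id back to its document. A hedged sketch of a consumer:

import cats.effect.IO
import cats.implicits._
import lightdb.Document

// Hypothetical consumer: materialize documents when a lookup is provided;
// ids-only results yield an empty list.
def materialize[D <: Document[D]](data: SQLData[D]): IO[List[D]] =
  data.lookup match {
    case Some(fetch) => data.ids.traverse(fetch)
    case None => IO.pure(Nil)
  }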
5 changes: 2 additions & 3 deletions sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
@@ -9,6 +9,7 @@ import lightdb.util.FlushingBacklog
 import java.nio.file.Path
 import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types}
 
+// TODO: Solve for uncommitted records being deleted leading to them being recreated at commit
 trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
   private lazy val path: Path = db.directory.resolve(collectionName).resolve("sqlite.db")
   // TODO: Should each collection have a connection?
@@ -149,6 +150,4 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
   override def dispose(): IO[Unit] = super.dispose().map { _ =>
     connection.close()
   }
 }
-
-case class SQLData[D <: Document[D]](ids: List[Id[D]], lookup: Option[Id[D] => IO[D]])
