Skip to content

Commit

Permalink
ValueStore completed and StoredValue improved
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 8, 2024
1 parent edbfdc1 commit 42e7065
Show file tree
Hide file tree
Showing 7 changed files with 289,957 additions and 40 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.bsp/
target/
testdb/
airports/
benchmark/imdb/
benchmark/data/
backup/
Expand Down
3,376 changes: 3,376 additions & 0 deletions all/src/test/resources/airports.csv

Large diffs are not rendered by default.

286,464 changes: 286,464 additions & 0 deletions all/src/test/resources/flights.csv

Large diffs are not rendered by default.

28 changes: 26 additions & 2 deletions all/src/test/scala/spec/AirportSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.effect.testing.scalatest.AsyncIOSpec
import fabric.rw.RW
import lightdb.halo.HaloDBSupport
import lightdb.lucene.LuceneSupport
import lightdb.{Document, FacetStore, Id, LightDB, StoredValue}
import lightdb.{Document, ValueStore, Id, LightDB, Persistence, StoredValue, Unique}
import lightdb.model.Collection
import lightdb.upgrade.DatabaseUpgrade
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -57,6 +57,12 @@ class AirportSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
count should be(3375)
}
}
"validate airport references" in {
Flight.airportReferences.facet(Airport.id("JFK")).map { facet =>
facet.count should be(4826)
facet.ids.size should be(4826)
}
}
// TODO: Support traversals
/*"get all airport names reachable directly from LAX following edges" in {
val lax = Airport.id("LAX")
Expand Down Expand Up @@ -125,7 +131,17 @@ class AirportSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {

val name: I[String] = index.one[String]("name", _.name)
val vip: I[Boolean] = index.one[Boolean]("vip", _.vip)
val vipKeys: FacetStore[String, Airport] = FacetStore[String, Airport]("vipKeys", doc => if (doc.vip) Some(doc._id.value) else None, this, cached = true)
val vipKeys: ValueStore[String, Airport] = ValueStore[String, Airport]("vipKeys", doc => if (doc.vip) List(doc._id.value) else Nil, this, persistence = Persistence.Cached)

override def id(value: String = Unique()): Id[Airport] = {
val index = value.indexOf('/')
val v = if (index != -1) {
value.substring(index + 1)
} else {
value
}
Id(v)
}
}

case class Flight(from: Id[Airport],
Expand All @@ -146,6 +162,14 @@ class AirportSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {

object Flight extends Collection[Flight]("flights", DB) {
override implicit val rw: RW[Flight] = RW.gen

val airportReferences: ValueStore[Id[Airport], Flight] = ValueStore[Id[Airport], Flight](
key = "airportReferences",
createV = f => List(f.from, f.to),
collection = this,
includeIds = true,
persistence = Persistence.Memory
)
}

object DataImportUpgrade extends DatabaseUpgrade {
Expand Down
18 changes: 14 additions & 4 deletions core/src/main/scala/lightdb/LightDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,24 @@ abstract class LightDB {
object stored {
def apply[T](key: String,
default: => T,
cache: Boolean = true,
persistence: Persistence = Persistence.Stored,
collection: Collection[KeyValue] = backingStore)
(implicit rw: RW[T]): StoredValue[T] = StoredValue[T](key, collection, () => default, cache = cache)
(implicit rw: RW[T]): StoredValue[T] = StoredValue[T](
key = key,
collection = collection,
default = () => default,
persistence = persistence
)

def opt[T](key: String,
cache: Boolean = true,
persistence: Persistence = Persistence.Stored,
collection: Collection[KeyValue] = backingStore)
(implicit rw: RW[T]): StoredValue[Option[T]] = StoredValue[Option[T]](key, collection, () => None, cache = cache)
(implicit rw: RW[T]): StoredValue[Option[T]] = StoredValue[Option[T]](
key = key,
collection = collection,
default = () => None,
persistence = persistence
)
}

private def doUpgrades(upgrades: List[DatabaseUpgrade],
Expand Down
70 changes: 55 additions & 15 deletions core/src/main/scala/lightdb/StoredValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,81 @@ import lightdb.model.Collection
case class StoredValue[T](key: String,
collection: Collection[KeyValue],
default: () => T,
cache: Boolean)(implicit rw: RW[T]) {
persistence: Persistence)(implicit rw: RW[T]) {
private lazy val id = Id[KeyValue](key)

private var cached: Option[T] = None

def get(): IO[T] = cached match {
case Some(t) => IO.pure(t)
case None if persistence == Persistence.Memory =>
val t = default()
cached = Some(t)
IO.pure(t)
case None => collection.get(id).map {
case Some(kv) => kv.value.as[T]
case None => default()
}.map { t =>
if (cache) cached = Some(t)
if (persistence != Persistence.Stored) {
cached = Some(t)
}
t
}
}

def exists(): IO[Boolean] = collection.get(id).map(_.nonEmpty)

def set(value: T): IO[T] = collection
.set(KeyValue(id, value.asJson))
.map { _ =>
if (cache) cached = Some(value)
value
}
def set(value: T): IO[T] = if (persistence == Persistence.Memory) {
cached = Some(value)
IO.pure(value)
} else {
collection
.set(KeyValue(id, value.asJson))
.map { _ =>
if (persistence != Persistence.Stored) {
cached = Some(value)
}
value
}
}

// TODO: Figure out why withLock is causing it to get stuck
def modify(f: T => IO[T]): IO[T] = //collection.withLock(id) { implicit lock =>
for {
current <- get()
modified <- f(current)
_ <- set(modified).whenA(current != modified)
} yield modified
def modify(f: T => IO[T]): IO[T] = { //collection.withLock(id) { implicit lock =>
if (persistence == Persistence.Memory) {
get().flatMap(f).map { value =>
cached = Some(value)
value
}
} else {
for {
current <- get()
modified <- f(current)
_ <- set(modified).whenA(current != modified)
} yield modified
}
}
//}

def clear(): IO[Unit] = collection.delete(id).map { _ =>
if (cache) cached = None
cached = None
}
}

sealed trait Persistence

object Persistence {
/**
* Stored on disk only
*/
case object Stored extends Persistence

/**
* Stored on disk and cached in memory
*/
case object Cached extends Persistence

/**
* Stored in memory only
*/
case object Memory extends Persistence
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
package lightdb

import cats.effect.IO
import cats.implicits.toTraverseOps
import fabric.rw.RW
import lightdb.model.{AbstractCollection, DocumentAction}

case class FacetStore[V, D <: Document[D]](key: String,
createV: D => Option[V],
case class ValueStore[V, D <: Document[D]](key: String,
createV: D => List[V],
collection: AbstractCollection[D],
cached: Boolean = false,
includeIds: Boolean = false)
includeIds: Boolean = false,
persistence: Persistence = Persistence.Stored)
(implicit rw: RW[V]) {
private implicit val facetRW: RW[Facet] = RW.gen

private lazy val stored = collection.db.stored[Map[V, Facet]](
key = s"${collection.collectionName}.valueStore.$key",
default = Map.empty,
cache = cached
persistence = persistence
)

def facets: IO[Map[V, Facet]] = stored.get()

def facet(v: V): IO[Facet] = facets.map(_.getOrElse(v, Facet()))

def values: IO[Set[V]] = facets.map(_.keySet)

collection.postSet.add((_: DocumentAction, doc: D, _: AbstractCollection[D]) => {
(createV(doc) match {
case Some(v) =>
scribe.info(s"Modifying $v")
stored.modify { map => IO {
createV(doc).map { v =>
stored.modify { map =>
IO {
var facet = map.getOrElse(v, Facet())
var ids = facet.ids
if (includeIds) {
Expand All @@ -35,19 +37,19 @@ case class FacetStore[V, D <: Document[D]](key: String,
ids = Set.empty
}
facet = facet.copy(facet.count + 1, ids)
scribe.info(s"Finished modifying $v")
map + (v -> facet)
}}
case None => IO.unit
}).map(_ => Some(doc))
}
}
}.sequence.map(_ => Some(doc))
})
collection.postDelete.add((_: DocumentAction, doc: D, _: AbstractCollection[D]) => {
createV(doc) match {
case Some(v) => stored.modify { map => IO {
map - v
}}.map(_ => Some(doc))
case None => IO.pure(Some(doc))
}
createV(doc).map { v =>
stored.modify { map =>
IO {
map - v
}
}
}.sequence.map(_ => Some(doc))
})
collection.truncateActions += stored.clear()

Expand Down

0 comments on commit 42e7065

Please sign in to comment.