Skip to content

Commit

Permalink
Materialized work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Sep 2, 2024
1 parent 30d3fb0 commit 61f876a
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 25 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ benchmark/data/
/backup/
backup.zip
db/
backups/
backups/
metals.sbt
derby.log
*report-*.json
2 changes: 2 additions & 0 deletions core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
}
}

def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = search.docs.iterator

def toList(implicit transaction: Transaction[Doc]): List[Doc] = search.docs.list

def first(implicit transaction: Transaction[Doc]): Option[Doc] = search.docs.iterator.nextOption()
Expand Down
26 changes: 13 additions & 13 deletions core/src/main/scala/lightdb/collection/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
case _ => // Can't do validation
}

// Give the Model a chance to initialize
model.init(this)

// Verify the data is in-sync
verify()
}
Expand Down Expand Up @@ -163,21 +166,23 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
}

def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Doc = {
store.insert(doc)
trigger.insert(doc)
store.insert(doc)
doc
}

def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Doc = {
store.upsert(doc)
trigger.upsert(doc)
store.upsert(doc)
doc
}

def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Seq[Doc] = docs.map(insert)

def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Seq[Doc] = docs.map(upsert)

def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = store.exists(id)

def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Option[Doc] = {
val (field, value) = f(model)
store.get(field, value)
Expand Down Expand Up @@ -226,17 +231,13 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S

def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Boolean = {
val (field, value) = f(model)
try {
store.delete(field, value)
} finally {
trigger.delete(field, value)
}
trigger.delete(field, value)
store.delete(field, value)
}

def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Boolean = try {
store.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id)
} finally {
def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Boolean = {
trigger.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id)
store.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id)
}

def count(implicit transaction: Transaction[Doc]): Int = store.count
Expand All @@ -245,10 +246,9 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S

lazy val query: Query[Doc, Model] = Query(this)

def truncate()(implicit transaction: Transaction[Doc]): Int = try {
store.truncate()
} finally {
def truncate()(implicit transaction: Transaction[Doc]): Int = {
trigger.truncate()
store.truncate()
}

def dispose(): Unit = try {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/lightdb/doc/DocumentModel.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lightdb.doc

import fabric.rw._
import lightdb.collection.Collection
import lightdb.filter.FilterBuilder
import lightdb.{Field, Id, Indexed, Tokenized, Unique, UniqueIndex}

Expand All @@ -15,6 +16,8 @@ trait DocumentModel[Doc <: Document[Doc]] {

def id(value: String = Unique()): Id[Doc] = Id(value)

def init[Model <: DocumentModel[Doc]](collection: Collection[Doc, Model]): Unit = {}

type F[V] = Field[Doc, V]
type I[V] = Indexed[Doc, V]
type U[V] = UniqueIndex[Doc, V]
Expand Down
25 changes: 25 additions & 0 deletions core/src/main/scala/lightdb/doc/MaterializedModel.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package lightdb.doc

import lightdb.collection.Collection
import lightdb.transaction.Transaction
import lightdb.trigger.BasicCollectionTrigger

trait MaterializedModel[Doc <: Document[Doc], MaterialDoc <: Document[MaterialDoc], MaterialModel <: DocumentModel[MaterialDoc]] extends DocumentModel[Doc] { mm =>
def materialCollection: Collection[MaterialDoc, MaterialModel]

protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit
protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit
protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit

override def init[Model <: DocumentModel[Doc]](collection: Collection[Doc, Model]): Unit = {
super.init(collection)

materialCollection.trigger += new BasicCollectionTrigger[MaterialDoc, MaterialModel] {
override def collection: Collection[MaterialDoc, MaterialModel] = materialCollection

override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.adding(doc)
override protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.modifying(oldDoc, newDoc)
override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.removing(doc)
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/lightdb/store/MapStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMode:
map += id(doc) -> doc
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = map.contains(id)

override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = {
if (field == idField) {
map.get(value.asInstanceOf[Id[Doc]])
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/lightdb/store/Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]] {

def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit

def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean

def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc]

def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Boolean
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/lightdb/store/split/SplitStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import lightdb.doc.{Document, DocumentModel}
import lightdb.materialized.MaterializedAggregate
import lightdb.store.{Conversion, Store, StoreMode}
import lightdb.transaction.{Transaction, TransactionKey}
import lightdb.{Field, Query, SearchResults, UniqueIndex}
import lightdb.{Field, Id, Query, SearchResults, UniqueIndex}

import scala.language.implicitConversions

Expand Down Expand Up @@ -34,6 +34,8 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](storage
searching.upsert(doc)
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = storage.exists(id)

override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = {
storage.get(field, value)
}
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package lightdb.trigger

import lightdb.UniqueIndex
import lightdb.collection.Collection
import lightdb.doc.{Document, DocumentModel}
import lightdb.transaction.Transaction

trait BasicCollectionTrigger[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends CollectionTrigger[Doc] {
def collection: Collection[Doc, Model]

protected def adding(doc: Doc)(implicit transaction: Transaction[Doc]): Unit

protected def modifying(oldDoc: Doc, newDoc: Doc)(implicit transaction: Transaction[Doc]): Unit

protected def removing(doc: Doc)(implicit transaction: Transaction[Doc]): Unit

override final def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = adding(doc)

override final def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = {
collection.get(doc._id) match {
case Some(current) => modifying(current, doc)
case None => adding(doc)
}
}

override final def delete[V](index: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Unit = {
collection.query.filter(_ => index === value).iterator.foreach(removing)
}

override final def truncate(): Unit = collection.transaction { implicit transaction =>
collection.iterator.foreach(removing)
}
}
57 changes: 47 additions & 10 deletions core/src/test/scala/spec/AbstractBasicSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package spec
import fabric.rw._
import lightdb.backup.{DatabaseBackup, DatabaseRestore}
import lightdb.collection.Collection
import lightdb.doc.{Document, DocumentModel, JsonConversion}
import lightdb.doc.{Document, DocumentModel, JsonConversion, MaterializedModel}
import lightdb.feature.DBFeatureKey
import lightdb.filter._
import lightdb.store.StoreManager
import lightdb.transaction.Transaction
import lightdb.upgrade.DatabaseUpgrade
import lightdb.{Field, Id, LightDB, Sort, StoredValue}
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -102,6 +103,9 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
ages should be(Set(101, 42, 89, 102, 53, 13, 2, 22, 12, 81, 35, 63, 99, 23, 30, 4, 21, 33, 11, 72, 15, 62))
}
}
/*"verify the AgeLinks is properly updated" in {
db.ageLinks.t.get(AgeLinks.id(30)).map(_.people) should be(Some(List(Id("yuri"), Id("wyatt"), Id("tori"))))
}*/
"query with aggregate functions" in {
if (aggregationSupported) {
db.people.transaction { implicit transaction =>
Expand Down Expand Up @@ -285,10 +289,10 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
}
"insert a lot more names" in {
db.people.transaction { implicit transaction =>
val p = (1 to 100_000).toList.map { index =>
val p = (1 to 1_000).toList.map { index =>
Person(
name = s"Unique Snowflake $index",
age = if (index > 10_000) 0 else index,
age = if (index > 100) 0 else index,
city = Some(City("Robotland")),
nicknames = Set("robot", s"sf$index")
)
Expand All @@ -298,7 +302,7 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
}
"verify the correct number of people exist in the database" in {
db.people.transaction { implicit transaction =>
db.people.count should be(100_024)
db.people.count should be(1_024)
}
}
"verify the correct count in query total" in {
Expand All @@ -311,8 +315,8 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
.search
.docs
results.list.length should be(100)
results.total should be(Some(100_000))
results.remaining should be(Some(100_000))
results.total should be(Some(1_000))
results.remaining should be(Some(1_000))
}
}
"verify the correct count in query total with offset" in {
Expand All @@ -325,13 +329,13 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
.search
.docs
results.list.length should be(100)
results.total should be(Some(100_000))
results.remaining should be(Some(99_900))
results.total should be(Some(1_000))
results.remaining should be(Some(900))
}
}
"truncate the collection" in {
db.people.transaction { implicit transaction =>
db.people.truncate() should be(100_024)
db.people.truncate() should be(1_024)
}
}
"verify the collection is empty" in {
Expand Down Expand Up @@ -370,6 +374,8 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
val startTime: StoredValue[Long] = stored[Long]("startTime", -1L)

val people: Collection[Person, Person.type] = collection(Person)
// TODO: Revisit this - performance is currently awful and transaction state causes overlapping dirty data
// val ageLinks: Collection[AgeLinks, AgeLinks.type] = collection(AgeLinks)

override def storeManager: StoreManager = spec.storeManager

Expand All @@ -383,7 +389,7 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
_id: Id[Person] = Person.id()) extends Document[Person]

object Person extends DocumentModel[Person] with JsonConversion[Person] {
implicit val rw: RW[Person] = RW.gen
override implicit val rw: RW[Person] = RW.gen

val name: I[String] = field.index("name", _.name)
val age: I[Int] = field.index("age", _.age)
Expand All @@ -398,6 +404,37 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec =>
implicit val rw: RW[City] = RW.gen
}

/*case class AgeLinks(age: Int, people: List[Id[Person]]) extends Document[AgeLinks] {
lazy val _id: Id[AgeLinks] = AgeLinks.id(age)
}
object AgeLinks extends MaterializedModel[AgeLinks, Person, Person.type] with JsonConversion[AgeLinks] {
override implicit val rw: RW[AgeLinks] = RW.gen
val age: F[Int] = field("age", _.age)
val people: F[List[Id[Person]]] = field("people", _.people)
override def materialCollection: Collection[Person, Person.type] = db.people
def id(age: Int): Id[AgeLinks] = Id(age.toString)
override protected def adding(doc: Person)(implicit transaction: Transaction[Person]): Unit = db.ageLinks.t.modify(AgeLinks.id(doc.age)) {
case Some(links) => Some(links.copy(people = (doc._id :: links.people).distinct))
case None => Some(AgeLinks(doc.age, List(doc._id)))
}
override protected def modifying(oldDoc: Person, newDoc: Person)(implicit transaction: Transaction[Person]): Unit = adding(newDoc)
override protected def removing(doc: Person)(implicit transaction: Transaction[Person]): Unit = db.ageLinks.t.modify(AgeLinks.id(doc.age)) {
case Some(links) =>
val l = links.copy(people = links.people.filterNot(_ == doc._id))
if (l.people.isEmpty) {
None
} else {
Some(l)
}
case None => None
}
}*/

object InitialSetupUpgrade extends DatabaseUpgrade {
override def applyToNew: Boolean = true

Expand Down
2 changes: 2 additions & 0 deletions halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
instance.put(id(doc).bytes, JsonFormatter.Compact(json).getBytes("UTF-8"))
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty

override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = {
if (field == idField) {
Option(instance.get(value.asInstanceOf[Id[Doc]].bytes)).map(bytes2Doc)
Expand Down
2 changes: 2 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
}
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty

override def get[V](field: UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Option[Doc] = {
val filter = Filter.Equals(field, value)
Expand Down
2 changes: 2 additions & 0 deletions mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: O

override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = map.put(doc._id.value, toString(doc))

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = map.containsKey(id.value)

override def get[V](field: UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Option[Doc] = {
if (field == idField) {
Expand Down
3 changes: 3 additions & 0 deletions redis/src/main/scala/lightdb/redis/RedisStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMod
override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit =
getInstance.hset(collection.name, doc._id.value, toString(doc))

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean =
getInstance.hexists(collection.name, id.value)

override def get[V](field: UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Option[Doc] = {
if (field == idField) {
Expand Down
2 changes: 2 additions & 0 deletions rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
db.put(doc._id.bytes, JsonFormatter.Compact(json).getBytes("UTF-8"))
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = db.keyExists(id.bytes)

override def get[V](field: UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Option[Doc] = {
if (field == idField) {
Expand Down
2 changes: 2 additions & 0 deletions sql/src/main/scala/lightdb/sql/SQLStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten
}
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty

override def get[V](field: UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Option[Doc] = {
val state = getState
Expand Down

0 comments on commit 61f876a

Please sign in to comment.