From 61f876a345d8005e66fbd0c08628285494e47d09 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Mon, 2 Sep 2024 10:16:30 -0500 Subject: [PATCH] Materialized work in progress --- .gitignore | 5 +- core/src/main/scala/lightdb/Query.scala | 2 + .../scala/lightdb/collection/Collection.scala | 26 ++++----- .../scala/lightdb/doc/DocumentModel.scala | 3 + .../scala/lightdb/doc/MaterializedModel.scala | 25 ++++++++ .../main/scala/lightdb/store/MapStore.scala | 2 + core/src/main/scala/lightdb/store/Store.scala | 2 + .../lightdb/store/split/SplitStore.scala | 4 +- .../trigger/BasicCollectionTrigger.scala | 33 +++++++++++ .../test/scala/spec/AbstractBasicSpec.scala | 57 +++++++++++++++---- .../scala/lightdb/halodb/HaloDBStore.scala | 2 + .../scala/lightdb/lucene/LuceneStore.scala | 2 + .../main/scala/lightdb/mapdb/MapDBStore.scala | 2 + .../main/scala/lightdb/redis/RedisStore.scala | 3 + .../scala/lightdb/rocksdb/RocksDBStore.scala | 2 + sql/src/main/scala/lightdb/sql/SQLStore.scala | 2 + 16 files changed, 147 insertions(+), 25 deletions(-) create mode 100644 core/src/main/scala/lightdb/doc/MaterializedModel.scala create mode 100644 core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala diff --git a/.gitignore b/.gitignore index c09f2e58..7641d269 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,7 @@ benchmark/data/ /backup/ backup.zip db/ -backups/ \ No newline at end of file +backups/ +metals.sbt +derby.log +*report-*.json \ No newline at end of file diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index 0478b016..d46cb5e6 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -110,6 +110,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: } } + def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = search.docs.iterator + def toList(implicit transaction: Transaction[Doc]): List[Doc] = search.docs.list def first(implicit transaction: Transaction[Doc]): Option[Doc] = search.docs.iterator.nextOption() diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index 9c90ed7b..df1376ca 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -41,6 +41,9 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S case _ => // Can't do validation } + // Give the Model a chance to initialize + model.init(this) + // Verify the data is in-sync verify() } @@ -163,14 +166,14 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S } def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Doc = { - store.insert(doc) trigger.insert(doc) + store.insert(doc) doc } def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Doc = { - store.upsert(doc) trigger.upsert(doc) + store.upsert(doc) doc } @@ -178,6 +181,8 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Seq[Doc] = docs.map(upsert) + def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = store.exists(id) + def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Option[Doc] = { val (field, value) = f(model) store.get(field, value) @@ -226,17 +231,13 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Boolean = { val (field, value) = f(model) - try { - store.delete(field, value) - } finally { - trigger.delete(field, value) - } + trigger.delete(field, value) + store.delete(field, value) } - def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Boolean = try { - store.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id) - } finally { + def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Boolean = { trigger.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id) + store.delete(ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]], id) } def count(implicit transaction: Transaction[Doc]): Int = store.count @@ -245,10 +246,9 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S lazy val query: Query[Doc, Model] = Query(this) - def truncate()(implicit transaction: Transaction[Doc]): Int = try { - store.truncate() - } finally { + def truncate()(implicit transaction: Transaction[Doc]): Int = { trigger.truncate() + store.truncate() } def dispose(): Unit = try { diff --git a/core/src/main/scala/lightdb/doc/DocumentModel.scala b/core/src/main/scala/lightdb/doc/DocumentModel.scala index 3f7e1141..9fd332fa 100644 --- a/core/src/main/scala/lightdb/doc/DocumentModel.scala +++ b/core/src/main/scala/lightdb/doc/DocumentModel.scala @@ -1,6 +1,7 @@ package lightdb.doc import fabric.rw._ +import lightdb.collection.Collection import lightdb.filter.FilterBuilder import lightdb.{Field, Id, Indexed, Tokenized, Unique, UniqueIndex} @@ -15,6 +16,8 @@ trait DocumentModel[Doc <: Document[Doc]] { def id(value: String = Unique()): Id[Doc] = Id(value) + def init[Model <: DocumentModel[Doc]](collection: Collection[Doc, Model]): Unit = {} + type F[V] = Field[Doc, V] type I[V] = Indexed[Doc, V] type U[V] = UniqueIndex[Doc, V] diff --git a/core/src/main/scala/lightdb/doc/MaterializedModel.scala b/core/src/main/scala/lightdb/doc/MaterializedModel.scala new file mode 100644 index 00000000..d54c6329 --- /dev/null +++ b/core/src/main/scala/lightdb/doc/MaterializedModel.scala @@ -0,0 +1,25 @@ +package lightdb.doc + +import lightdb.collection.Collection +import lightdb.transaction.Transaction +import lightdb.trigger.BasicCollectionTrigger + +trait MaterializedModel[Doc <: Document[Doc], MaterialDoc <: Document[MaterialDoc], MaterialModel <: DocumentModel[MaterialDoc]] extends DocumentModel[Doc] { mm => + def materialCollection: Collection[MaterialDoc, MaterialModel] + + protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit + protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit + protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit + + override def init[Model <: DocumentModel[Doc]](collection: Collection[Doc, Model]): Unit = { + super.init(collection) + + materialCollection.trigger += new BasicCollectionTrigger[MaterialDoc, MaterialModel] { + override def collection: Collection[MaterialDoc, MaterialModel] = materialCollection + + override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.adding(doc) + override protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.modifying(oldDoc, newDoc) + override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit = mm.removing(doc) + } + } +} diff --git a/core/src/main/scala/lightdb/store/MapStore.scala b/core/src/main/scala/lightdb/store/MapStore.scala index ecbbc903..2fffc3ab 100644 --- a/core/src/main/scala/lightdb/store/MapStore.scala +++ b/core/src/main/scala/lightdb/store/MapStore.scala @@ -24,6 +24,8 @@ class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMode: map += id(doc) -> doc } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = map.contains(id) + override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { map.get(value.asInstanceOf[Id[Doc]]) diff --git a/core/src/main/scala/lightdb/store/Store.scala b/core/src/main/scala/lightdb/store/Store.scala index 7bb1b689..3bd57a49 100644 --- a/core/src/main/scala/lightdb/store/Store.scala +++ b/core/src/main/scala/lightdb/store/Store.scala @@ -47,6 +47,8 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]] { def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit + def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean + def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Boolean diff --git a/core/src/main/scala/lightdb/store/split/SplitStore.scala b/core/src/main/scala/lightdb/store/split/SplitStore.scala index a97ee388..1f583aa1 100644 --- a/core/src/main/scala/lightdb/store/split/SplitStore.scala +++ b/core/src/main/scala/lightdb/store/split/SplitStore.scala @@ -6,7 +6,7 @@ import lightdb.doc.{Document, DocumentModel} import lightdb.materialized.MaterializedAggregate import lightdb.store.{Conversion, Store, StoreMode} import lightdb.transaction.{Transaction, TransactionKey} -import lightdb.{Field, Query, SearchResults, UniqueIndex} +import lightdb.{Field, Id, Query, SearchResults, UniqueIndex} import scala.language.implicitConversions @@ -34,6 +34,8 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](storage searching.upsert(doc) } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = storage.exists(id) + override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = { storage.get(field, value) } diff --git a/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala b/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala new file mode 100644 index 00000000..855e5cab --- /dev/null +++ b/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala @@ -0,0 +1,33 @@ +package lightdb.trigger + +import lightdb.UniqueIndex +import lightdb.collection.Collection +import lightdb.doc.{Document, DocumentModel} +import lightdb.transaction.Transaction + +trait BasicCollectionTrigger[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends CollectionTrigger[Doc] { + def collection: Collection[Doc, Model] + + protected def adding(doc: Doc)(implicit transaction: Transaction[Doc]): Unit + + protected def modifying(oldDoc: Doc, newDoc: Doc)(implicit transaction: Transaction[Doc]): Unit + + protected def removing(doc: Doc)(implicit transaction: Transaction[Doc]): Unit + + override final def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = adding(doc) + + override final def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { + collection.get(doc._id) match { + case Some(current) => modifying(current, doc) + case None => adding(doc) + } + } + + override final def delete[V](index: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Unit = { + collection.query.filter(_ => index === value).iterator.foreach(removing) + } + + override final def truncate(): Unit = collection.transaction { implicit transaction => + collection.iterator.foreach(removing) + } +} diff --git a/core/src/test/scala/spec/AbstractBasicSpec.scala b/core/src/test/scala/spec/AbstractBasicSpec.scala index dd48dddc..4782158b 100644 --- a/core/src/test/scala/spec/AbstractBasicSpec.scala +++ b/core/src/test/scala/spec/AbstractBasicSpec.scala @@ -3,10 +3,11 @@ package spec import fabric.rw._ import lightdb.backup.{DatabaseBackup, DatabaseRestore} import lightdb.collection.Collection -import lightdb.doc.{Document, DocumentModel, JsonConversion} +import lightdb.doc.{Document, DocumentModel, JsonConversion, MaterializedModel} import lightdb.feature.DBFeatureKey import lightdb.filter._ import lightdb.store.StoreManager +import lightdb.transaction.Transaction import lightdb.upgrade.DatabaseUpgrade import lightdb.{Field, Id, LightDB, Sort, StoredValue} import org.scalatest.matchers.should.Matchers @@ -102,6 +103,9 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => ages should be(Set(101, 42, 89, 102, 53, 13, 2, 22, 12, 81, 35, 63, 99, 23, 30, 4, 21, 33, 11, 72, 15, 62)) } } + /*"verify the AgeLinks is properly updated" in { + db.ageLinks.t.get(AgeLinks.id(30)).map(_.people) should be(Some(List(Id("yuri"), Id("wyatt"), Id("tori")))) + }*/ "query with aggregate functions" in { if (aggregationSupported) { db.people.transaction { implicit transaction => @@ -285,10 +289,10 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => } "insert a lot more names" in { db.people.transaction { implicit transaction => - val p = (1 to 100_000).toList.map { index => + val p = (1 to 1_000).toList.map { index => Person( name = s"Unique Snowflake $index", - age = if (index > 10_000) 0 else index, + age = if (index > 100) 0 else index, city = Some(City("Robotland")), nicknames = Set("robot", s"sf$index") ) @@ -298,7 +302,7 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => } "verify the correct number of people exist in the database" in { db.people.transaction { implicit transaction => - db.people.count should be(100_024) + db.people.count should be(1_024) } } "verify the correct count in query total" in { @@ -311,8 +315,8 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => .search .docs results.list.length should be(100) - results.total should be(Some(100_000)) - results.remaining should be(Some(100_000)) + results.total should be(Some(1_000)) + results.remaining should be(Some(1_000)) } } "verify the correct count in query total with offset" in { @@ -325,13 +329,13 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => .search .docs results.list.length should be(100) - results.total should be(Some(100_000)) - results.remaining should be(Some(99_900)) + results.total should be(Some(1_000)) + results.remaining should be(Some(900)) } } "truncate the collection" in { db.people.transaction { implicit transaction => - db.people.truncate() should be(100_024) + db.people.truncate() should be(1_024) } } "verify the collection is empty" in { @@ -370,6 +374,8 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => val startTime: StoredValue[Long] = stored[Long]("startTime", -1L) val people: Collection[Person, Person.type] = collection(Person) + // TODO: Revisit this - performance is currently awful and transaction state causes overlapping dirty data +// val ageLinks: Collection[AgeLinks, AgeLinks.type] = collection(AgeLinks) override def storeManager: StoreManager = spec.storeManager @@ -383,7 +389,7 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => _id: Id[Person] = Person.id()) extends Document[Person] object Person extends DocumentModel[Person] with JsonConversion[Person] { - implicit val rw: RW[Person] = RW.gen + override implicit val rw: RW[Person] = RW.gen val name: I[String] = field.index("name", _.name) val age: I[Int] = field.index("age", _.age) @@ -398,6 +404,37 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => implicit val rw: RW[City] = RW.gen } + /*case class AgeLinks(age: Int, people: List[Id[Person]]) extends Document[AgeLinks] { + lazy val _id: Id[AgeLinks] = AgeLinks.id(age) + } + + object AgeLinks extends MaterializedModel[AgeLinks, Person, Person.type] with JsonConversion[AgeLinks] { + override implicit val rw: RW[AgeLinks] = RW.gen + + val age: F[Int] = field("age", _.age) + val people: F[List[Id[Person]]] = field("people", _.people) + + override def materialCollection: Collection[Person, Person.type] = db.people + + def id(age: Int): Id[AgeLinks] = Id(age.toString) + + override protected def adding(doc: Person)(implicit transaction: Transaction[Person]): Unit = db.ageLinks.t.modify(AgeLinks.id(doc.age)) { + case Some(links) => Some(links.copy(people = (doc._id :: links.people).distinct)) + case None => Some(AgeLinks(doc.age, List(doc._id))) + } + override protected def modifying(oldDoc: Person, newDoc: Person)(implicit transaction: Transaction[Person]): Unit = adding(newDoc) + override protected def removing(doc: Person)(implicit transaction: Transaction[Person]): Unit = db.ageLinks.t.modify(AgeLinks.id(doc.age)) { + case Some(links) => + val l = links.copy(people = links.people.filterNot(_ == doc._id)) + if (l.people.isEmpty) { + None + } else { + Some(l) + } + case None => None + } + }*/ + object InitialSetupUpgrade extends DatabaseUpgrade { override def applyToNew: Boolean = true diff --git a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala index a4eedfbe..685536f2 100644 --- a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala +++ b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala @@ -48,6 +48,8 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: instance.put(id(doc).bytes, JsonFormatter.Compact(json).getBytes("UTF-8")) } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty + override def get[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { Option(instance.get(value.asInstanceOf[Id[Doc]].bytes)).map(bytes2Doc) diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala index 865b38f1..5bb4d645 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala @@ -114,6 +114,8 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty + override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { val filter = Filter.Equals(field, value) diff --git a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala index d586586b..142788ef 100644 --- a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala +++ b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala @@ -36,6 +36,8 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: O override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = map.put(doc._id.value, toString(doc)) + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = map.containsKey(id.value) + override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { diff --git a/redis/src/main/scala/lightdb/redis/RedisStore.scala b/redis/src/main/scala/lightdb/redis/RedisStore.scala index 666c83fb..7123d499 100644 --- a/redis/src/main/scala/lightdb/redis/RedisStore.scala +++ b/redis/src/main/scala/lightdb/redis/RedisStore.scala @@ -40,6 +40,9 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMod override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = getInstance.hset(collection.name, doc._id.value, toString(doc)) + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = + getInstance.hexists(collection.name, id.value) + override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { diff --git a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala index e3570ca4..fad41fbc 100644 --- a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala +++ b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala @@ -35,6 +35,8 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: db.put(doc._id.bytes, JsonFormatter.Compact(json).getBytes("UTF-8")) } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = db.keyExists(id.bytes) + override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { diff --git a/sql/src/main/scala/lightdb/sql/SQLStore.scala b/sql/src/main/scala/lightdb/sql/SQLStore.scala index b75fed1d..0835b885 100644 --- a/sql/src/main/scala/lightdb/sql/SQLStore.scala +++ b/sql/src/main/scala/lightdb/sql/SQLStore.scala @@ -164,6 +164,8 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } } + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty + override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { val state = getState