Skip to content

Commit

Permalink
Major upgrade of internals to avoid the strange two-way dependency be…
Browse files Browse the repository at this point in the history
…tween collection and store.
  • Loading branch information
darkfrog26 committed Dec 6, 2024
1 parent fe29aa2 commit 7695076
Show file tree
Hide file tree
Showing 32 changed files with 390 additions and 359 deletions.
2 changes: 1 addition & 1 deletion all/src/test/scala/spec/AirportSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class AirportSpec extends AnyWordSpec with Matchers {
}

object DB extends LightDB {
override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes)
override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore)

lazy val directory: Option[Path] = Some(Path.of("db/AirportSpec"))

Expand Down
4 changes: 2 additions & 2 deletions all/src/test/scala/spec/HaloDBAndLuceneSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import lightdb.store.split.SplitStoreManager
class HaloDBAndLuceneSpec extends AbstractBasicSpec {
override protected def filterBuilderSupported: Boolean = true

override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes)
}
override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore)
}
2 changes: 1 addition & 1 deletion all/src/test/scala/spec/RocksDBAndLuceneSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import lightdb.store.split.SplitStoreManager
class RocksDBAndLuceneSpec extends AbstractBasicSpec {
override protected def filterBuilderSupported: Boolean = true

override def storeManager: StoreManager = SplitStoreManager(RocksDBStore, LuceneStore, searchingMode = StoreMode.Indexes)
override def storeManager: StoreManager = SplitStoreManager(RocksDBStore, LuceneStore)
}
10 changes: 5 additions & 5 deletions async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]
filter: Option[AggregateFilter[Doc]] = None,
sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) {
def filter(f: Model => AggregateFilter[Doc], and: Boolean = false): AsyncAggregateQuery[Doc, Model] = {
val filter = f(query.collection.model)
val filter = f(query.model)
if (and && this.filter.nonEmpty) {
copy(filter = Some(this.filter.get && filter))
} else {
Expand All @@ -21,7 +21,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]
}

def filters(f: Model => List[AggregateFilter[Doc]]): AsyncAggregateQuery[Doc, Model] = {
val filters = f(query.collection.model)
val filters = f(query.model)
if (filters.nonEmpty) {
var filter = filters.head
filters.tail.foreach { f =>
Expand All @@ -35,7 +35,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]

def sort(f: Model => AggregateFunction[_, _, Doc],
direction: SortDirection = SortDirection.Ascending): AsyncAggregateQuery[Doc, Model] = copy(
sort = sort ::: List((f(query.collection.model), direction))
sort = sort ::: List((f(query.model), direction))
)

private lazy val aggregateQuery = AggregateQuery(
Expand All @@ -46,10 +46,10 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]
)

def count(implicit transaction: Transaction[Doc]): IO[Int] =
IO.blocking(query.collection.store.aggregateCount(aggregateQuery))
IO.blocking(query.store.aggregateCount(aggregateQuery))

def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedAggregate[Doc, Model]] = {
val iterator = query.collection.store.aggregate(aggregateQuery)
val iterator = query.store.aggregate(aggregateQuery)
fs2.Stream.fromBlockingIterator[IO](iterator, 100)
}

Expand Down
7 changes: 2 additions & 5 deletions async/src/main/scala/lightdb/async/AsyncLightDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db =>

def collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model,
name: Option[String] = None,
store: Option[Store[Doc, Model]] = None,
storeManager: Option[StoreManager] = None,
maxInsertBatch: Int = 1_000_000,
cacheQueries: Boolean = Collection.DefaultCacheQueries): AsyncCollection[Doc, Model] =
AsyncCollection(underlying.collection[Doc, Model](model, name, store, storeManager, maxInsertBatch, cacheQueries))
storeManager: Option[StoreManager] = None): AsyncCollection[Doc, Model] =
AsyncCollection(underlying.collection[Doc, Model](model, name, storeManager))

def reIndex(): IO[Int] = fs2.Stream(underlying.collections: _*)
.covary[IO]
Expand Down
2 changes: 1 addition & 1 deletion async/src/main/scala/lightdb/async/AsyncQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo
facets: List[FacetQuery[Doc]] = Nil) { query =>
protected def collection: Collection[Doc, Model] = asyncCollection.underlying

def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets)
def toQuery: Query[Doc, Model] = Query[Doc, Model](asyncCollection.underlying.model, asyncCollection.underlying.store, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets)

def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true)

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com"

name := projectName
ThisBuild / organization := org
ThisBuild / version := "1.0.0"
ThisBuild / version := "1.1.0-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := allScalaVersions
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
Expand Down
19 changes: 3 additions & 16 deletions core/src/main/scala/lightdb/LightDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,14 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] {
*
* @param model the model to use for this collection
* @param name the collection's name (defaults to None meaning it will be generated based on the model name)
* @param store specify the store. If this is not set, the database's storeManager will be used to create one
* @param storeManager specify the StoreManager. If this is not set, the database's storeManager will be used.
* @param maxInsertBatch the maximum number of inserts to include in a batch. Defaults to 1 million.
* @param cacheQueries whether to cache queries in memory. This improves performance when running the same queries
* with different parameters fairly drastically, but consumes a lot of memory if many queries are
* executed in a single transaction.
*/
def collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model,
name: Option[String] = None,
store: Option[Store[Doc, Model]] = None,
storeManager: Option[StoreManager] = None,
maxInsertBatch: Int = 1_000_000,
cacheQueries: Boolean = Collection.DefaultCacheQueries): Collection[Doc, Model] = {
storeManager: Option[StoreManager] = None): Collection[Doc, Model] = {
val n = name.getOrElse(model.getClass.getSimpleName.replace("$", ""))
val s = () => store match {
case Some(store) => store
case None =>
val sm = storeManager.getOrElse(this.storeManager)
sm.create[Doc, Model](this, n, StoreMode.All)
}
val c = Collection[Doc, Model](n, model, s, maxInsertBatch, cacheQueries)
val store = storeManager.getOrElse(this.storeManager).create[Doc, Model](this, model, n, StoreMode.All())
val c = Collection[Doc, Model](n, model, store)
synchronized {
_collections = c :: _collections
}
Expand Down
48 changes: 25 additions & 23 deletions core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import lightdb.field.{Field, IndexingState}
import lightdb.filter._
import lightdb.materialized.{MaterializedAndDoc, MaterializedIndex}
import lightdb.spatial.{DistanceAndDoc, Geo}
import lightdb.store.{Conversion, StoreMode}
import lightdb.store.{Conversion, Store, StoreMode}
import lightdb.transaction.Transaction
import lightdb.util.GroupedIterator

case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model],
case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model,
store: Store[Doc, Model],
filter: Option[Filter[Doc]] = None,
sort: List[Sort] = Nil,
offset: Int = 0,
Expand All @@ -36,7 +37,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
def clearFilters: Query[Doc, Model] = copy(filter = None)

def filter(f: Model => Filter[Doc]): Query[Doc, Model] = {
val filter = f(collection.model)
val filter = f(model)
val combined = this.filter match {
case Some(current) => current && filter
case None => filter
Expand All @@ -48,15 +49,15 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
path: List[String] = Nil,
childrenLimit: Option[Int] = Some(10),
dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = {
val facetField = f(collection.model)
val facetField = f(model)
val facetQuery = FacetQuery(facetField, path, childrenLimit, dimsLimit)
copy(facets = facetQuery :: facets)
}

def facets(f: Model => List[FacetField[Doc]],
childrenLimit: Option[Int] = Some(10),
dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = {
val facetFields = f(collection.model)
val facetFields = f(model)
val facetQueries = facetFields.map(ff => FacetQuery(ff, Nil, childrenLimit, dimsLimit))
copy(facets = facets ::: facetQueries)
}
Expand All @@ -76,19 +77,20 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
object search {
def apply[V](conversion: Conversion[Doc, V])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = {
val storeMode = collection.store.storeMode
if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode == StoreMode.All)) {
val notIndexed = filter.toList.flatMap(_.fields(collection.model)).filter(!_.indexed)
storeMode match {
case StoreMode.Indexes => if (notIndexed.nonEmpty) {
val storeMode = store.storeMode
if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode.isAll)) {
val notIndexed = filter.toList.flatMap(_.fields(model)).filter(!_.indexed)
if (storeMode.isIndexes) {
if (notIndexed.nonEmpty) {
throw NonIndexedFieldException(query, notIndexed)
}
case StoreMode.All => if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) {
} else {
if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) {
scribe.warn(s"Inefficient query filtering on non-indexed field(s): ${notIndexed.map(_.name).mkString(", ")}")
}
}
}
collection.store.doSearch(
store.doSearch(
query = query,
conversion = conversion
)
Expand All @@ -98,25 +100,25 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:

def value[F](f: Model => Field[Doc, F])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, F] =
apply(Conversion.Value(f(collection.model)))
apply(Conversion.Value(f(model)))

def id(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, Id[Doc]] =
value(m => m._id)

def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, Json] =
apply(Conversion.Json(f(collection.model)))
apply(Conversion.Json(f(model)))

def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, T] =
apply(Conversion.Converted(f))

def materialized(f: Model => List[Field[Doc, _]])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, MaterializedIndex[Doc, Model]] = {
val fields = f(collection.model)
val fields = f(model)
apply(Conversion.Materialized(fields))
}

def indexes()(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, MaterializedIndex[Doc, Model]] = {
val fields = collection.model.fields.filter(_.indexed)
val fields = model.fields.filter(_.indexed)
apply(Conversion.Materialized(fields))
}

Expand All @@ -129,7 +131,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
sort: Boolean = true,
radius: Option[Distance] = None)
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = {
val field = f(collection.model)
val field = f(model)
var q = Query.this
if (sort) {
q = q.clearSort.sort(Sort.ByDistance(field, from))
Expand All @@ -156,15 +158,15 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
(f: Doc => Option[Doc])
(implicit transaction: Transaction[Doc]): Unit = search.docs.iterator.foreach { doc =>
if (safeModify) {
collection.modify(doc._id, establishLock, deleteOnNone) { existing =>
store.modify(doc._id, establishLock, deleteOnNone) { existing =>
existing.flatMap(f)
}
} else {
collection.lock(doc._id, Some(doc)) { current =>
store.lock(doc._id, Some(doc)) { current =>
val result = f(current.getOrElse(doc))
result match {
case Some(modified) => if (!current.contains(modified)) collection.upsert(modified)
case None => if (deleteOnNone) collection.delete(doc._id)
case Some(modified) => if (!current.contains(modified)) store.upsert(modified)
case None => if (deleteOnNone) store.delete(store.idField, doc._id)
}
result
}
Expand All @@ -191,12 +193,12 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
}

def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AggregateQuery[Doc, Model] =
AggregateQuery(this, f(collection.model))
AggregateQuery(this, f(model))

def grouped[F](f: Model => Field[Doc, F],
direction: SortDirection = SortDirection.Ascending)
(implicit transaction: Transaction[Doc]): GroupedIterator[Doc, F] = {
val field = f(collection.model)
val field = f(model)
val state = new IndexingState
val iterator = sort(Sort.ByField(field, direction))
.search
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/lightdb/aggregate/AggregateQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que
filter: Option[AggregateFilter[Doc]] = None,
sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) {
def filter(f: Model => AggregateFilter[Doc], and: Boolean = false): AggregateQuery[Doc, Model] = {
val filter = f(query.collection.model)
val filter = f(query.model)
if (and && this.filter.nonEmpty) {
copy(filter = Some(this.filter.get && filter))
} else {
Expand All @@ -19,7 +19,7 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que
}

def filters(f: Model => List[AggregateFilter[Doc]]): AggregateQuery[Doc, Model] = {
val filters = f(query.collection.model)
val filters = f(query.model)
if (filters.nonEmpty) {
var filter = filters.head
filters.tail.foreach { f =>
Expand All @@ -33,14 +33,14 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que

def sort(f: Model => AggregateFunction[_, _, Doc],
direction: SortDirection = SortDirection.Ascending): AggregateQuery[Doc, Model] = copy(
sort = sort ::: List((f(query.collection.model), direction))
sort = sort ::: List((f(query.model), direction))
)

def count(implicit transaction: Transaction[Doc]): Int =
query.collection.store.aggregateCount(this)
query.store.aggregateCount(this)

def iterator(implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] =
query.collection.store.aggregate(this)
query.store.aggregate(this)

def toList(implicit transaction: Transaction[Doc]): List[MaterializedAggregate[Doc, Model]] = iterator.toList
}
Loading

0 comments on commit 7695076

Please sign in to comment.