diff --git a/all/src/test/scala/spec/AirportSpec.scala b/all/src/test/scala/spec/AirportSpec.scala index f426f771..8a8b7144 100644 --- a/all/src/test/scala/spec/AirportSpec.scala +++ b/all/src/test/scala/spec/AirportSpec.scala @@ -110,7 +110,7 @@ class AirportSpec extends AnyWordSpec with Matchers { } object DB extends LightDB { - override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes) + override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore) lazy val directory: Option[Path] = Some(Path.of("db/AirportSpec")) diff --git a/all/src/test/scala/spec/HaloDBAndLuceneSpec.scala b/all/src/test/scala/spec/HaloDBAndLuceneSpec.scala index b0b293b8..41eee70e 100644 --- a/all/src/test/scala/spec/HaloDBAndLuceneSpec.scala +++ b/all/src/test/scala/spec/HaloDBAndLuceneSpec.scala @@ -8,5 +8,5 @@ import lightdb.store.split.SplitStoreManager class HaloDBAndLuceneSpec extends AbstractBasicSpec { override protected def filterBuilderSupported: Boolean = true - override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore, searchingMode = StoreMode.Indexes) -} + override def storeManager: StoreManager = SplitStoreManager(HaloDBStore, LuceneStore) +} \ No newline at end of file diff --git a/all/src/test/scala/spec/RocksDBAndLuceneSpec.scala b/all/src/test/scala/spec/RocksDBAndLuceneSpec.scala index fe81ca44..65f88258 100644 --- a/all/src/test/scala/spec/RocksDBAndLuceneSpec.scala +++ b/all/src/test/scala/spec/RocksDBAndLuceneSpec.scala @@ -9,5 +9,5 @@ import lightdb.store.split.SplitStoreManager class RocksDBAndLuceneSpec extends AbstractBasicSpec { override protected def filterBuilderSupported: Boolean = true - override def storeManager: StoreManager = SplitStoreManager(RocksDBStore, LuceneStore, searchingMode = StoreMode.Indexes) + override def storeManager: StoreManager = SplitStoreManager(RocksDBStore, LuceneStore) } diff --git a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala b/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala index 460a5fb2..cbe451bc 100644 --- a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala @@ -12,7 +12,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc] filter: Option[AggregateFilter[Doc]] = None, sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) { def filter(f: Model => AggregateFilter[Doc], and: Boolean = false): AsyncAggregateQuery[Doc, Model] = { - val filter = f(query.collection.model) + val filter = f(query.model) if (and && this.filter.nonEmpty) { copy(filter = Some(this.filter.get && filter)) } else { @@ -21,7 +21,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc] } def filters(f: Model => List[AggregateFilter[Doc]]): AsyncAggregateQuery[Doc, Model] = { - val filters = f(query.collection.model) + val filters = f(query.model) if (filters.nonEmpty) { var filter = filters.head filters.tail.foreach { f => @@ -35,7 +35,7 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc] def sort(f: Model => AggregateFunction[_, _, Doc], direction: SortDirection = SortDirection.Ascending): AsyncAggregateQuery[Doc, Model] = copy( - sort = sort ::: List((f(query.collection.model), direction)) + sort = sort ::: List((f(query.model), direction)) ) private lazy val aggregateQuery = AggregateQuery( @@ -46,10 +46,10 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc] ) def count(implicit transaction: Transaction[Doc]): IO[Int] = - IO.blocking(query.collection.store.aggregateCount(aggregateQuery)) + IO.blocking(query.store.aggregateCount(aggregateQuery)) def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedAggregate[Doc, Model]] = { - val iterator = query.collection.store.aggregate(aggregateQuery) + val iterator = query.store.aggregate(aggregateQuery) fs2.Stream.fromBlockingIterator[IO](iterator, 100) } diff --git a/async/src/main/scala/lightdb/async/AsyncLightDB.scala b/async/src/main/scala/lightdb/async/AsyncLightDB.scala index 8a0be544..6dd114cb 100644 --- a/async/src/main/scala/lightdb/async/AsyncLightDB.scala +++ b/async/src/main/scala/lightdb/async/AsyncLightDB.scala @@ -69,11 +69,8 @@ trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db => def collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model, name: Option[String] = None, - store: Option[Store[Doc, Model]] = None, - storeManager: Option[StoreManager] = None, - maxInsertBatch: Int = 1_000_000, - cacheQueries: Boolean = Collection.DefaultCacheQueries): AsyncCollection[Doc, Model] = - AsyncCollection(underlying.collection[Doc, Model](model, name, store, storeManager, maxInsertBatch, cacheQueries)) + storeManager: Option[StoreManager] = None): AsyncCollection[Doc, Model] = + AsyncCollection(underlying.collection[Doc, Model](model, name, storeManager)) def reIndex(): IO[Int] = fs2.Stream(underlying.collections: _*) .covary[IO] diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala index d46bf5da..c1fc7651 100644 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncQuery.scala @@ -28,7 +28,7 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo facets: List[FacetQuery[Doc]] = Nil) { query => protected def collection: Collection[Doc, Model] = asyncCollection.underlying - def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets) + def toQuery: Query[Doc, Model] = Query[Doc, Model](asyncCollection.underlying.model, asyncCollection.underlying.store, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets) def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true) diff --git a/build.sbt b/build.sbt index ebc0d1c0..2c3d0dca 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com" name := projectName ThisBuild / organization := org -ThisBuild / version := "1.0.0" +ThisBuild / version := "1.1.0-SNAPSHOT" ThisBuild / scalaVersion := scala213 ThisBuild / crossScalaVersions := allScalaVersions ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation") diff --git a/core/src/main/scala/lightdb/LightDB.scala b/core/src/main/scala/lightdb/LightDB.scala index 04759a7f..748e202d 100644 --- a/core/src/main/scala/lightdb/LightDB.scala +++ b/core/src/main/scala/lightdb/LightDB.scala @@ -102,27 +102,14 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] { * * @param model the model to use for this collection * @param name the collection's name (defaults to None meaning it will be generated based on the model name) - * @param store specify the store. If this is not set, the database's storeManager will be used to create one * @param storeManager specify the StoreManager. If this is not set, the database's storeManager will be used. - * @param maxInsertBatch the maximum number of inserts to include in a batch. Defaults to 1 million. - * @param cacheQueries whether to cache queries in memory. This improves performance when running the same queries - * with different parameters fairly drastically, but consumes a lot of memory if many queries are - * executed in a single transaction. */ def collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model, name: Option[String] = None, - store: Option[Store[Doc, Model]] = None, - storeManager: Option[StoreManager] = None, - maxInsertBatch: Int = 1_000_000, - cacheQueries: Boolean = Collection.DefaultCacheQueries): Collection[Doc, Model] = { + storeManager: Option[StoreManager] = None): Collection[Doc, Model] = { val n = name.getOrElse(model.getClass.getSimpleName.replace("$", "")) - val s = () => store match { - case Some(store) => store - case None => - val sm = storeManager.getOrElse(this.storeManager) - sm.create[Doc, Model](this, n, StoreMode.All) - } - val c = Collection[Doc, Model](n, model, s, maxInsertBatch, cacheQueries) + val store = storeManager.getOrElse(this.storeManager).create[Doc, Model](this, model, n, StoreMode.All()) + val c = Collection[Doc, Model](n, model, store) synchronized { _collections = c :: _collections } diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index 4caae674..4e3581f9 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -12,11 +12,12 @@ import lightdb.field.{Field, IndexingState} import lightdb.filter._ import lightdb.materialized.{MaterializedAndDoc, MaterializedIndex} import lightdb.spatial.{DistanceAndDoc, Geo} -import lightdb.store.{Conversion, StoreMode} +import lightdb.store.{Conversion, Store, StoreMode} import lightdb.transaction.Transaction import lightdb.util.GroupedIterator -case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model], +case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model, + store: Store[Doc, Model], filter: Option[Filter[Doc]] = None, sort: List[Sort] = Nil, offset: Int = 0, @@ -36,7 +37,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: def clearFilters: Query[Doc, Model] = copy(filter = None) def filter(f: Model => Filter[Doc]): Query[Doc, Model] = { - val filter = f(collection.model) + val filter = f(model) val combined = this.filter match { case Some(current) => current && filter case None => filter @@ -48,7 +49,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: path: List[String] = Nil, childrenLimit: Option[Int] = Some(10), dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = { - val facetField = f(collection.model) + val facetField = f(model) val facetQuery = FacetQuery(facetField, path, childrenLimit, dimsLimit) copy(facets = facetQuery :: facets) } @@ -56,7 +57,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: def facets(f: Model => List[FacetField[Doc]], childrenLimit: Option[Int] = Some(10), dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = { - val facetFields = f(collection.model) + val facetFields = f(model) val facetQueries = facetFields.map(ff => FacetQuery(ff, Nil, childrenLimit, dimsLimit)) copy(facets = facets ::: facetQueries) } @@ -76,19 +77,20 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: object search { def apply[V](conversion: Conversion[Doc, V]) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = { - val storeMode = collection.store.storeMode - if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode == StoreMode.All)) { - val notIndexed = filter.toList.flatMap(_.fields(collection.model)).filter(!_.indexed) - storeMode match { - case StoreMode.Indexes => if (notIndexed.nonEmpty) { + val storeMode = store.storeMode + if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode.isAll)) { + val notIndexed = filter.toList.flatMap(_.fields(model)).filter(!_.indexed) + if (storeMode.isIndexes) { + if (notIndexed.nonEmpty) { throw NonIndexedFieldException(query, notIndexed) } - case StoreMode.All => if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) { + } else { + if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) { scribe.warn(s"Inefficient query filtering on non-indexed field(s): ${notIndexed.map(_.name).mkString(", ")}") } } } - collection.store.doSearch( + store.doSearch( query = query, conversion = conversion ) @@ -98,25 +100,25 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: def value[F](f: Model => Field[Doc, F]) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, F] = - apply(Conversion.Value(f(collection.model))) + apply(Conversion.Value(f(model))) def id(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, Id[Doc]] = value(m => m._id) def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, Json] = - apply(Conversion.Json(f(collection.model))) + apply(Conversion.Json(f(model))) def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, T] = apply(Conversion.Converted(f)) def materialized(f: Model => List[Field[Doc, _]]) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, MaterializedIndex[Doc, Model]] = { - val fields = f(collection.model) + val fields = f(model) apply(Conversion.Materialized(fields)) } def indexes()(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, MaterializedIndex[Doc, Model]] = { - val fields = collection.model.fields.filter(_.indexed) + val fields = model.fields.filter(_.indexed) apply(Conversion.Materialized(fields)) } @@ -129,7 +131,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: sort: Boolean = true, radius: Option[Distance] = None) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, DistanceAndDoc[Doc]] = { - val field = f(collection.model) + val field = f(model) var q = Query.this if (sort) { q = q.clearSort.sort(Sort.ByDistance(field, from)) @@ -156,15 +158,15 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: (f: Doc => Option[Doc]) (implicit transaction: Transaction[Doc]): Unit = search.docs.iterator.foreach { doc => if (safeModify) { - collection.modify(doc._id, establishLock, deleteOnNone) { existing => + store.modify(doc._id, establishLock, deleteOnNone) { existing => existing.flatMap(f) } } else { - collection.lock(doc._id, Some(doc)) { current => + store.lock(doc._id, Some(doc)) { current => val result = f(current.getOrElse(doc)) result match { - case Some(modified) => if (!current.contains(modified)) collection.upsert(modified) - case None => if (deleteOnNone) collection.delete(doc._id) + case Some(modified) => if (!current.contains(modified)) store.upsert(modified) + case None => if (deleteOnNone) store.delete(store.idField, doc._id) } result } @@ -191,12 +193,12 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: } def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AggregateQuery[Doc, Model] = - AggregateQuery(this, f(collection.model)) + AggregateQuery(this, f(model)) def grouped[F](f: Model => Field[Doc, F], direction: SortDirection = SortDirection.Ascending) (implicit transaction: Transaction[Doc]): GroupedIterator[Doc, F] = { - val field = f(collection.model) + val field = f(model) val state = new IndexingState val iterator = sort(Sort.ByField(field, direction)) .search diff --git a/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala b/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala index 195e9dfc..340f7b31 100644 --- a/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala +++ b/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala @@ -10,7 +10,7 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que filter: Option[AggregateFilter[Doc]] = None, sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) { def filter(f: Model => AggregateFilter[Doc], and: Boolean = false): AggregateQuery[Doc, Model] = { - val filter = f(query.collection.model) + val filter = f(query.model) if (and && this.filter.nonEmpty) { copy(filter = Some(this.filter.get && filter)) } else { @@ -19,7 +19,7 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que } def filters(f: Model => List[AggregateFilter[Doc]]): AggregateQuery[Doc, Model] = { - val filters = f(query.collection.model) + val filters = f(query.model) if (filters.nonEmpty) { var filter = filters.head filters.tail.foreach { f => @@ -33,14 +33,14 @@ case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](que def sort(f: Model => AggregateFunction[_, _, Doc], direction: SortDirection = SortDirection.Ascending): AggregateQuery[Doc, Model] = copy( - sort = sort ::: List((f(query.collection.model), direction)) + sort = sort ::: List((f(query.model), direction)) ) def count(implicit transaction: Transaction[Doc]): Int = - query.collection.store.aggregateCount(this) + query.store.aggregateCount(this) def iterator(implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = - query.collection.store.aggregate(this) + query.store.aggregate(this) def toList(implicit transaction: Transaction[Doc]): List[MaterializedAggregate[Doc, Model]] = iterator.toList } diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index e68075a5..a9dda0fe 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -18,18 +18,12 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model, - loadStore: () => Store[Doc, Model], - maxInsertBatch: Int = 1_000_000, - cacheQueries: Boolean = Collection.DefaultCacheQueries) extends Initializable { collection => - lazy val lock: LockManager[Id[Doc], Doc] = new LockManager + store: Store[Doc, Model]) extends Initializable { collection => + def lock: LockManager[Id[Doc], Doc] = store.lock - object trigger extends CollectionTriggers[Doc] - - lazy val store: Store[Doc, Model] = loadStore() + def trigger: store.trigger.type = store.trigger override protected def initialize(): Unit = { - store.init(this) - model match { case jc: JsonConversion[_] => val fieldNames = model.fields.map(_.name).toSet @@ -60,44 +54,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def reIndex(): Boolean = store.reIndex() - object transaction { - private val set = ConcurrentHashMap.newKeySet[Transaction[Doc]] - - def active: Int = set.size() - - def apply[Return](f: Transaction[Doc] => Return): Return = { - val transaction = create() - try { - f(transaction) - } finally { - release(transaction) - } - } - - def create(): Transaction[Doc] = { - if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name") - val transaction = store.createTransaction() - set.add(transaction) - trigger.transactionStart(transaction) - transaction - } - - def release(transaction: Transaction[Doc]): Unit = { - if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name") - trigger.transactionEnd(transaction) - store.releaseTransaction(transaction) - transaction.close() - set.remove(transaction) - } - - def releaseAll(): Int = { - val list = set.iterator().asScala.toList - list.foreach { transaction => - release(transaction) - } - list.size - } - } + def transaction: store.transaction.type = store.transaction /** * Convenience feature for simple one-off operations removing the need to manually create a transaction around it. @@ -216,10 +173,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S .iterator .flatMap(get) - def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Doc = - store.get(model._id, id).getOrElse { - throw DocNotFoundException(name, "_id", id) - } + def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Doc = store(id) def list()(implicit transaction: Transaction[Doc]): List[Doc] = iterator.toList @@ -227,17 +181,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S establishLock: Boolean = true, deleteOnNone: Boolean = false) (f: Option[Doc] => Option[Doc]) - (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(id), establishLock) { existing => - f(existing) match { - case Some(doc) => - upsert(doc) - Some(doc) - case None if deleteOnNone => - delete(id) - None - case None => None - } - } + (implicit transaction: Transaction[Doc]): Option[Doc] = store.modify(id, establishLock, deleteOnNone)(f) def getOrCreate(id: Id[Doc], create: => Doc, establishLock: Boolean = true) (implicit transaction: Transaction[Doc]): Doc = modify(id, establishLock = establishLock) { @@ -260,7 +204,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = store.iterator - lazy val query: Query[Doc, Model] = Query(this) + lazy val query: Query[Doc, Model] = Query(model, store) def truncate()(implicit transaction: Transaction[Doc]): Int = { trigger.truncate() @@ -279,6 +223,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S } object Collection { - var DefaultCacheQueries: Boolean = false + var CacheQueries: Boolean = false + var MaxInsertBatch: Int = 1_000_000 var LogTransactions: Boolean = false } \ No newline at end of file diff --git a/core/src/main/scala/lightdb/store/InMemoryIndexes.scala b/core/src/main/scala/lightdb/store/InMemoryIndexes.scala index bf5fac72..5a0bf979 100644 --- a/core/src/main/scala/lightdb/store/InMemoryIndexes.scala +++ b/core/src/main/scala/lightdb/store/InMemoryIndexes.scala @@ -14,15 +14,11 @@ trait InMemoryIndexes[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends }: _*) private lazy val indexes = indexMap.values.toList - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - - // Populate indexes - collection.transaction { implicit transaction => - val state = new IndexingState - collection.iterator.foreach { doc => - indexes.foreach(_.set(doc, state)) - } + // Populate indexes + transaction { implicit transaction => + val state = new IndexingState + iterator.foreach { doc => + indexes.foreach(_.set(doc, state)) } } diff --git a/core/src/main/scala/lightdb/store/MapStore.scala b/core/src/main/scala/lightdb/store/MapStore.scala index f1f14a0b..cca3d096 100644 --- a/core/src/main/scala/lightdb/store/MapStore.scala +++ b/core/src/main/scala/lightdb/store/MapStore.scala @@ -8,13 +8,11 @@ import lightdb.doc.{Document, DocumentModel} import lightdb.materialized.MaterializedAggregate import lightdb.transaction.Transaction -class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMode: StoreMode) extends Store[Doc, Model] { +class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) { private var map = Map.empty[Id[Doc], Doc] - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - } - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = synchronized { @@ -75,6 +73,7 @@ class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMode: object MapStore extends StoreManager { override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = new MapStore[Doc, Model](storeMode) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = new MapStore[Doc, Model](name, model, storeMode) } \ No newline at end of file diff --git a/core/src/main/scala/lightdb/store/Store.scala b/core/src/main/scala/lightdb/store/Store.scala index d884c3b8..82cf1e18 100644 --- a/core/src/main/scala/lightdb/store/Store.scala +++ b/core/src/main/scala/lightdb/store/Store.scala @@ -9,33 +9,38 @@ import lightdb.doc.{Document, DocumentModel} import lightdb.materialized.MaterializedAggregate import lightdb.transaction.Transaction import lightdb._ +import lightdb.error.DocNotFoundException import lightdb.field.Field import lightdb.field.Field._ +import lightdb.lock.LockManager +import lightdb.trigger.CollectionTriggers import java.io.File +import java.util.concurrent.ConcurrentHashMap +import scala.jdk.CollectionConverters.IteratorHasAsScala -abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]] { - protected var collection: Collection[Doc, Model] = _ - +abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name: String, + model: Model) { protected def id(doc: Doc): Id[Doc] = doc.asInstanceOf[Document[_]]._id.asInstanceOf[Id[Doc]] - protected lazy val idField: UniqueIndex[Doc, Id[Doc]] = collection.model._id + lazy val idField: UniqueIndex[Doc, Id[Doc]] = model._id + + lazy val lock: LockManager[Id[Doc], Doc] = new LockManager + + object trigger extends CollectionTriggers[Doc] - def storeMode: StoreMode + def storeMode: StoreMode[Doc, Model] - protected lazy val fields: List[Field[Doc, _]] = collection.model.fields match { - case fields if storeMode == StoreMode.Indexes => fields.filter(_.isInstanceOf[Indexed[_, _]]) - case fields => fields + protected lazy val fields: List[Field[Doc, _]] = if (storeMode.isIndexes) { + model.fields.filter(_.isInstanceOf[Indexed[_, _]]) + } else { + model.fields } - protected def toString(doc: Doc): String = JsonFormatter.Compact(doc.json(collection.model.rw)) - protected def fromString(string: String): Doc = JsonParser(string).as[Doc](collection.model.rw) + protected def toString(doc: Doc): String = JsonFormatter.Compact(doc.json(model.rw)) + protected def fromString(string: String): Doc = JsonParser(string).as[Doc](model.rw) lazy val hasSpatial: Boolean = fields.exists(_.isSpatial) - def init(collection: Collection[Doc, Model]): Unit = { - this.collection = collection - } - final def createTransaction(): Transaction[Doc] = { val t = new Transaction[Doc] prepareTransaction(t) @@ -62,7 +67,7 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]] { def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] - def jsonIterator(implicit transaction: Transaction[Doc]): Iterator[Json] = iterator.map(_.json(collection.model.rw)) + def jsonIterator(implicit transaction: Transaction[Doc]): Iterator[Json] = iterator.map(_.json(model.rw)) def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] @@ -78,6 +83,65 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]] { def reIndex(): Boolean = false + def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Doc = get(model._id, id).getOrElse { + throw DocNotFoundException(name, "_id", id) + } + + def modify(id: Id[Doc], + establishLock: Boolean = true, + deleteOnNone: Boolean = false) + (f: Option[Doc] => Option[Doc]) + (implicit transaction: Transaction[Doc]): Option[Doc] = this.lock(id, get(idField, id), establishLock) { existing => + f(existing) match { + case Some(doc) => + upsert(doc) + Some(doc) + case None if deleteOnNone => + delete(idField, id) + None + case None => None + } + } + + object transaction { + private val set = ConcurrentHashMap.newKeySet[Transaction[Doc]] + + def active: Int = set.size() + + def apply[Return](f: Transaction[Doc] => Return): Return = { + val transaction = create() + try { + f(transaction) + } finally { + release(transaction) + } + } + + def create(): Transaction[Doc] = { + if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name") + val transaction = createTransaction() + set.add(transaction) + trigger.transactionStart(transaction) + transaction + } + + def release(transaction: Transaction[Doc]): Unit = { + if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name") + trigger.transactionEnd(transaction) + releaseTransaction(transaction) + transaction.close() + set.remove(transaction) + } + + def releaseAll(): Int = { + val list = set.iterator().asScala.toList + list.foreach { transaction => + release(transaction) + } + list.size + } + } + def dispose(): Unit } diff --git a/core/src/main/scala/lightdb/store/StoreManager.scala b/core/src/main/scala/lightdb/store/StoreManager.scala index 7b81f393..d9ffa965 100644 --- a/core/src/main/scala/lightdb/store/StoreManager.scala +++ b/core/src/main/scala/lightdb/store/StoreManager.scala @@ -7,6 +7,7 @@ trait StoreManager { lazy val name: String = getClass.getSimpleName.replace("$", "") def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, - name: String, - storeMode: StoreMode): Store[Doc, Model] + model: Model, + name: String, + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] } \ No newline at end of file diff --git a/core/src/main/scala/lightdb/store/StoreMode.scala b/core/src/main/scala/lightdb/store/StoreMode.scala index 058df980..be8225c2 100644 --- a/core/src/main/scala/lightdb/store/StoreMode.scala +++ b/core/src/main/scala/lightdb/store/StoreMode.scala @@ -1,8 +1,17 @@ package lightdb.store -sealed trait StoreMode +import lightdb.doc.{Document, DocumentModel} + +sealed trait StoreMode[Doc <: Document[Doc], Model <: DocumentModel[Doc]] { + def isAll: Boolean = false + def isIndexes: Boolean = false +} object StoreMode { - case object All extends StoreMode - case object Indexes extends StoreMode + case class All[Doc <: Document[Doc], Model <: DocumentModel[Doc]]() extends StoreMode[Doc, Model] { + override def isAll: Boolean = true + } + case class Indexes[Doc <: Document[Doc], Model <: DocumentModel[Doc]](storage: Store[Doc, Model]) extends StoreMode[Doc, Model] { + override def isIndexes: Boolean = true + } } \ No newline at end of file diff --git a/core/src/main/scala/lightdb/store/split/SplitStore.scala b/core/src/main/scala/lightdb/store/split/SplitStore.scala index c7dd2b0b..af9b9d3a 100644 --- a/core/src/main/scala/lightdb/store/split/SplitStore.scala +++ b/core/src/main/scala/lightdb/store/split/SplitStore.scala @@ -12,15 +12,11 @@ import lightdb.field.Field._ import scala.language.implicitConversions -case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](storage: Store[Doc, Model], +case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](override val name: String, + model: Model, + storage: Store[Doc, Model], searching: Store[Doc, Model], - storeMode: StoreMode) extends Store[Doc, Model] { - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - storage.init(collection) - searching.init(collection) - } - + storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) { override def prepareTransaction(transaction: Transaction[Doc]): Unit = { storage.prepareTransaction(transaction) searching.prepareTransaction(transaction) @@ -82,20 +78,20 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](storage searching.truncate() } - override def verify(): Boolean = collection.transaction { implicit transaction => + override def verify(): Boolean = transaction { implicit transaction => val storageCount = storage.count val searchCount = searching.count if (storageCount != searchCount) { - scribe.warn(s"${collection.name} out of sync! Storage Count: $storageCount, Search Count: $searchCount. Re-Indexing...") + scribe.warn(s"$name out of sync! Storage Count: $storageCount, Search Count: $searchCount. Re-Indexing...") reIndexInternal() - scribe.info(s"${collection.name} re-indexed successfully!") + scribe.info(s"$name re-indexed successfully!") true } else { false } } - override def reIndex(): Boolean = collection.transaction { implicit transaction => + override def reIndex(): Boolean = transaction { implicit transaction => reIndexInternal() true } diff --git a/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala b/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala index 85f08154..e9395036 100644 --- a/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala +++ b/core/src/main/scala/lightdb/store/split/SplitStoreManager.scala @@ -6,14 +6,20 @@ import lightdb.store.{Store, StoreManager, StoreMode} case class SplitStoreManager(storage: StoreManager, searching: StoreManager, - searchingMode: StoreMode = StoreMode.All) extends StoreManager { + searchIndexAll: Boolean = false) extends StoreManager { override lazy val name: String = s"Split($storage, $searching)" override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = SplitStore( - storage = storage.create[Doc, Model](db, name, StoreMode.All), - searching = searching.create[Doc, Model](db, name, searchingMode), - storeMode = storeMode - ) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = { + val storage = this.storage.create[Doc, Model](db, model, name, StoreMode.All()) + SplitStore( + name = name, + model = model, + storage = storage, + searching = searching.create[Doc, Model](db, model, name, if (searchIndexAll) StoreMode.All() else StoreMode.Indexes(storage)), + storeMode = storeMode + ) + } } diff --git a/core/src/main/scala/lightdb/util/Aggregator.scala b/core/src/main/scala/lightdb/util/Aggregator.scala index 459c4c3a..0c3c8617 100644 --- a/core/src/main/scala/lightdb/util/Aggregator.scala +++ b/core/src/main/scala/lightdb/util/Aggregator.scala @@ -14,7 +14,7 @@ import lightdb.transaction.Transaction * Convenience class to stream aggregation for Stores that don't directly support aggregation */ object Aggregator { - def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: AggregateQuery[Doc, Model], collection: Collection[Doc, Model]) + def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: AggregateQuery[Doc, Model], model: Model) (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = { val fields = query.functions.map(_.field).distinct val groupFields = query.functions.filter(_.`type` == AggregateType.Group).map(_.field) @@ -112,7 +112,7 @@ object Aggregator { } key -> map } - var list = groups.toList.map(t => MaterializedAggregate[Doc, Model](Obj(t._2), collection.model)) + var list = groups.toList.map(t => MaterializedAggregate[Doc, Model](Obj(t._2), model)) query.sort.reverse.foreach { case (f, direction) => list = list.sortBy(_.json(f.name))(if (direction == Ascending) JsonOrdering else JsonOrdering.reverse) diff --git a/duckdb/src/main/scala/lightdb/duckdb/DuckDBStore.scala b/duckdb/src/main/scala/lightdb/duckdb/DuckDBStore.scala index 59ff97a6..44428e15 100644 --- a/duckdb/src/main/scala/lightdb/duckdb/DuckDBStore.scala +++ b/duckdb/src/main/scala/lightdb/duckdb/DuckDBStore.scala @@ -11,9 +11,11 @@ import org.duckdb.DuckDBConnection import java.nio.file.Path import java.sql.Connection -class DuckDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connectionManager: ConnectionManager, +class DuckDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val connectionManager: ConnectionManager, val connectionShared: Boolean, - val storeMode: StoreMode) extends SQLStore[Doc, Model] { + val storeMode: StoreMode[Doc, Model]) extends SQLStore[Doc, Model](name, model) { // TODO: Use DuckDB's Appender for better performance /*override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { fields.zipWithIndex.foreach { @@ -56,8 +58,10 @@ object DuckDBStore extends StoreManager { )) } - def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](file: Option[Path], storeMode: StoreMode): DuckDBStore[Doc, Model] = { + def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model, file: Option[Path], storeMode: StoreMode[Doc, Model]): DuckDBStore[Doc, Model] = { new DuckDBStore[Doc, Model]( + name = name, + model = model, connectionManager = singleConnectionManager(file), connectionShared = false, storeMode = storeMode @@ -65,15 +69,18 @@ object DuckDBStore extends StoreManager { } override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = { + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = { db.get(SQLDatabase.Key) match { case Some(sqlDB) => new DuckDBStore[Doc, Model]( + name = name, + model = model, connectionManager = sqlDB.connectionManager, connectionShared = true, storeMode ) - case None => apply[Doc, Model](db.directory.map(_.resolve(s"$name.duckdb")), storeMode) + case None => apply[Doc, Model](name, model, db.directory.map(_.resolve(s"$name.duckdb")), storeMode) } } } \ No newline at end of file diff --git a/h2/src/main/scala/lightdb/h2/H2Store.scala b/h2/src/main/scala/lightdb/h2/H2Store.scala index 39de4d95..0127b624 100644 --- a/h2/src/main/scala/lightdb/h2/H2Store.scala +++ b/h2/src/main/scala/lightdb/h2/H2Store.scala @@ -11,9 +11,11 @@ import java.nio.file.Path import java.sql.Connection // TODO: Look into http://www.h2gis.org/docs/1.5.0/quickstart/ for spatial support -class H2Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connectionManager: ConnectionManager, +class H2Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val connectionManager: ConnectionManager, val connectionShared: Boolean, - val storeMode: StoreMode) extends SQLStore[Doc, Model] { + val storeMode: StoreMode[Doc, Model]) extends SQLStore[Doc, Model](name, model) { override protected def upsertPrefix: String = "MERGE" protected def tables(connection: Connection): Set[String] = { @@ -40,24 +42,31 @@ object H2Store extends StoreManager { jdbcUrl = s"jdbc:h2:${file.map(_.toFile.getCanonicalPath).map(p => s"file:$p").getOrElse(s"test:${Unique()}")};NON_KEYWORDS=VALUE,USER,SEARCH" ) - def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](file: Option[Path], - storeMode: StoreMode): H2Store[Doc, Model] = + def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + file: Option[Path], + storeMode: StoreMode[Doc, Model]): H2Store[Doc, Model] = new H2Store[Doc, Model]( + name = name, + model = model, connectionManager = SingleConnectionManager(config(file)), connectionShared = false, storeMode = storeMode ) override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = { + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = { db.get(SQLDatabase.Key) match { case Some(sqlDB) => new H2Store[Doc, Model]( + name = name, + model = model, connectionManager = sqlDB.connectionManager, connectionShared = true, storeMode ) - case None => apply[Doc, Model](db.directory.map(_.resolve(s"$name.h2")), storeMode) + case None => apply[Doc, Model](name, model, db.directory.map(_.resolve(s"$name.h2")), storeMode) } } } \ No newline at end of file diff --git a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala index c074f688..0480eb85 100644 --- a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala +++ b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala @@ -16,11 +16,13 @@ import lightdb.transaction.Transaction import java.nio.file.{Files, Path} import scala.jdk.CollectionConverters._ -class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: Path, - val storeMode: StoreMode, +class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + directory: Path, + val storeMode: StoreMode[Doc, Model], indexThreads: Int = Runtime.getRuntime.availableProcessors(), - maxFileSize: Int = 1024 * 1024 * 1024) extends Store[Doc, Model] { - private lazy val instance: HaloDB = { + maxFileSize: Int = 1024 * 1024 * 1024) extends Store[Doc, Model](name, model) { + private val instance: HaloDB = { val opts = new HaloDBOptions opts.setBuildIndexThreads(indexThreads) opts.setMaxFileSize(maxFileSize) @@ -36,17 +38,12 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: HaloDB.open(directory.toAbsolutePath.toString, opts) } - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - instance - } - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { - val json = doc.json(collection.model.rw) + val json = doc.json(model.rw) instance.put(id(doc).bytes, JsonFormatter.Compact(json).getBytes("UTF-8")) } @@ -62,7 +59,7 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: private def bytes2Doc(bytes: Array[Byte]): Doc = { val json = bytes2Json(bytes) - json.as[Doc](collection.model.rw) + json.as[Doc](model.rw) } private def bytes2Json(bytes: Array[Byte]): Json = { @@ -112,7 +109,8 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: object HaloDBStore extends StoreManager { override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = - new HaloDBStore[Doc, Model](db.directory.get.resolve(name), storeMode) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = + new HaloDBStore[Doc, Model](name, model, db.directory.get.resolve(name), storeMode) } \ No newline at end of file diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala index 6bf374c1..2f2af59b 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala @@ -33,7 +33,10 @@ import java.nio.file.{Files, Path} import scala.language.implicitConversions import scala.util.Try -class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: Option[Path], val storeMode: StoreMode) extends Store[Doc, Model] { +class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + directory: Option[Path], + val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) { IndexSearcher.setMaxClauseCount(10_000_000) private lazy val index = Index(directory) @@ -55,20 +58,16 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: doc } - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - - directory.foreach { path => - if (Files.exists(path)) { - val directory = FSDirectory.open(path) - val reader = DirectoryReader.open(directory) - reader.leaves().forEach { leaf => - val dataVersion = leaf.reader().asInstanceOf[SegmentReader].getSegmentInfo.info.getVersion - val latest = Version.LATEST - if (latest != dataVersion) { - // TODO: Support re-indexing - scribe.warn(s"Data Version: $dataVersion, Latest Version: $latest") - } + directory.foreach { path => + if (Files.exists(path)) { + val directory = FSDirectory.open(path) + val reader = DirectoryReader.open(directory) + reader.leaves().forEach { leaf => + val dataVersion = leaf.reader().asInstanceOf[SegmentReader].getSegmentInfo.info.getVersion + val latest = Version.LATEST + if (latest != dataVersion) { + // TODO: Support re-indexing + scribe.warn(s"Data Version: $dataVersion, Latest Version: $latest") } } } @@ -139,7 +138,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } private def createLuceneFields(field: Field[Doc, _], doc: Doc, state: IndexingState): List[LuceneField] = { - def fs: LuceneField.Store = if (storeMode == StoreMode.All || field.indexed) LuceneField.Store.YES else LuceneField.Store.NO + def fs: LuceneField.Store = if (storeMode.isAll || field.indexed) LuceneField.Store.YES else LuceneField.Store.NO val json = field.getJson(doc, state) var fields = List.empty[LuceneField] def add(field: LuceneField): Unit = fields = field :: fields @@ -220,7 +219,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { val filter = Filter.Equals(field, value) - val query = Query[Doc, Model](collection, filter = Some(filter), limit = Some(1)) + val query = Query[Doc, Model](model, this, filter = Some(filter), limit = Some(1)) doSearch[Doc](query, Conversion.Doc()).list.headOption } @@ -234,7 +233,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: state.indexSearcher.count(new MatchAllDocsQuery) override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = - doSearch[Doc](Query[Doc, Model](collection), Conversion.Doc()).iterator + doSearch[Doc](Query[Doc, Model](model, this), Conversion.Doc()).iterator override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = { @@ -334,22 +333,23 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } } def value[F](scoreDoc: ScoreDoc, field: Field[Doc, F]): F = jsonField[F](scoreDoc, field).as[F](field.rw) - def loadScoreDoc(scoreDoc: ScoreDoc): (Doc, Double) = if (storeMode == StoreMode.All) { - collection.model match { - case c: JsonConversion[Doc] => - val o = obj(fields.map(f => f.name -> jsonField(scoreDoc, f)): _*) - c.convertFromJson(o) -> scoreDoc.score.toDouble - case _ => - val map = fields.map { field => - field.name -> value(scoreDoc, field) - }.toMap - collection.model.map2Doc(map) -> scoreDoc.score.toDouble - } - } else { - val docId = scoreDoc.doc - val id = Id[Doc](storedFields.document(docId).get("_id")) - val score = scoreDoc.score.toDouble - collection(id)(transaction) -> score + def loadScoreDoc(scoreDoc: ScoreDoc): (Doc, Double) = storeMode match { + case StoreMode.All() => + model match { + case c: JsonConversion[Doc] => + val o = obj(fields.map(f => f.name -> jsonField(scoreDoc, f)): _*) + c.convertFromJson(o) -> scoreDoc.score.toDouble + case _ => + val map = fields.map { field => + field.name -> value(scoreDoc, field) + }.toMap + model.map2Doc(map) -> scoreDoc.score.toDouble + } + case StoreMode.Indexes(storage) => + val docId = scoreDoc.doc + val id = Id[Doc](storedFields.document(docId).get("_id")) + val score = scoreDoc.score.toDouble + storage(id) -> score } def docIterator(): Iterator[(Doc, Double)] = scoreDocs.iterator.map(loadScoreDoc) def jsonIterator(fields: List[Field[Doc, _]]): Iterator[(ScoreDoc, Json, Double)] = { @@ -370,22 +370,22 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: case (doc, score) => c(doc) -> score } case Conversion.Materialized(fields) => jsonIterator(fields).map { - case (_, json, score) => MaterializedIndex[Doc, Model](json, collection.model).asInstanceOf[V] -> score + case (_, json, score) => MaterializedIndex[Doc, Model](json, model).asInstanceOf[V] -> score } case Conversion.DocAndIndexes() => jsonIterator(fields.filter(_.indexed)).map { - case (scoreDoc, json, score) => MaterializedAndDoc[Doc, Model](json, collection.model, loadScoreDoc(scoreDoc)._1).asInstanceOf[V] -> score + case (scoreDoc, json, score) => MaterializedAndDoc[Doc, Model](json, model, loadScoreDoc(scoreDoc)._1).asInstanceOf[V] -> score } case Conversion.Json(fields) => jsonIterator(fields).map(t => t._2 -> t._3).asInstanceOf[Iterator[(V, Double)]] case Conversion.Distance(field, from, sort, radius) => idsAndScores.iterator.map { case (id, score) => val state = new IndexingState - val doc = collection(id)(transaction) + val doc = apply(id)(transaction) val distance = field.get(doc, field, state).map(d => Spatial.distance(from, d)) DistanceAndDoc(doc, distance) -> score } } SearchResults( - model = collection.model, + model = model, offset = query.offset, limit = query.limit, total = Some(total), @@ -397,7 +397,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: private def filter2Lucene(filter: Option[Filter[Doc]]): LuceneQuery = filter match { case Some(f) => - val fields = f.fields(collection.model) + val fields = f.fields(model) def parsed(q: String, allowLeading: Boolean = false): LuceneQuery = { val parser = new QueryParser(f.fieldNames.head, this.index.analyzer) parser.setAllowLeadingWildcard(allowLeading) @@ -405,15 +405,15 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: parser.parse(q) } f match { - case f: Filter.Equals[Doc, _] => exactQuery(f.field(collection.model), f.getJson(collection.model)) + case f: Filter.Equals[Doc, _] => exactQuery(f.field(model), f.getJson(model)) case f: Filter.NotEquals[Doc, _] => val b = new BooleanQuery.Builder b.add(new MatchAllDocsQuery, BooleanClause.Occur.MUST) - b.add(exactQuery(f.field(collection.model), f.getJson(collection.model)), BooleanClause.Occur.MUST_NOT) + b.add(exactQuery(f.field(model), f.getJson(model)), BooleanClause.Occur.MUST_NOT) b.build() case f: Filter.Regex[Doc, _] => new RegexpQuery(new Term(f.fieldName, f.expression)) case f: Filter.In[Doc, _] => - val queries = f.getJson(collection.model).map(json => exactQuery(f.field(collection.model), json)) + val queries = f.getJson(model).map(json => exactQuery(f.field(model), json)) val b = new BooleanQuery.Builder b.setMinimumNumberShouldMatch(1) queries.foreach { q => @@ -524,7 +524,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: override def aggregate(query: AggregateQuery[Doc, Model]) (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = - Aggregator(query, collection) + Aggregator(query, model) override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int = aggregate(query).length @@ -541,6 +541,9 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } object LuceneStore extends StoreManager { - override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, name: String, storeMode: StoreMode): Store[Doc, Model] = - new LuceneStore[Doc, Model](db.directory.map(_.resolve(s"$name.lucene")), storeMode) + override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, + name: String, + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = + new LuceneStore[Doc, Model](name, model, db.directory.map(_.resolve(s"$name.lucene")), storeMode) } \ No newline at end of file diff --git a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala index 578fba94..b8650b57 100644 --- a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala +++ b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala @@ -13,8 +13,10 @@ import org.mapdb.{DB, DBMaker, HTreeMap, Serializer} import java.nio.file.{Files, Path} import scala.jdk.CollectionConverters.IteratorHasAsScala -class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: Option[Path], - val storeMode: StoreMode) extends Store[Doc, Model] { +class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + directory: Option[Path], + val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) { private lazy val db: DB = { val maker = directory.map { path => Files.createDirectories(path.getParent) @@ -24,10 +26,7 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: O } private lazy val map: HTreeMap[String, String] = db.hashMap("map", Serializer.STRING, Serializer.STRING).createOrOpen() - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - map.verify() - } + map.verify() override def prepareTransaction(transaction: Transaction[Doc]): Unit = () @@ -82,7 +81,8 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: O object MapDBStore extends StoreManager { override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = - new MapDBStore[Doc, Model](db.directory.map(_.resolve(name)), storeMode) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = + new MapDBStore[Doc, Model](name, model, db.directory.map(_.resolve(name)), storeMode) } \ No newline at end of file diff --git a/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStore.scala b/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStore.scala index ac1a6ede..e408685a 100644 --- a/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStore.scala +++ b/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStore.scala @@ -7,9 +7,11 @@ import lightdb.store.StoreMode import java.sql.Connection -class PostgreSQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connectionManager: ConnectionManager, +class PostgreSQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val connectionManager: ConnectionManager, val connectionShared: Boolean, - val storeMode: StoreMode) extends SQLStore[Doc, Model] { + val storeMode: StoreMode[Doc, Model]) extends SQLStore[Doc, Model](name, model) { protected def tables(connection: Connection): Set[String] = { val ps = connection.prepareStatement("SELECT * FROM information_schema.tables;") try { @@ -31,7 +33,7 @@ class PostgreSQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val con override protected def createUpsertSQL(): String = { val fieldNames = fields.map(_.name) val values = fields.map(field2Value) - s"""MERGE INTO ${collection.name} target + s"""MERGE INTO $name target |USING (VALUES (${values.mkString(", ")})) AS source (${fieldNames.mkString(", ")}) |ON target._id = source._id |WHEN MATCHED THEN diff --git a/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStoreManager.scala b/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStoreManager.scala index 326f9c9f..ee256a27 100644 --- a/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStoreManager.scala +++ b/postgresql/src/main/scala/lightdb/postgresql/PostgreSQLStoreManager.scala @@ -7,7 +7,8 @@ import lightdb.store.{Store, StoreManager, StoreMode} case class PostgreSQLStoreManager(connectionManager: ConnectionManager, connectionShared: Boolean) extends StoreManager { override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = - new PostgreSQLStore[Doc, Model](connectionManager, connectionShared, storeMode) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = + new PostgreSQLStore[Doc, Model](name, model, connectionManager, connectionShared, storeMode) } diff --git a/postgresql/src/test/scala/spec/PostgreSQLSpec.scala b/postgresql/src/test/scala/spec/PostgreSQLSpec.scala index 57f8ceee..00ad2f02 100644 --- a/postgresql/src/test/scala/spec/PostgreSQLSpec.scala +++ b/postgresql/src/test/scala/spec/PostgreSQLSpec.scala @@ -1,13 +1,14 @@ -package spec - -import lightdb.postgresql.PostgreSQLStoreManager -import lightdb.sql.connect.{HikariConnectionManager, SQLConfig} -import lightdb.store.StoreManager - -class PostgreSQLSpec extends AbstractBasicSpec { - override lazy val storeManager: StoreManager = PostgreSQLStoreManager(HikariConnectionManager(SQLConfig( - jdbcUrl = s"jdbc:postgresql://localhost:5432/basic", - username = Some("postgres"), - password = Some("password") - )), connectionShared = false) -} +// TODO: Figure out why @EmbeddedTest is no longer filtering +//package spec +// +//import lightdb.postgresql.PostgreSQLStoreManager +//import lightdb.sql.connect.{HikariConnectionManager, SQLConfig} +//import lightdb.store.StoreManager +// +//class PostgreSQLSpec extends AbstractBasicSpec { +// override lazy val storeManager: StoreManager = PostgreSQLStoreManager(HikariConnectionManager(SQLConfig( +// jdbcUrl = s"jdbc:postgresql://localhost:5432/basic", +// username = Some("postgres"), +// password = Some("password") +// )), connectionShared = false) +//} diff --git a/redis/src/main/scala/lightdb/redis/RedisStore.scala b/redis/src/main/scala/lightdb/redis/RedisStore.scala index 6d690424..325ce7ad 100644 --- a/redis/src/main/scala/lightdb/redis/RedisStore.scala +++ b/redis/src/main/scala/lightdb/redis/RedisStore.scala @@ -12,19 +12,17 @@ import _root_.redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} import scala.jdk.CollectionConverters.IteratorHasAsScala -class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMode: StoreMode, +class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val storeMode: StoreMode[Doc, Model], hostname: String = "localhost", - port: Int = 6379) extends Store[Doc, Model] { + port: Int = 6379) extends Store[Doc, Model](name, model) { private lazy val InstanceKey: TransactionKey[Jedis] = TransactionKey("redisInstance") private lazy val config = new JedisPoolConfig private lazy val pool = new JedisPool(config, hostname, port) - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - - pool.preparePool() - } + pool.preparePool() private def getInstance(implicit transaction: Transaction[Doc]): Jedis = transaction.getOrCreate(InstanceKey, pool.getResource) @@ -39,15 +37,15 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMod override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = - getInstance.hset(collection.name, doc._id.value, toString(doc)) + getInstance.hset(name, doc._id.value, toString(doc)) override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = - getInstance.hexists(collection.name, id.value) + getInstance.hexists(name, id.value) override def get[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Option[Doc] = { if (field == idField) { - Option(getInstance.hget(collection.name, value.asInstanceOf[Id[Doc]].value)).map(fromString) + Option(getInstance.hget(name, value.asInstanceOf[Id[Doc]].value)).map(fromString) } else { throw new UnsupportedOperationException(s"HaloDBStore can only get on _id, but ${field.name} was attempted") } @@ -57,9 +55,9 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMod (implicit transaction: Transaction[Doc]): Boolean = getInstance.hdel(value.asInstanceOf[Id[Doc]].value) > 0L - override def count(implicit transaction: Transaction[Doc]): Int = getInstance.hlen(collection.name).toInt + override def count(implicit transaction: Transaction[Doc]): Int = getInstance.hlen(name).toInt - override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = getInstance.hgetAll(collection.name) + override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = getInstance.hgetAll(name) .values().iterator().asScala.map(fromString) override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) @@ -75,7 +73,7 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val storeMod override def truncate()(implicit transaction: Transaction[Doc]): Int = { val size = count - getInstance.del(collection.name) + getInstance.del(name) size } diff --git a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala index ea66d630..a6a0a57c 100644 --- a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala +++ b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala @@ -14,27 +14,24 @@ import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator} import java.nio.file.{Files, Path} -class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: Path, val storeMode: StoreMode) extends Store[Doc, Model] { - private lazy val db: RocksDB = { +class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + directory: Path, + val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) { + RocksDB.loadLibrary() + private val db: RocksDB = { Files.createDirectories(directory.getParent) val options = new Options() .setCreateIfMissing(true) RocksDB.open(options, directory.toAbsolutePath.toString) } - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - - RocksDB.loadLibrary() - db - } - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { - val json = doc.json(collection.model.rw) + val json = doc.json(model.rw) db.put(doc._id.bytes, JsonFormatter.Compact(json).getBytes("UTF-8")) } @@ -52,7 +49,7 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: private def bytes2Doc(bytes: Array[Byte]): Doc = { val jsonString = new String(bytes, "UTF-8") val json = JsonParser(jsonString) - json.as[Doc](collection.model.rw) + json.as[Doc](model.rw) } override def delete[V](field: UniqueIndex[Doc, V], value: V) @@ -111,7 +108,8 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: object RocksDBStore extends StoreManager { override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = - new RocksDBStore[Doc, Model](db.directory.get.resolve(name), storeMode) + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = + new RocksDBStore[Doc, Model](name, model, db.directory.get.resolve(name), storeMode) } \ No newline at end of file diff --git a/sql/src/main/scala/lightdb/sql/SQLQueryBuilder.scala b/sql/src/main/scala/lightdb/sql/SQLQueryBuilder.scala index b572f02a..b01b34f4 100644 --- a/sql/src/main/scala/lightdb/sql/SQLQueryBuilder.scala +++ b/sql/src/main/scala/lightdb/sql/SQLQueryBuilder.scala @@ -5,7 +5,7 @@ import lightdb.doc.Document import java.sql.{PreparedStatement, ResultSet, SQLException} -case class SQLQueryBuilder[Doc <: Document[Doc]](collection: Collection[Doc, _], +case class SQLQueryBuilder[Doc <: Document[Doc]](store: SQLStore[Doc, _], state: SQLState[Doc], fields: List[SQLPart] = Nil, filters: List[SQLPart] = Nil, @@ -19,7 +19,7 @@ case class SQLQueryBuilder[Doc <: Document[Doc]](collection: Collection[Doc, _], b.append("SELECT\n") b.append(s"\t${fields.map(_.sql).mkString(", ")}\n") b.append("FROM\n") - b.append(s"\t${collection.name}\n") + b.append(s"\t${store.name}\n") filters.zipWithIndex.foreach { case (f, index) => if (index == 0) { diff --git a/sql/src/main/scala/lightdb/sql/SQLStore.scala b/sql/src/main/scala/lightdb/sql/SQLStore.scala index 04d5b28b..58deab88 100644 --- a/sql/src/main/scala/lightdb/sql/SQLStore.scala +++ b/sql/src/main/scala/lightdb/sql/SQLStore.scala @@ -22,28 +22,25 @@ import lightdb.field.Field._ import java.sql.{Connection, PreparedStatement, ResultSet} import scala.language.implicitConversions -abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends Store[Doc, Model] { +abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model) extends Store[Doc, Model](name, model) { protected def connectionShared: Boolean protected def connectionManager: ConnectionManager - override def init(collection: Collection[Doc, Model]): Unit = { - super.init(collection) - collection.transaction { implicit transaction => - initTransaction() - } + transaction { implicit transaction => + initTransaction() } protected def createTable()(implicit transaction: Transaction[Doc]): Unit = { val entries = fields.collect { case field if !field.rw.definition.className.contains("lightdb.spatial.GeoPoint") => - if (field == collection.model._id) { + if (field == model._id) { "_id VARCHAR NOT NULL PRIMARY KEY" } else { val t = def2Type(field.name, field.rw.definition) s"${field.name} $t" } }.mkString(", ") - executeUpdate(s"CREATE TABLE ${collection.name}($entries)") + executeUpdate(s"CREATE TABLE $name($entries)") } private def def2Type(name: String, d: DefType): String = d match { @@ -57,14 +54,14 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } protected def addColumn(field: Field[Doc, _])(implicit transaction: Transaction[Doc]): Unit = { - scribe.info(s"Adding column ${collection.name}.${field.name}") - executeUpdate(s"ALTER TABLE ${collection.name} ADD COLUMN ${field.name} ${def2Type(field.name, field.rw.definition)}") + scribe.info(s"Adding column $name.${field.name}") + executeUpdate(s"ALTER TABLE $name ADD COLUMN ${field.name} ${def2Type(field.name, field.rw.definition)}") } protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = { val connection = connectionManager.getConnection val existingTables = tables(connection) - if (!existingTables.contains(collection.name.toLowerCase)) { + if (!existingTables.contains(name.toLowerCase)) { createTable() } @@ -74,8 +71,8 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten // Drop columns existingColumns.foreach { name => if (!fieldNames.contains(name.toLowerCase)) { - scribe.info(s"Removing column ${collection.name}.$name (existing: ${existingColumns.mkString(", ")}, expected: ${fieldNames.mkString(", ")}).") - executeUpdate(s"ALTER TABLE ${collection.name} DROP COLUMN $name") + scribe.info(s"Removing column ${this.name}.$name (existing: ${existingColumns.mkString(", ")}, expected: ${fieldNames.mkString(", ")}).") + executeUpdate(s"ALTER TABLE ${this.name} DROP COLUMN $name") } } // Add columns @@ -90,9 +87,9 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten fields.foreach { case index: UniqueIndex[Doc, _] if index.name == "_id" => // Ignore _id case index: UniqueIndex[Doc, _] => - executeUpdate(s"CREATE UNIQUE INDEX IF NOT EXISTS ${index.name}_idx ON ${collection.name}(${index.name})") + executeUpdate(s"CREATE UNIQUE INDEX IF NOT EXISTS ${index.name}_idx ON $name(${index.name})") case index: Indexed[Doc, _] => - executeUpdate(s"CREATE INDEX IF NOT EXISTS ${index.name}_idx ON ${collection.name}(${index.name})") + executeUpdate(s"CREATE INDEX IF NOT EXISTS ${index.name}_idx ON $name(${index.name})") case _: Field[Doc, _] => // Nothing to do } } @@ -100,7 +97,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten protected def tables(connection: Connection): Set[String] private def columns(connection: Connection): Set[String] = { - val ps = connection.prepareStatement(s"SELECT * FROM ${collection.name} LIMIT 1") + val ps = connection.prepareStatement(s"SELECT * FROM $name LIMIT 1") try { val rs = ps.executeQuery() val meta = rs.getMetaData @@ -114,7 +111,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten override def prepareTransaction(transaction: Transaction[Doc]): Unit = transaction.put( key = StateKey[Doc], - value = SQLState(connectionManager, transaction, this, collection.cacheQueries) + value = SQLState(connectionManager, transaction, this, Collection.CacheQueries) ) protected def field2Value(field: Field[Doc, _]): String = "?" @@ -125,12 +122,12 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten protected def createInsertSQL(): String = { val values = fields.map(field2Value) - s"$insertPrefix INTO ${collection.name}(${fields.map(_.name).mkString(", ")}) VALUES(${values.mkString(", ")})" + s"$insertPrefix INTO $name(${fields.map(_.name).mkString(", ")}) VALUES(${values.mkString(", ")})" } protected def createUpsertSQL(): String = { val values = fields.map(field2Value) - s"$upsertPrefix INTO ${collection.name}(${fields.map(_.name).mkString(", ")}) VALUES(${values.mkString(", ")})" + s"$upsertPrefix INTO $name(${fields.map(_.name).mkString(", ")}) VALUES(${values.mkString(", ")})" } private[sql] lazy val insertSQL: String = createInsertSQL() @@ -145,7 +142,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } ps.addBatch() state.batchInsert.incrementAndGet() - if (state.batchInsert.get() >= collection.maxInsertBatch) { + if (state.batchInsert.get() >= Collection.MaxInsertBatch) { ps.executeBatch() state.batchInsert.set(0) } @@ -161,7 +158,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } ps.addBatch() state.batchUpsert.incrementAndGet() - if (state.batchUpsert.get() >= collection.maxInsertBatch) { + if (state.batchUpsert.get() >= Collection.MaxInsertBatch) { ps.executeBatch() state.batchUpsert.set(0) } @@ -174,7 +171,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten (implicit transaction: Transaction[Doc]): Option[Doc] = { val state = getState val b = new SQLQueryBuilder[Doc]( - collection = collection, + store = this, state = state, fields = fields.map(f => SQLPart(f.name)), filters = List(filter2Part(field === value)), @@ -201,7 +198,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten override def delete[V](field: UniqueIndex[Doc, V], value: V) (implicit transaction: Transaction[Doc]): Boolean = { val connection = connectionManager.getConnection - val ps = connection.prepareStatement(s"DELETE FROM ${collection.name} WHERE ${field.name} = ?") + val ps = connection.prepareStatement(s"DELETE FROM $name WHERE ${field.name} = ?") try { SQLArg.FieldArg(field, value).set(ps, 1) ps.executeUpdate() > 0 @@ -211,7 +208,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } override def count(implicit transaction: Transaction[Doc]): Int = { - val rs = executeQuery(s"SELECT COUNT(*) FROM ${collection.name}") + val rs = executeQuery(s"SELECT COUNT(*) FROM $name") try { rs.next() rs.getInt(1) @@ -225,7 +222,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten val connection = connectionManager.getConnection val s = connection.createStatement() state.register(s) - val rs = s.executeQuery(s"SELECT * FROM ${collection.name}") + val rs = s.executeQuery(s"SELECT * FROM $name") state.register(rs) rs2Iterator(rs, Conversion.Doc()) } @@ -236,10 +233,14 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten (1 to count).toList.map(index => meta.getColumnName(index)) } - private def getDoc(rs: ResultSet): Doc = collection.model match { - case _ if storeMode == StoreMode.Indexes => - val id = Id[Doc](rs.getString("_id")) - collection.t(_ => collection.model.asInstanceOf[DocumentModel[_]]._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]] -> id) + private def getDoc(rs: ResultSet)(implicit transaction: Transaction[Doc]): Doc = model match { + case _ if storeMode.isIndexes => + storeMode match { + case StoreMode.Indexes(storage) => + val id = Id[Doc](rs.getString("_id")) + storage(id) + case _ => throw new UnsupportedOperationException("This should not be possible") + } case c: SQLConversion[Doc] => c.convertFromSQL(rs) case c: JsonConversion[Doc] => val values = fields.map { field => @@ -257,7 +258,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } catch { case t: Throwable => val columnNames = getColumnNames(rs).mkString(", ") - throw new RuntimeException(s"Unable to get ${collection.name}.${field.name} from [$columnNames]", t) + throw new RuntimeException(s"Unable to get $name.${field.name} from [$columnNames]", t) } } c.convertFromJson(obj(values: _*)) @@ -265,10 +266,11 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten val map = fields.map { field => field.name -> obj2Value(rs.getObject(field.name)) }.toMap - collection.model.map2Doc(map) + model.map2Doc(map) } - private def rs2Iterator[V](rs: ResultSet, conversion: Conversion[Doc, V]): Iterator[V] = new Iterator[V] { + private def rs2Iterator[V](rs: ResultSet, conversion: Conversion[Doc, V]) + (implicit transaction: Transaction[Doc]): Iterator[V] = new Iterator[V] { override def hasNext: Boolean = rs.next() override def next(): V = { @@ -280,11 +282,11 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten case Conversion.Converted(c) => c(getDoc(rs)) case Conversion.Materialized(fields) => val json = jsonFromFields(fields) - MaterializedIndex[Doc, Model](json, collection.model).asInstanceOf[V] + MaterializedIndex[Doc, Model](json, model).asInstanceOf[V] case Conversion.DocAndIndexes() => val json = jsonFromFields(fields.filter(_.indexed)) val doc = getDoc(rs) - MaterializedAndDoc[Doc, Model](json, collection.model, doc).asInstanceOf[V] + MaterializedAndDoc[Doc, Model](json, model, doc).asInstanceOf[V] case Conversion.Json(fields) => jsonFromFields(fields).asInstanceOf[V] case Conversion.Distance(field, _, _, _) => @@ -342,7 +344,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten case Conversion.Value(field) => List(field) case Conversion.Doc() | Conversion.Converted(_) => this.fields case Conversion.Materialized(fields) => fields - case Conversion.DocAndIndexes() => if (storeMode == StoreMode.Indexes) { + case Conversion.DocAndIndexes() => if (storeMode.isIndexes) { this.fields.filter(_.indexed) } else { this.fields @@ -354,7 +356,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } val state = getState val b = SQLQueryBuilder( - collection = collection, + store = this, state = state, fields = fields.map(f => fieldPart(f)) ::: extraFields, filters = query.filter.map(filter2Part).toList, @@ -381,7 +383,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten val ps = rs.getStatement.asInstanceOf[PreparedStatement] val iteratorWithScore = ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps)) SearchResults( - model = collection.model, + model = model, offset = query.offset, limit = query.limit, total = total, @@ -436,7 +438,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten case t => throw new UnsupportedOperationException(s"Unsupported sort: $t") } SQLQueryBuilder( - collection = collection, + store = this, state = getState, fields = fields, filters = filters, @@ -490,7 +492,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } f.name -> json }: _*) - MaterializedAggregate[Doc, Model](json, collection.model) + MaterializedAggregate[Doc, Model](json, model) } } @@ -505,26 +507,26 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten private def filter2Part(f: Filter[Doc]): SQLPart = f match { case f: Filter.DrillDownFacetFilter[Doc] => throw new UnsupportedOperationException(s"SQLStore does not support Facets: $f") - case f: Filter.Equals[Doc, _] if f.field(collection.model).isArr => - val values = f.getJson(collection.model).asVector + case f: Filter.Equals[Doc, _] if f.field(model).isArr => + val values = f.getJson(model).asVector val parts = values.map { json => val jsonString = JsonFormatter.Compact(json) SQLPart(s"${f.fieldName} LIKE ?", List(SQLArg.StringArg(s"%$jsonString%"))) } SQLPart.merge(parts: _*) case f: Filter.Equals[Doc, _] if f.value == null | f.value == None => SQLPart(s"${f.fieldName} IS NULL") - case f: Filter.Equals[Doc, _] => SQLPart(s"${f.fieldName} = ?", List(SQLArg.FieldArg(f.field(collection.model), f.value))) - case f: Filter.NotEquals[Doc, _] if f.field(collection.model).isArr => - val values = f.getJson(collection.model).asVector + case f: Filter.Equals[Doc, _] => SQLPart(s"${f.fieldName} = ?", List(SQLArg.FieldArg(f.field(model), f.value))) + case f: Filter.NotEquals[Doc, _] if f.field(model).isArr => + val values = f.getJson(model).asVector val parts = values.map { json => val jsonString = JsonFormatter.Compact(json) SQLPart(s"${f.fieldName} NOT LIKE ?", List(SQLArg.StringArg(s"%$jsonString%"))) } SQLPart.merge(parts: _*) case f: Filter.NotEquals[Doc, _] if f.value == null | f.value == None => SQLPart(s"${f.fieldName} IS NOT NULL") - case f: Filter.NotEquals[Doc, _] => SQLPart(s"${f.fieldName} != ?", List(SQLArg.FieldArg(f.field(collection.model), f.value))) + case f: Filter.NotEquals[Doc, _] => SQLPart(s"${f.fieldName} != ?", List(SQLArg.FieldArg(f.field(model), f.value))) case f: Filter.Regex[Doc, _] => SQLPart(s"${f.fieldName} REGEXP ?", List(SQLArg.StringArg(f.expression))) - case f: Filter.In[Doc, _] => SQLPart(s"${f.fieldName} IN (${f.values.map(_ => "?").mkString(", ")})", f.values.toList.map(v => SQLArg.FieldArg(f.field(collection.model), v))) + case f: Filter.In[Doc, _] => SQLPart(s"${f.fieldName} IN (${f.values.map(_ => "?").mkString(", ")})", f.values.toList.map(v => SQLArg.FieldArg(f.field(model), v))) case f: Filter.RangeLong[Doc] => (f.from, f.to) match { case (Some(from), Some(to)) => SQLPart(s"${f.fieldName} BETWEEN ? AND ?", List(SQLArg.LongArg(from), SQLArg.LongArg(to))) case (None, Some(to)) => SQLPart(s"${f.fieldName} <= ?", List(SQLArg.LongArg(to))) @@ -618,7 +620,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten override def truncate()(implicit transaction: Transaction[Doc]): Int = { val connection = connectionManager.getConnection - val ps = connection.prepareStatement(s"DELETE FROM ${collection.name}") + val ps = connection.prepareStatement(s"DELETE FROM $name") try { ps.executeUpdate() } finally { diff --git a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala index b98c6948..0075a398 100644 --- a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala +++ b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala @@ -22,13 +22,15 @@ import java.nio.file.{Files, Path, StandardCopyOption} import java.sql.Connection import java.util.regex.Pattern -class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connectionManager: ConnectionManager, +class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + val connectionManager: ConnectionManager, val connectionShared: Boolean, - val storeMode: StoreMode) extends SQLStore[Doc, Model] { + val storeMode: StoreMode[Doc, Model]) extends SQLStore[Doc, Model](name, model) { override protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = { val c = connectionManager.getConnection if (hasSpatial) { - scribe.info(s"${collection.name} has spatial features. Enabling...") + scribe.info(s"$name has spatial features. Enabling...") org.sqlite.Function.create(c, "DISTANCE", new org.sqlite.Function() { override def xFunc(): Unit = { def s(index: Int): List[Geo] = Option(value_text(index)) @@ -113,8 +115,13 @@ object SQLiteStore extends StoreManager { )) } - def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](file: Option[Path], storeMode: StoreMode): SQLiteStore[Doc, Model] = { + def apply[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, + model: Model, + file: Option[Path], + storeMode: StoreMode[Doc, Model]): SQLiteStore[Doc, Model] = { new SQLiteStore[Doc, Model]( + name = name, + model = model, connectionManager = singleConnectionManager(file), connectionShared = false, storeMode = storeMode @@ -122,16 +129,19 @@ object SQLiteStore extends StoreManager { } override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB, + model: Model, name: String, - storeMode: StoreMode): Store[Doc, Model] = { + storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = { db.get(SQLDatabase.Key) match { case Some(sqlDB) => new SQLiteStore[Doc, Model]( - connectionManager = sqlDB.connectionManager, - connectionShared = true, - storeMode - ) - case None => apply[Doc, Model](db.directory.map(_.resolve(s"$name.sqlite")), storeMode) + name = name, + model = model, + connectionManager = sqlDB.connectionManager, + connectionShared = true, + storeMode = storeMode + ) + case None => apply[Doc, Model](name, model, db.directory.map(_.resolve(s"$name.sqlite")), storeMode) } }