From fd638f4acb051bd1c756689ce8952ade8cf662c1 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Fri, 30 Aug 2024 09:25:08 -0500 Subject: [PATCH] Improvements to Aggregator Added shutdown hook to properly dispose Added active transaction tracking and disposal during shutdown Added DatabaseRestore.restore to restore a single collection Fixed bug in Filter.&& and Filter.|| not setting minShould properly Fix for Lucene with minShould and no should conditions Fix for SQLiteStore to only enable spatial functionality if the collection uses spatial Added basic "should" support in SQLStore Added convenience synchronized modify to StoredValue --- core/src/main/scala/lightdb/LightDB.scala | 13 +++++---- core/src/main/scala/lightdb/StoredValue.scala | 8 ++++- .../lightdb/backup/DatabaseRestore.scala | 20 +++++++++++++ .../scala/lightdb/collection/Collection.scala | 27 +++++++++++++++-- .../main/scala/lightdb/filter/Filter.scala | 4 +-- .../main/scala/lightdb/util/Aggregator.scala | 9 ++++-- .../scala/lightdb/lucene/LuceneStore.scala | 4 +-- sql/src/main/scala/lightdb/sql/SQLStore.scala | 18 +++++++++--- .../main/scala/lightdb/sql/SQLiteStore.scala | 29 ++++++++++++++----- 9 files changed, 107 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/lightdb/LightDB.scala b/core/src/main/scala/lightdb/LightDB.scala index b5effe07..67b4427f 100644 --- a/core/src/main/scala/lightdb/LightDB.scala +++ b/core/src/main/scala/lightdb/LightDB.scala @@ -88,6 +88,10 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] { scribe.info(s"Applying ${upgrades.length} upgrades (${upgrades.map(_.label).mkString(", ")})...") doUpgrades(upgrades, stillBlocking = true) } + // Setup shutdown hook + Runtime.getRuntime.addShutdownHook(new Thread(() => { + dispose() + })) // Set initialized databaseInitialized.set(true) } @@ -166,12 +170,11 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] { case Some(upgrade) => val continueBlocking = upgrades.exists(_.blockStartup) upgrade.upgrade(this) - val applied = appliedUpgrades.get() - appliedUpgrades.set(applied + upgrade.label) + appliedUpgrades.modify { set => + set + upgrade.label + } if (stillBlocking && !continueBlocking) { - scribe.Platform.executionContext.execute(new Runnable { - override def run(): Unit = doUpgrades(upgrades.tail, continueBlocking) - }) + scribe.Platform.executionContext.execute(() => doUpgrades(upgrades.tail, continueBlocking)) } else { doUpgrades(upgrades.tail, continueBlocking) } diff --git a/core/src/main/scala/lightdb/StoredValue.scala b/core/src/main/scala/lightdb/StoredValue.scala index 22fe5484..0a536c0a 100644 --- a/core/src/main/scala/lightdb/StoredValue.scala +++ b/core/src/main/scala/lightdb/StoredValue.scala @@ -6,7 +6,7 @@ import lightdb.collection.Collection case class StoredValue[T](key: String, collection: Collection[KeyValue, KeyValue.type], default: () => T, - persistence: Persistence)(implicit rw: RW[T]) { + persistence: Persistence)(implicit rw: RW[T]) { stored => private lazy val id = Id[KeyValue](key) private var cached: Option[T] = None @@ -46,6 +46,12 @@ case class StoredValue[T](key: String, } } + def modify(f: T => T): T = synchronized { + val current = get() + val modified = f(current) + set(modified) + } + def clear(): Unit = collection.transaction { implicit transaction => if (collection.delete(_._id -> id)) { cached = None diff --git a/core/src/main/scala/lightdb/backup/DatabaseRestore.scala b/core/src/main/scala/lightdb/backup/DatabaseRestore.scala index 911a98c2..bde7d241 100644 --- a/core/src/main/scala/lightdb/backup/DatabaseRestore.scala +++ b/core/src/main/scala/lightdb/backup/DatabaseRestore.scala @@ -3,6 +3,7 @@ package lightdb.backup import fabric.io.JsonParser import lightdb.LightDB import lightdb.collection.Collection +import lightdb.doc.{Document, DocumentModel} import java.io.File import java.util.zip.ZipFile @@ -40,6 +41,25 @@ object DatabaseRestore { } } + def restore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model], + file: File, + truncate: Boolean = true): Int = { + if (file.exists()) { + if (truncate) collection.t.truncate() + val source = Source.fromFile(file) + try { + val iterator = source + .getLines() + .map(s => JsonParser(s)) + collection.t.json.insert(iterator) + } finally { + source.close() + } + } else { + throw new RuntimeException(s"${file.getAbsolutePath} doesn't exist") + } + } + private def process(db: LightDB, truncate: Boolean)(f: Collection[_, _] => Option[Source]): Int = { db.collections.map { collection => f(collection) match { diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index e25a9283..c1ee147a 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -10,6 +10,9 @@ import lightdb.transaction.Transaction import lightdb.util.Initializable import lightdb.{Field, Id, Query, Unique, UniqueIndex} +import java.util.concurrent.ConcurrentHashMap +import scala.jdk.CollectionConverters.IteratorHasAsScala + case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model, loadStore: () => Store[Doc, Model], @@ -44,6 +47,10 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def reIndex(): Boolean = store.reIndex() object transaction { + private val set = ConcurrentHashMap.newKeySet[Transaction[Doc]] + + def active: Int = set.size() + def apply[Return](f: Transaction[Doc] => Return): Return = { val transaction = create() try { @@ -55,13 +62,24 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def create(): Transaction[Doc] = { if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name") - store.createTransaction() + val transaction = store.createTransaction() + set.add(transaction) + transaction } def release(transaction: Transaction[Doc]): Unit = { if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name") store.releaseTransaction(transaction) transaction.close() + set.remove(transaction) + } + + def releaseAll(): Int = { + val list = set.iterator().asScala.toList + list.foreach { transaction => + release(transaction) + } + list.size } } @@ -218,7 +236,12 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def truncate()(implicit transaction: Transaction[Doc]): Int = store.truncate() - def dispose(): Unit = { + def dispose(): Unit = try { + val transactions = transaction.releaseAll() + if (transactions > 0) { + scribe.warn(s"Released $transactions active transactions") + } + } finally { store.dispose() } } diff --git a/core/src/main/scala/lightdb/filter/Filter.scala b/core/src/main/scala/lightdb/filter/Filter.scala index 2f37852a..44dc22c5 100644 --- a/core/src/main/scala/lightdb/filter/Filter.scala +++ b/core/src/main/scala/lightdb/filter/Filter.scala @@ -12,7 +12,7 @@ sealed trait Filter[Doc] { Filter.Builder[Doc](minShould = b1.minShould, filters = b1.filters ::: b2.filters) case (_, b: Filter.Builder[Doc]) => b.must(this) case (b: Filter.Builder[Doc], _) => b.must(that) - case _ => Filter.Builder[Doc]().must(this).must(that) + case _ => Filter.Builder[Doc](minShould = 1).must(this).must(that) } def ||(that: Filter[Doc]): Filter[Doc] = (this, that) match { @@ -20,7 +20,7 @@ sealed trait Filter[Doc] { Filter.Builder[Doc](minShould = b1.minShould, filters = b1.filters ::: b2.filters) case (_, b: Filter.Builder[Doc]) => b.should(this) case (b: Filter.Builder[Doc], _) => b.should(that) - case _ => Filter.Builder[Doc]().should(this).should(that) + case _ => Filter.Builder[Doc](minShould = 1).should(this).should(that) } } diff --git a/core/src/main/scala/lightdb/util/Aggregator.scala b/core/src/main/scala/lightdb/util/Aggregator.scala index 03b7f2d3..ef7c17b8 100644 --- a/core/src/main/scala/lightdb/util/Aggregator.scala +++ b/core/src/main/scala/lightdb/util/Aggregator.scala @@ -1,7 +1,7 @@ package lightdb.util import fabric.rw._ -import fabric.{Json, Num, NumDec, NumInt, Obj, Str, num} +import fabric.{Json, Null, Num, NumDec, NumInt, Obj, Str, num} import lightdb.Field import lightdb.SortDirection.Ascending import lightdb.aggregate.{AggregateQuery, AggregateType} @@ -38,6 +38,7 @@ object Aggregator { case Some(c) => num(bd.max(c.asBigDecimal)) case None => num(bd) } + case Null => current.getOrElse(Null) case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})") } case AggregateType.Min => value match { @@ -49,6 +50,7 @@ object Aggregator { case Some(c) => num(bd.min(c.asBigDecimal)) case None => num(bd) } + case Null => current.getOrElse(Null) case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})") } case AggregateType.Avg => @@ -66,6 +68,7 @@ object Aggregator { case Some(c) => num(bd + c.asBigDecimal) case None => num(bd) } + case Null => current.getOrElse(Null) case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})") } case AggregateType.Count => current match { @@ -83,7 +86,9 @@ object Aggregator { } case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})") } - map += f.name -> newValue + if (newValue != Null) { + map += f.name -> newValue + } } groups += group -> map } diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala index 2099b58b..94bc17cd 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala @@ -259,7 +259,8 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: LatLonPoint.newDistanceQuery(index.name, from.latitude, from.longitude, radius.toMeters) case Filter.Builder(minShould, clauses) => val b = new BooleanQuery.Builder - b.setMinimumNumberShouldMatch(minShould) + val hasShould = clauses.exists(c => c.condition == Condition.Should || c.condition == Condition.Filter) + b.setMinimumNumberShouldMatch(if (hasShould) minShould else 0) clauses.foreach { c => val q = filter2Lucene(Some(c.filter)) val query = c.boost match { @@ -306,7 +307,6 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } private def sort2SortField(sort: Sort): SortField = { - sort match { case Sort.BestMatch => SortField.FIELD_SCORE case Sort.IndexOrder => SortField.FIELD_DOC diff --git a/sql/src/main/scala/lightdb/sql/SQLStore.scala b/sql/src/main/scala/lightdb/sql/SQLStore.scala index c178011b..8faffb9c 100644 --- a/sql/src/main/scala/lightdb/sql/SQLStore.scala +++ b/sql/src/main/scala/lightdb/sql/SQLStore.scala @@ -523,7 +523,18 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten SQLPart(parts.map(_ => s"${f.field.name} LIKE ?").mkString(" AND "), parts.map(s => SQLArg.StringArg(s))) case f: Filter.Distance[Doc] => distanceFilter(f) case f: Filter.Builder[Doc] => - val parts = f.filters.map { fc => + val (shoulds, others) = f.filters.partition(f => f.condition == Condition.Filter || f.condition == Condition.Should) + if (f.minShould != 1 && shoulds.nonEmpty) { + throw new UnsupportedOperationException("Should filtering only works in SQL for exactly one condition") + } + val shouldParts = shoulds.map(fc => filter2Part(fc.filter)) match { + case Nil => Nil + case list => List(SQLPart( + sql = list.map(_.sql).mkString("(", " OR ", ")"), + args = list.flatMap(_.args) + )) + } + val parts = others.map { fc => if (fc.boost.nonEmpty) throw new UnsupportedOperationException("Boost not supported in SQL") fc.condition match { case Condition.Must => filter2Part(fc.filter) @@ -534,11 +545,10 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten } else { p.copy(sql = p.sql.replace(" = ", " != ")) } - case Condition.Filter => throw new UnsupportedOperationException("Filter condition not supported in SQL") - case Condition.Should => throw new UnsupportedOperationException("Should condition not supported in SQL") + case f => throw new UnsupportedOperationException(s"$f condition not supported in SQL") } } - SQLPart.merge(parts: _*) + SQLPart.merge(parts ::: shouldParts: _*) } private def af2Part(f: AggregateFilter[Doc]): SQLPart = f match { diff --git a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala index b2ec8c70..c84daad5 100644 --- a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala +++ b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala @@ -3,6 +3,7 @@ package lightdb.sql import fabric._ import fabric.define.DefType import fabric.rw._ +import lightdb.collection.Collection import lightdb.sql.connect.{ConnectionManager, DBCPConnectionManager, SQLConfig, SingleConnectionManager} import lightdb.{Field, LightDB} import lightdb.doc.{Document, DocumentModel} @@ -22,6 +23,19 @@ class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connect private val PointRegex = """POINT\((.+) (.+)\)""".r private val OptPointRegex = """\[POINT\((.+) (.+)\)\]""".r + override protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = { + if (hasSpatial) { + scribe.info(s"${collection.name} has spatial features. Enabling...") + val c = connectionManager.getConnection + val s = c.createStatement() + s.executeUpdate(s"SELECT load_extension('${SQLiteStore.spatialitePath}');") + val hasGeometryColumns = this.tables(c).contains("geometry_columns") + if (!hasGeometryColumns) s.executeUpdate("SELECT InitSpatialMetaData()") + s.close() + } + super.initTransaction() + } + override protected def tables(connection: Connection): Set[String] = SQLiteStore.tables(connection) override protected def toJson(value: Any, rw: RW[_]): Json = { @@ -69,6 +83,14 @@ class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connect } } + private lazy val hasSpatial: Boolean = fields.exists { field => + val className = field.rw.definition match { + case DefType.Opt(d) => d.className + case d => d.className + } + className.contains("lightdb.spatial.GeoPoint") + } + override protected def extraFieldsForDistance(d: Conversion.Distance[Doc]): List[SQLPart] = { List(SQLPart(s"ST_Distance(GeomFromText('POINT(${d.from.longitude} ${d.from.latitude})', 4326), ${d.field.name}, true) AS ${d.field.name}Distance")) } @@ -111,13 +133,6 @@ object SQLiteStore extends StoreManager { try { val c = config.createConnection(uri) c.setAutoCommit(false) - - val s = c.createStatement() - s.executeUpdate(s"SELECT load_extension('$spatialitePath');") - val hasGeometryColumns = this.tables(c).contains("geometry_columns") - if (!hasGeometryColumns) s.executeUpdate("SELECT InitSpatialMetaData()") - s.close() - c } catch { case t: Throwable => throw new RuntimeException(s"Error establishing SQLite connection to $uri", t)