Improvements to Aggregator
Added shutdown hook to properly dispose the database on JVM exit
Added active transaction tracking and disposal during shutdown
Added DatabaseRestore.restore to restore a single collection
Fixed bug in Filter.&& and Filter.|| not setting minShould properly
Fix for Lucene with minShould and no should conditions
Fix for SQLiteStore to only enable spatial functionality if the collection uses spatial data
Added basic "should" support in SQLStore
Added convenience synchronized modify to StoredValue
darkfrog26 committed Aug 30, 2024
1 parent dfaa45b commit fd638f4
Showing 9 changed files with 107 additions and 25 deletions.
13 changes: 8 additions & 5 deletions core/src/main/scala/lightdb/LightDB.scala
@@ -88,6 +88,10 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] {
       scribe.info(s"Applying ${upgrades.length} upgrades (${upgrades.map(_.label).mkString(", ")})...")
       doUpgrades(upgrades, stillBlocking = true)
     }
+    // Setup shutdown hook
+    Runtime.getRuntime.addShutdownHook(new Thread(() => {
+      dispose()
+    }))
     // Set initialized
     databaseInitialized.set(true)
   }
@@ -166,12 +170,11 @@ trait LightDB extends Initializable with FeatureSupport[DBFeatureKey] {
       case Some(upgrade) =>
         val continueBlocking = upgrades.exists(_.blockStartup)
         upgrade.upgrade(this)
-        val applied = appliedUpgrades.get()
-        appliedUpgrades.set(applied + upgrade.label)
+        appliedUpgrades.modify { set =>
+          set + upgrade.label
+        }
         if (stillBlocking && !continueBlocking) {
-          scribe.Platform.executionContext.execute(new Runnable {
-            override def run(): Unit = doUpgrades(upgrades.tail, continueBlocking)
-          })
+          scribe.Platform.executionContext.execute(() => doUpgrades(upgrades.tail, continueBlocking))
         } else {
           doUpgrades(upgrades.tail, continueBlocking)
         }
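The appliedUpgrades change swaps a racy get()/set() pair for the new synchronized modify on StoredValue, so concurrent upgrade completions cannot lose a label. A minimal sketch of the difference:

```scala
// Before: read-modify-write in two steps; two threads can interleave and
// one upgrade label is silently lost when the second set() overwrites.
//   val applied = appliedUpgrades.get()
//   appliedUpgrades.set(applied + upgrade.label)

// After: the whole update runs inside StoredValue's synchronized block.
appliedUpgrades.modify(_ + upgrade.label)
```

The shutdown hook pairs with the new transaction tracking in Collection (below): dispose() now runs even when the JVM exits without an explicit shutdown.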
8 changes: 7 additions & 1 deletion core/src/main/scala/lightdb/StoredValue.scala
@@ -6,7 +6,7 @@ import lightdb.collection.Collection
 case class StoredValue[T](key: String,
                           collection: Collection[KeyValue, KeyValue.type],
                           default: () => T,
-                          persistence: Persistence)(implicit rw: RW[T]) {
+                          persistence: Persistence)(implicit rw: RW[T]) { stored =>
   private lazy val id = Id[KeyValue](key)

   private var cached: Option[T] = None
@@ -46,6 +46,12 @@ case class StoredValue[T](key: String,
     }
   }

+  def modify(f: T => T): T = synchronized {
+    val current = get()
+    val modified = f(current)
+    set(modified)
+  }
+
   def clear(): Unit = collection.transaction { implicit transaction =>
     if (collection.delete(_._id -> id)) {
       cached = None
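A short usage sketch for the new modify (the counter is hypothetical): the read, the function application, and the write all happen under one lock, and the updated value is returned:

```scala
// Assumes an existing StoredValue[Int]; how it is constructed is illustrative.
val counter: StoredValue[Int] = ???

val next: Int = counter.modify(_ + 1) // atomic increment; returns the new value
```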
20 changes: 20 additions & 0 deletions core/src/main/scala/lightdb/backup/DatabaseRestore.scala
@@ -3,6 +3,7 @@ package lightdb.backup
 import fabric.io.JsonParser
 import lightdb.LightDB
 import lightdb.collection.Collection
+import lightdb.doc.{Document, DocumentModel}

 import java.io.File
 import java.util.zip.ZipFile
@@ -40,6 +41,25 @@ object DatabaseRestore {
     }
   }

+  def restore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model],
+                                                                 file: File,
+                                                                 truncate: Boolean = true): Int = {
+    if (file.exists()) {
+      if (truncate) collection.t.truncate()
+      val source = Source.fromFile(file)
+      try {
+        val iterator = source
+          .getLines()
+          .map(s => JsonParser(s))
+        collection.t.json.insert(iterator)
+      } finally {
+        source.close()
+      }
+    } else {
+      throw new RuntimeException(s"${file.getAbsolutePath} doesn't exist")
+    }
+  }
+
   private def process(db: LightDB, truncate: Boolean)(f: Collection[_, _] => Option[Source]): Int = {
     db.collections.map { collection =>
       f(collection) match {
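A hedged usage sketch of the new single-collection restore (the collection and path are illustrative); the file is expected to hold one JSON document per line, and truncate = true (the default) empties the collection first:

```scala
import java.io.File

// Hypothetical collection db.people; returns the number of documents inserted.
val restored: Int = DatabaseRestore.restore(db.people, new File("backup/people.jsonl"))
scribe.info(s"Restored $restored documents")
```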
27 changes: 25 additions & 2 deletions core/src/main/scala/lightdb/collection/Collection.scala
@@ -10,6 +10,9 @@ import lightdb.transaction.Transaction
 import lightdb.util.Initializable
 import lightdb.{Field, Id, Query, Unique, UniqueIndex}

+import java.util.concurrent.ConcurrentHashMap
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
 case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
                                                                          model: Model,
                                                                          loadStore: () => Store[Doc, Model],
@@ -44,6 +47,10 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S
   def reIndex(): Boolean = store.reIndex()

   object transaction {
+    private val set = ConcurrentHashMap.newKeySet[Transaction[Doc]]
+
+    def active: Int = set.size()
+
     def apply[Return](f: Transaction[Doc] => Return): Return = {
       val transaction = create()
       try {
@@ -55,13 +62,24 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S

     def create(): Transaction[Doc] = {
       if (Collection.LogTransactions) scribe.info(s"Creating new Transaction for $name")
-      store.createTransaction()
+      val transaction = store.createTransaction()
+      set.add(transaction)
+      transaction
     }

     def release(transaction: Transaction[Doc]): Unit = {
       if (Collection.LogTransactions) scribe.info(s"Releasing Transaction for $name")
-      store.releaseTransaction(transaction)
+      transaction.close()
+      set.remove(transaction)
     }
+
+    def releaseAll(): Int = {
+      val list = set.iterator().asScala.toList
+      list.foreach { transaction =>
+        release(transaction)
+      }
+      list.size
+    }
   }

@@ -218,7 +236,12 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S

   def truncate()(implicit transaction: Transaction[Doc]): Int = store.truncate()

-  def dispose(): Unit = {
+  def dispose(): Unit = try {
+    val transactions = transaction.releaseAll()
+    if (transactions > 0) {
+      scribe.warn(s"Released $transactions active transactions")
+    }
+  } finally {
     store.dispose()
   }
 }
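A sketch of the new transaction tracking (the collection value is illustrative): create() registers each transaction, active reports how many are outstanding, and dispose() force-releases leftovers, with a warning, before disposing the store:

```scala
val tx = collection.transaction.create()
assert(collection.transaction.active == 1)

collection.transaction.release(tx) // closes the transaction and deregisters it
assert(collection.transaction.active == 0)

// At shutdown (now also via the LightDB shutdown hook), any transaction still
// open is released and counted in a warning before the store is disposed:
collection.dispose()
```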
4 changes: 2 additions & 2 deletions core/src/main/scala/lightdb/filter/Filter.scala
@@ -12,15 +12,15 @@ sealed trait Filter[Doc] {
       Filter.Builder[Doc](minShould = b1.minShould, filters = b1.filters ::: b2.filters)
     case (_, b: Filter.Builder[Doc]) => b.must(this)
     case (b: Filter.Builder[Doc], _) => b.must(that)
-    case _ => Filter.Builder[Doc]().must(this).must(that)
+    case _ => Filter.Builder[Doc](minShould = 1).must(this).must(that)
   }

   def ||(that: Filter[Doc]): Filter[Doc] = (this, that) match {
     case (b1: Filter.Builder[Doc], b2: Filter.Builder[Doc]) if b1.minShould == b2.minShould =>
       Filter.Builder[Doc](minShould = b1.minShould, filters = b1.filters ::: b2.filters)
     case (_, b: Filter.Builder[Doc]) => b.should(this)
     case (b: Filter.Builder[Doc], _) => b.should(that)
-    case _ => Filter.Builder[Doc]().should(this).should(that)
+    case _ => Filter.Builder[Doc](minShould = 1).should(this).should(that)
   }
 }

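The practical effect, in a hedged sketch: combining two plain (non-Builder) filters now always yields a Builder with minShould = 1, so OR keeps its intended "at least one Should clause must match" semantics:

```scala
def demo[Doc](a: Filter[Doc], b: Filter[Doc]): Unit = {
  val or = a || b
  // or is a Filter.Builder(minShould = 1, ...) holding two Should clauses:
  // at least one of them must match.
  val and = a && b
  // and is a Filter.Builder(minShould = 1, ...) holding two Must clauses;
  // minShould has no effect without Should clauses (see the Lucene fix below).
}
```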
9 changes: 7 additions & 2 deletions core/src/main/scala/lightdb/util/Aggregator.scala
@@ -1,7 +1,7 @@
 package lightdb.util

 import fabric.rw._
-import fabric.{Json, Num, NumDec, NumInt, Obj, Str, num}
+import fabric.{Json, Null, Num, NumDec, NumInt, Obj, Str, num}
 import lightdb.Field
 import lightdb.SortDirection.Ascending
 import lightdb.aggregate.{AggregateQuery, AggregateType}
@@ -38,6 +38,7 @@ object Aggregator {
           case Some(c) => num(bd.max(c.asBigDecimal))
           case None => num(bd)
         }
+        case Null => current.getOrElse(Null)
         case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})")
       }
       case AggregateType.Min => value match {
@@ -49,6 +50,7 @@
           case Some(c) => num(bd.min(c.asBigDecimal))
           case None => num(bd)
         }
+        case Null => current.getOrElse(Null)
         case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})")
       }
       case AggregateType.Avg =>
@@ -66,6 +68,7 @@
           case Some(c) => num(bd + c.asBigDecimal)
           case None => num(bd)
         }
+        case Null => current.getOrElse(Null)
         case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})")
       }
       case AggregateType.Count => current match {
@@ -83,7 +86,9 @@
         }
         case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})")
       }
-      map += f.name -> newValue
+      if (newValue != Null) {
+        map += f.name -> newValue
+      }
     }
     groups += group -> map
   }
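The Null cases make Max/Min/Sum skip null field values instead of throwing, and a group whose aggregate is still Null omits that key entirely. A standalone sketch of the accumulator behavior (not library code, but it reuses fabric's num/asBigDecimal like the real implementation):

```scala
import fabric._

// Hedged sketch mirroring the Max branch above.
def maxAcc(current: Option[Json], value: Json): Json = value match {
  case Null => current.getOrElse(Null) // Null keeps the running max instead of throwing
  case v => current match {
    case Some(c) => num(v.asBigDecimal.max(c.asBigDecimal))
    case None => num(v.asBigDecimal)
  }
}

val result = List[Json](num(30), Null, num(40))
  .foldLeft(Option.empty[Json])((cur, v) => Some(maxAcc(cur, v)))
// Some(num(40)); a field that is Null for every row stays Null and is not
// written into the group's result map.
```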
4 changes: 2 additions & 2 deletions lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
@@ -259,7 +259,8 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
       LatLonPoint.newDistanceQuery(index.name, from.latitude, from.longitude, radius.toMeters)
     case Filter.Builder(minShould, clauses) =>
       val b = new BooleanQuery.Builder
-      b.setMinimumNumberShouldMatch(minShould)
+      val hasShould = clauses.exists(c => c.condition == Condition.Should || c.condition == Condition.Filter)
+      b.setMinimumNumberShouldMatch(if (hasShould) minShould else 0)
       clauses.foreach { c =>
         val q = filter2Lucene(Some(c.filter))
         val query = c.boost match {
@@ -306,7 +307,6 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
   }

   private def sort2SortField(sort: Sort): SortField = {
-
     sort match {
       case Sort.BestMatch => SortField.FIELD_SCORE
       case Sort.IndexOrder => SortField.FIELD_DOC
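Why the guard matters, in plain Lucene terms (a sketch, not LightDB code): BooleanQuery enforces minimumNumberShouldMatch even when the query has no SHOULD clauses, so a MUST-only query with minShould = 1 (exactly what Filter.&& now produces) would match nothing without this fix:

```scala
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, TermQuery}

val b = new BooleanQuery.Builder
b.add(new TermQuery(new Term("name", "alice")), BooleanClause.Occur.MUST)

// b.setMinimumNumberShouldMatch(1) // no SHOULD clauses exist => zero hits
b.setMinimumNumberShouldMatch(0)    // the fix: apply minShould only when SHOULD/FILTER clauses are present
val query: BooleanQuery = b.build()
```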
18 changes: 14 additions & 4 deletions sql/src/main/scala/lightdb/sql/SQLStore.scala
@@ -523,7 +523,18 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten
       SQLPart(parts.map(_ => s"${f.field.name} LIKE ?").mkString(" AND "), parts.map(s => SQLArg.StringArg(s)))
     case f: Filter.Distance[Doc] => distanceFilter(f)
     case f: Filter.Builder[Doc] =>
-      val parts = f.filters.map { fc =>
+      val (shoulds, others) = f.filters.partition(f => f.condition == Condition.Filter || f.condition == Condition.Should)
+      if (f.minShould != 1 && shoulds.nonEmpty) {
+        throw new UnsupportedOperationException("Should filtering only works in SQL for exactly one condition")
+      }
+      val shouldParts = shoulds.map(fc => filter2Part(fc.filter)) match {
+        case Nil => Nil
+        case list => List(SQLPart(
+          sql = list.map(_.sql).mkString("(", " OR ", ")"),
+          args = list.flatMap(_.args)
+        ))
+      }
+      val parts = others.map { fc =>
         if (fc.boost.nonEmpty) throw new UnsupportedOperationException("Boost not supported in SQL")
         fc.condition match {
           case Condition.Must => filter2Part(fc.filter)
@@ -534,11 +545,10 @@
           } else {
             p.copy(sql = p.sql.replace(" = ", " != "))
           }
-          case Condition.Filter => throw new UnsupportedOperationException("Filter condition not supported in SQL")
-          case Condition.Should => throw new UnsupportedOperationException("Should condition not supported in SQL")
+          case f => throw new UnsupportedOperationException(s"$f condition not supported in SQL")
         }
       }
-      SQLPart.merge(parts: _*)
+      SQLPart.merge(parts ::: shouldParts: _*)
     }

   private def af2Part(f: AggregateFilter[Doc]): SQLPart = f match {
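A standalone sketch of the OR-grouping this adds (SQLPart simplified to a local case class; values illustrative): all Should/Filter clauses collapse into one parenthesized OR group that is ANDed with the remaining parts, and only minShould == 1 is expressible in SQL:

```scala
// Simplified stand-in for SQLPart.
case class Part(sql: String, args: List[String])

val shoulds = List(Part("name = ?", List("alice")), Part("name = ?", List("bob")))
val others  = List(Part("age >= ?", List("21")))

val shouldParts =
  if (shoulds.isEmpty) Nil
  else List(Part(shoulds.map(_.sql).mkString("(", " OR ", ")"), shoulds.flatMap(_.args)))

val where = (others ::: shouldParts).map(_.sql).mkString(" AND ")
// where == "age >= ? AND (name = ? OR name = ?)"
// args follow in the same order: List("21", "alice", "bob")
```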
29 changes: 22 additions & 7 deletions sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala
@@ -3,6 +3,7 @@ package lightdb.sql
 import fabric._
 import fabric.define.DefType
 import fabric.rw._
+import lightdb.collection.Collection
 import lightdb.sql.connect.{ConnectionManager, DBCPConnectionManager, SQLConfig, SingleConnectionManager}
 import lightdb.{Field, LightDB}
 import lightdb.doc.{Document, DocumentModel}
@@ -22,6 +23,19 @@ class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val connect
   private val PointRegex = """POINT\((.+) (.+)\)""".r
   private val OptPointRegex = """\[POINT\((.+) (.+)\)\]""".r

+  override protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = {
+    if (hasSpatial) {
+      scribe.info(s"${collection.name} has spatial features. Enabling...")
+      val c = connectionManager.getConnection
+      val s = c.createStatement()
+      s.executeUpdate(s"SELECT load_extension('${SQLiteStore.spatialitePath}');")
+      val hasGeometryColumns = this.tables(c).contains("geometry_columns")
+      if (!hasGeometryColumns) s.executeUpdate("SELECT InitSpatialMetaData()")
+      s.close()
+    }
+    super.initTransaction()
+  }
+
   override protected def tables(connection: Connection): Set[String] = SQLiteStore.tables(connection)

   override protected def toJson(value: Any, rw: RW[_]): Json = {
@@ -69,6 +83,14 @@
     }
   }

+  private lazy val hasSpatial: Boolean = fields.exists { field =>
+    val className = field.rw.definition match {
+      case DefType.Opt(d) => d.className
+      case d => d.className
+    }
+    className.contains("lightdb.spatial.GeoPoint")
+  }
+
   override protected def extraFieldsForDistance(d: Conversion.Distance[Doc]): List[SQLPart] = {
     List(SQLPart(s"ST_Distance(GeomFromText('POINT(${d.from.longitude} ${d.from.latitude})', 4326), ${d.field.name}, true) AS ${d.field.name}Distance"))
   }
@@ -111,13 +133,6 @@ object SQLiteStore extends StoreManager {
     try {
       val c = config.createConnection(uri)
       c.setAutoCommit(false)
-
-      val s = c.createStatement()
-      s.executeUpdate(s"SELECT load_extension('$spatialitePath');")
-      val hasGeometryColumns = this.tables(c).contains("geometry_columns")
-      if (!hasGeometryColumns) s.executeUpdate("SELECT InitSpatialMetaData()")
-      s.close()
-
       c
     } catch {
       case t: Throwable => throw new RuntimeException(s"Error establishing SQLite connection to $uri", t)
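The net effect: SpatiaLite is no longer loaded for every SQLite connection at creation time; it is loaded lazily, in initTransaction, and only for collections that actually store a GeoPoint. A standalone sketch of the detection rule (the helper is illustrative, mirroring hasSpatial above):

```scala
import fabric.define.DefType

// A field counts as spatial when its type, possibly wrapped in Option,
// is lightdb.spatial.GeoPoint.
def isSpatial(definition: DefType): Boolean = {
  val className = definition match {
    case DefType.Opt(d) => d.className
    case d => d.className
  }
  className.contains("lightdb.spatial.GeoPoint")
}
```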
