Skip to content

Commit

Permalink
Added IndexingState to FieldGetter to allow more powerful indexing fe…
Browse files Browse the repository at this point in the history
…atures
  • Loading branch information
darkfrog26 committed Sep 19, 2024
1 parent d0c0021 commit dbccc1a
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 27 deletions.
5 changes: 3 additions & 2 deletions async/src/main/scala/lightdb/async/AsyncQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import lightdb.field.Field._
import lightdb.collection.Collection
import lightdb.distance.Distance
import lightdb.doc.{Document, DocumentModel}
import lightdb.field.Field
import lightdb.field.{Field, IndexingState}
import lightdb.filter._
import lightdb.materialized.MaterializedIndex
import lightdb.spatial.{DistanceAndDoc, Geo}
Expand Down Expand Up @@ -186,12 +186,13 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect
direction: SortDirection = SortDirection.Ascending)
(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (F, List[Doc])] = {
val field = f(collection.model)
val state = new IndexingState
val io = IO.blocking(sort(Sort.ByField(field, direction))
.toQuery
.search
.docs
.iterator).map { iterator =>
val grouped = GroupedIterator[Doc, F](iterator, doc => field.get(doc, field))
val grouped = GroupedIterator[Doc, F](iterator, doc => field.get(doc, field, state))
fs2.Stream.fromBlockingIterator[IO](grouped, 512)
}
fs2.Stream.force(io)
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import lightdb.distance.Distance
import lightdb.doc.{Document, DocumentModel}
import lightdb.error.NonIndexedFieldException
import lightdb.facet.FacetQuery
import lightdb.field.Field
import lightdb.field.{Field, IndexingState}
import lightdb.filter._
import lightdb.materialized.MaterializedIndex
import lightdb.spatial.{DistanceAndDoc, Geo}
Expand Down Expand Up @@ -149,11 +149,12 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
direction: SortDirection = SortDirection.Ascending)
(implicit transaction: Transaction[Doc]): GroupedIterator[Doc, F] = {
val field = f(collection.model)
val state = new IndexingState
val iterator = sort(Sort.ByField(field, direction))
.search
.docs
.iterator
GroupedIterator[Doc, F](iterator, doc => field.get(doc, field))
GroupedIterator[Doc, F](iterator, doc => field.get(doc, field, state))
}
}

Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/lightdb/field/Field.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sealed class Field[Doc <: Document[Doc], V](val name: String,

lazy val isSpatial: Boolean = className.exists(_.startsWith("lightdb.spatial.Geo"))

def getJson(doc: Doc): Json = get(doc, this).json
def getJson(doc: Doc, state: IndexingState): Json = get(doc, this, state).json

override def is(value: V): Filter[Doc] = Filter.Equals(name, value)

Expand Down Expand Up @@ -76,9 +76,13 @@ sealed class Field[Doc <: Document[Doc], V](val name: String,
parsed(words, allowLeadingWildcard = matchEndsWith)
}

def opt: Field[Doc, Option[V]] = new Field[Doc, Option[V]](name, (doc: Doc) => Option(get(doc, this)), () => implicitly[RW[Option[V]]], indexed)
def opt: Field[Doc, Option[V]] = new Field[Doc, Option[V]](name, FieldGetter {
case (doc, _, state) => Option(get(doc, this, state))
}, () => implicitly[RW[Option[V]]], indexed)

def list: Field[Doc, List[V]] = new Field[Doc, List[V]](name, (doc: Doc) => List(get(doc, this)), () => implicitly[RW[List[V]]], indexed)
def list: Field[Doc, List[V]] = new Field[Doc, List[V]](name, FieldGetter {
case (doc, _, state) => List(get(doc, this, state))
}, () => implicitly[RW[List[V]]], indexed)

override def distance(from: Geo.Point, radius: Distance): Filter[Doc] =
Filter.Distance(name, from, radius)
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/scala/lightdb/field/FieldGetter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import lightdb.doc.Document
import scala.language.implicitConversions

trait FieldGetter[Doc <: Document[Doc], V] {
def apply(doc: Doc, field: Field[Doc, V]): V
def apply(doc: Doc, field: Field[Doc, V], state: IndexingState): V
}

object FieldGetter {
implicit def function2Getter[Doc <: Document[Doc], V](f: Doc => V): FieldGetter[Doc, V] = new FieldGetter[Doc, V] {
override def apply(doc: Doc, field: Field[Doc, V]): V = f(doc)
implicit def function2Getter[Doc <: Document[Doc], V](f: Doc => V): FieldGetter[Doc, V] = func(f)

def func[Doc <: Document[Doc], V](f: Doc => V): FieldGetter[Doc, V] = apply {
case (doc, _, _) => f(doc)
}

def apply[Doc <: Document[Doc], V](f: (Doc, Field[Doc, V], IndexingState) => V): FieldGetter[Doc, V] = new FieldGetter[Doc, V] {
override def apply(doc: Doc, field: Field[Doc, V], state: IndexingState): V = f(doc, field, state)
}
}
3 changes: 3 additions & 0 deletions core/src/main/scala/lightdb/field/IndexingKey.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package lightdb.field

/**
 * Typed key used to store and look up a value in an `IndexingState`.
 *
 * The trait declares no members; it only carries the type parameter, which `IndexingState`
 * uses as the type of the value returned for (and accepted under) this key.
 *
 * @tparam T the type of the value associated with this key
 */
trait IndexingKey[T]
26 changes: 26 additions & 0 deletions core/src/main/scala/lightdb/field/IndexingState.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package lightdb.field

/**
 * Mutable key/value holder that is passed to `FieldGetter.apply` alongside the document and
 * field, allowing getters to stash and reuse values under typed [[IndexingKey]]s.
 *
 * All mutations are performed while holding this instance's monitor. Reads (`get` / `apply`)
 * are lock-free and rely on the backing field being `@volatile` to observe the latest map.
 */
class IndexingState {
  // The map itself is immutable; every write replaces the reference while holding the lock.
  // @volatile guarantees the unsynchronized readers below see the most recently published map
  // instead of a stale reference.
  @volatile private var map = Map.empty[IndexingKey[_], Any]

  /**
   * Looks up the value stored under `key`.
   *
   * @return `Some(value)` if a value has been stored for `key`, otherwise `None`
   */
  def get[T](key: IndexingKey[T]): Option[T] = map.get(key).map(_.asInstanceOf[T])

  /**
   * Returns the value stored under `key`, evaluating and storing `create` first if the key is
   * absent. The check and the store happen under the instance lock, so `create` is evaluated at
   * most once per absent key even when called concurrently.
   *
   * @param key    the key to look up
   * @param create by-name initializer, only evaluated when `key` has no value
   */
  def getOrCreate[T](key: IndexingKey[T], create: => T): T = synchronized {
    get(key).getOrElse {
      val value: T = create
      set(key, value)
      value
    }
  }

  /**
   * Returns the value stored under `key`.
   *
   * @throws java.lang.NullPointerException if no value is stored under `key`
   */
  def apply[T](key: IndexingKey[T]): T =
    get[T](key).getOrElse(throw new NullPointerException(s"Not found: $key"))

  /** Stores `value` under `key`, replacing any previous value. */
  def set[T](key: IndexingKey[T], value: T): Unit = synchronized {
    map += key -> value
  }

  /** Removes the entry for `key`, if any. */
  def remove[T](key: IndexingKey[T]): Unit = synchronized {
    map -= key
  }

  /** Removes all entries. */
  def clear(): Unit = synchronized {
    map = Map.empty
  }
}
10 changes: 7 additions & 3 deletions core/src/main/scala/lightdb/store/InMemoryIndexes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import lightdb._
import lightdb.field.Field._
import lightdb.collection.Collection
import lightdb.doc.{Document, DocumentModel}
import lightdb.field.IndexingState
import lightdb.transaction.Transaction
import lightdb.util.InMemoryIndex

Expand All @@ -18,20 +19,23 @@ trait InMemoryIndexes[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends

// Populate indexes
collection.transaction { implicit transaction =>
val state = new IndexingState
collection.iterator.foreach { doc =>
indexes.foreach(_.set(doc))
indexes.foreach(_.set(doc, state))
}
}
}

abstract override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = {
super.insert(doc)
indexes.foreach(_.set(doc))
val state = new IndexingState
indexes.foreach(_.set(doc, state))
}

abstract override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = {
super.upsert(doc)
indexes.foreach(_.set(doc))
val state = new IndexingState
indexes.foreach(_.set(doc, state))
}

abstract override def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Boolean = {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/lightdb/util/InMemoryIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lightdb.util
import lightdb._
import lightdb.field.Field._
import lightdb.doc.Document
import lightdb.field.IndexingState

import java.util.Comparator
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -12,7 +13,7 @@ class InMemoryIndex[Doc <: Document[Doc], V](field: Indexed[Doc, V], comparator:
private val currentValue = new ConcurrentHashMap[Id[Doc], V]
private val sorted = new AtomicList[V](comparator)

def set(doc: Doc): Unit = set(doc._id, field.get(doc, field))
def set(doc: Doc, state: IndexingState): Unit = set(doc._id, field.get(doc, field, state))

def set(id: Id[Doc], value: V): Unit = {
Option(currentValue.get(id)).foreach(previous => remove(id, previous))
Expand Down
18 changes: 10 additions & 8 deletions lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import lightdb._
import lightdb.field.Field._
import lightdb.doc.{Document, DocumentModel, JsonConversion}
import lightdb.facet.{FacetResult, FacetResultValue}
import lightdb.field.Field
import lightdb.field.{Field, IndexingState}
import lightdb.filter.{Condition, Filter}
import lightdb.lucene.index.Index
import lightdb.materialized.{MaterializedAggregate, MaterializedIndex}
Expand Down Expand Up @@ -110,13 +110,13 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
add(new StoredField(field.name, JsonFormatter.Compact(json)))
}

private def createLuceneFields(field: Field[Doc, _], doc: Doc): List[LuceneField] = {
private def createLuceneFields(field: Field[Doc, _], doc: Doc, state: IndexingState): List[LuceneField] = {
def fs: LuceneField.Store = if (storeMode == StoreMode.All || field.indexed) LuceneField.Store.YES else LuceneField.Store.NO
val json = field.getJson(doc)
val json = field.getJson(doc, state)
var fields = List.empty[LuceneField]
def add(field: LuceneField): Unit = fields = field :: fields
field match {
case ff: FacetField[Doc] => ff.get(doc, ff).flatMap { value =>
case ff: FacetField[Doc] => ff.get(doc, ff, state).flatMap { value =>
if (value.path.nonEmpty || ff.hierarchical) {
val path = if (ff.hierarchical) value.path ::: List("$ROOT$") else value.path
Some(new LuceneFacetField(field.name, path: _*))
Expand All @@ -125,7 +125,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
}
}
case t: Tokenized[Doc] =>
List(new LuceneField(field.name, t.get(doc, t), if (fs == LuceneField.Store.YES) TextField.TYPE_STORED else TextField.TYPE_NOT_STORED))
List(new LuceneField(field.name, t.get(doc, t, state), if (fs == LuceneField.Store.YES) TextField.TYPE_STORED else TextField.TYPE_NOT_STORED))
case _ =>
def addJson(json: Json, d: DefType): Unit = {
if (field.isSpatial) {
Expand All @@ -149,7 +149,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
addJson(json, field.rw.definition)

val fieldSortName = s"${field.name}Sort"
field.getJson(doc) match {
field.getJson(doc, state) match {
case Str(s, _) =>
val bytes = new BytesRef(s)
val sorted = new SortedDocValuesField(fieldSortName, bytes)
Expand All @@ -171,8 +171,9 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:

private def addDoc(doc: Doc, upsert: Boolean): Unit = if (fields.tail.nonEmpty) {
val id = this.id(doc)
val state = new IndexingState
val luceneFields = fields.flatMap { field =>
createLuceneFields(field, doc)
createLuceneFields(field, doc, state)
}
val document = new LuceneDocument
luceneFields.foreach(document.add)
Expand Down Expand Up @@ -319,8 +320,9 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory:
case Conversion.Json(fields) => jsonIterator(fields).asInstanceOf[Iterator[(V, Double)]]
case Conversion.Distance(field, from, sort, radius) => idsAndScores.iterator.map {
case (id, score) =>
val state = new IndexingState
val doc = collection(id)(transaction)
val distance = field.get(doc, field).map(d => Spatial.distance(from, d))
val distance = field.get(doc, field, state).map(d => Spatial.distance(from, d))
DistanceAndDoc(doc, distance) -> score
}
}
Expand Down
6 changes: 4 additions & 2 deletions sql/src/main/scala/lightdb/sql/SQLArg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import fabric.rw._
import lightdb.doc.Document
import lightdb.spatial.Geo
import lightdb._
import lightdb.field.Field
import lightdb.field.{Field, IndexingState}
import lightdb.field.Field._

import java.sql.{JDBCType, PreparedStatement, SQLType, Types}
Expand Down Expand Up @@ -57,7 +57,9 @@ object SQLArg {
}

object FieldArg {
def apply[Doc <: Document[Doc], F](doc: Doc, field: Field[Doc, F]): FieldArg[Doc, F] = apply(field, field.get(doc, field))
def apply[Doc <: Document[Doc], F](doc: Doc,
field: Field[Doc, F],
state: IndexingState): FieldArg[Doc, F] = apply(field, field.get(doc, field, state))
}

case class StringArg(s: String) extends SQLArg {
Expand Down
8 changes: 5 additions & 3 deletions sql/src/main/scala/lightdb/sql/SQLStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import lightdb.store.{Conversion, Store, StoreMode}
import lightdb.transaction.{Transaction, TransactionKey}
import lightdb.util.ActionIterator
import lightdb._
import lightdb.field.Field
import lightdb.field.{Field, IndexingState}
import lightdb.field.Field._

import java.sql.{Connection, PreparedStatement, ResultSet}
Expand Down Expand Up @@ -138,9 +138,10 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten

override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = {
val state = getState
val indexingState = new IndexingState
state.withInsertPreparedStatement { ps =>
fields.zipWithIndex.foreach {
case (field, index) => SQLArg.FieldArg(doc, field).set(ps, index + 1)
case (field, index) => SQLArg.FieldArg(doc, field, indexingState).set(ps, index + 1)
}
ps.addBatch()
state.batchInsert.incrementAndGet()
Expand All @@ -153,9 +154,10 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]] exten

override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = {
val state = getState
val indexingState = new IndexingState
state.withUpsertPreparedStatement { ps =>
fields.zipWithIndex.foreach {
case (field, index) => SQLArg.FieldArg(doc, field).set(ps, index + 1)
case (field, index) => SQLArg.FieldArg(doc, field, indexingState).set(ps, index + 1)
}
ps.addBatch()
state.batchUpsert.incrementAndGet()
Expand Down

0 comments on commit dbccc1a

Please sign in to comment.