From 67ed41cc7e87702b250be96c43e70fd735bf845b Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Mon, 12 Aug 2024 18:53:49 -0500 Subject: [PATCH] Fixes to Aggregator and improvements to AsyncQuery stream support --- .../main/scala/lightdb/async/AsyncQuery.scala | 117 +++++++++++++++--- core/src/main/scala/lightdb/Field.scala | 4 +- core/src/main/scala/lightdb/Query.scala | 8 +- .../main/scala/lightdb/util/Aggregator.scala | 97 +++++++-------- .../test/scala/spec/AbstractBasicSpec.scala | 74 ++++++----- .../scala/lightdb/lucene/LuceneStore.scala | 18 ++- 6 files changed, 218 insertions(+), 100 deletions(-) diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala index fab82c49..f603cd9b 100644 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncQuery.scala @@ -15,14 +15,24 @@ import lightdb.transaction.Transaction import lightdb.util.GroupedIterator case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model], - filter: Option[Filter[Doc]] = None, - sort: List[Sort] = Nil, - offset: Int = 0, - limit: Option[Int] = None, - countTotal: Boolean = false) { query => - private[async] def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal) + filter: Option[Filter[Doc]] = None, + sort: List[Sort] = Nil, + offset: Int = 0, + limit: Option[Int] = None, + countTotal: Boolean = false, + scoreDocs: Boolean = false, + minDocScore: Option[Double] = None) { query => + private[async] def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore) + + def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true) + + def minDocScore(min: Double): AsyncQuery[Doc, Model] = copy( + scoreDocs = true, + minDocScore = Some(min) + ) def clearFilters: AsyncQuery[Doc, Model] = copy(filter = None) + def filter(f: Model => 
Filter[Doc]): AsyncQuery[Doc, Model] = { val filter = f(collection.model) val combined = this.filter match { @@ -31,12 +41,89 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect } copy(filter = Some(combined)) } + def clearSort: AsyncQuery[Doc, Model] = copy(sort = Nil) + def sort(sort: Sort*): AsyncQuery[Doc, Model] = copy(sort = this.sort ::: sort.toList) + def offset(offset: Int): AsyncQuery[Doc, Model] = copy(offset = offset) + def limit(limit: Int): AsyncQuery[Doc, Model] = copy(limit = Some(limit)) + def clearLimit: AsyncQuery[Doc, Model] = copy(limit = None) + def countTotal(b: Boolean): AsyncQuery[Doc, Model] = copy(countTotal = b) + + object stream { + object scored { + def apply[V](conversion: Conversion[Doc, V]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (V, Double)] = { + val io = search(conversion) + .map(_.scoredStream) + fs2.Stream.force(io) + } + + def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Doc, Double)] = apply(Conversion.Doc()) + + def value[F](f: Model => Field[Doc, F]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (F, Double)] = + apply(Conversion.Value(f(collection.model))) + + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, (Id[Doc], Double)] = + value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) + + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Json, Double)] = + apply(Conversion.Json(f(collection.model))) + + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (T, Double)] = + apply(Conversion.Converted(f)) + + def materialized(f: Model => List[Field[Doc, _]]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (MaterializedIndex[Doc, Model], Double)] = + apply(Conversion.Materialized[Doc, Model](f(collection.model))) + + def distance(f: Model => Field[Doc, Option[GeoPoint]], + from: GeoPoint, + sort: Boolean 
= true, + radius: Option[Distance] = None) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (DistanceAndDoc[Doc], Double)] = + apply(Conversion.Distance(f(collection.model), from, sort, radius)) + } + + def apply[V](conversion: Conversion[Doc, V]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, V] = { + val io = search(conversion) + .map(_.stream) + fs2.Stream.force(io) + } + + def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = apply(Conversion.Doc()) + + def value[F](f: Model => Field[Doc, F]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, F] = + apply(Conversion.Value(f(collection.model))) + + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, Id[Doc]] = + value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) + + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Json] = + apply(Conversion.Json(f(collection.model))) + + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, T] = + apply(Conversion.Converted(f)) + + def materialized(f: Model => List[Field[Doc, _]]) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedIndex[Doc, Model]] = + apply(Conversion.Materialized[Doc, Model](f(collection.model))) + + def distance(f: Model => Field[Doc, Option[GeoPoint]], + from: GeoPoint, + sort: Boolean = true, + radius: Option[Distance] = None) + (implicit transaction: Transaction[Doc]): fs2.Stream[IO, DistanceAndDoc[Doc]] = + apply(Conversion.Distance(f(collection.model), from, sort, radius)) + } + object search { def apply[V](conversion: Conversion[Doc, V]) (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, V]] = @@ -54,18 +141,24 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect } def docs(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Doc]] = apply(Conversion.Doc()) + def value[F](f: Model => 
Field[Doc, F]) (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, F]] = apply(Conversion.Value(f(collection.model))) + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): IO[AsyncSearchResults[Doc, Id[Doc]]] = value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Json]] = apply(Conversion.Json(f(collection.model))) + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, T]] = apply(Conversion.Converted(f)) + def materialized(f: Model => List[Field[Doc, _]]) (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, MaterializedIndex[Doc, Model]]] = apply(Conversion.Materialized(f(collection.model))) + def distance(f: Model => Field[Doc, Option[GeoPoint]], from: GeoPoint, sort: Boolean = true, @@ -74,17 +167,11 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect apply(Conversion.Distance(f(collection.model), from, sort, radius)) } - def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = - fs2.Stream.force(search.docs.map(_.stream)) - - def scoredStream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Doc, Double)] = - fs2.Stream.force(search.docs.map(_.scoredStream)) - - def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.compile.toList + def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.docs.compile.toList - def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.take(1).compile.last + def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.docs.take(1).compile.last - def one(implicit transaction: Transaction[Doc]): IO[Doc] = stream.take(1).compile.lastOrError + def one(implicit transaction: Transaction[Doc]): IO[Doc] = stream.docs.take(1).compile.lastOrError def count(implicit transaction: 
Transaction[Doc]): IO[Int] = copy(limit = Some(1), countTotal = true) .search.docs.map(_.total.get) diff --git a/core/src/main/scala/lightdb/Field.scala b/core/src/main/scala/lightdb/Field.scala index 4c078b61..dc13dad1 100644 --- a/core/src/main/scala/lightdb/Field.scala +++ b/core/src/main/scala/lightdb/Field.scala @@ -66,6 +66,8 @@ trait UniqueIndex[Doc, V] extends Indexed[Doc, V] trait Tokenized[Doc] extends Indexed[Doc, String] object Field { + val NullString: String = "||NULL||" + var MaxIn: Option[Int] = Some(1_000) def apply[Doc, V](name: String, get: Doc => V)(implicit getRW: => RW[V]): Field[Doc, V] = new Field[Doc, V]( @@ -102,7 +104,7 @@ object Field { } def string2Json(name: String, s: String, definition: DefType): Json = definition match { - case _ if s == null => Null + case _ if s == null || s == NullString => Null + case DefType.Str => str(s) case DefType.Int => num(s.toLong) case DefType.Dec => num(BigDecimal(s)) diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index fe9faad6..0478b016 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -19,9 +19,15 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: offset: Int = 0, limit: Option[Int] = None, countTotal: Boolean = false, - scoreDocs: Boolean = false) { query => + scoreDocs: Boolean = false, + minDocScore: Option[Double] = None) { query => def scored: Query[Doc, Model] = copy(scoreDocs = true) + def minDocScore(min: Double): Query[Doc, Model] = copy( + scoreDocs = true, + minDocScore = Some(min) + ) + def clearFilters: Query[Doc, Model] = copy(filter = None) def filter(f: Model => Filter[Doc]): Query[Doc, Model] = { diff --git a/core/src/main/scala/lightdb/util/Aggregator.scala b/core/src/main/scala/lightdb/util/Aggregator.scala index 7b9a9811..03b7f2d3 100644 --- a/core/src/main/scala/lightdb/util/Aggregator.scala +++ b/core/src/main/scala/lightdb/util/Aggregator.scala @@ 
-28,63 +28,62 @@ object Aggregator { query.functions.foreach { f => val current = map.get(f.name) val value = m.value(_ => f.field) - if (f.`type` != AggregateType.Group) { - val newValue: Json = f.`type` match { - case AggregateType.Max => value match { - case NumInt(l, _) => current match { - case Some(c) => num(math.max(c.asLong, l)) - case None => num(l) - } - case NumDec(bd, _) => current match { - case Some(c) => num(bd.max(c.asBigDecimal)) - case None => num(bd) - } - case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})") + val newValue: Json = f.`type` match { + case AggregateType.Max => value match { + case NumInt(l, _) => current match { + case Some(c) => num(math.max(c.asLong, l)) + case None => num(l) } - case AggregateType.Min => value match { - case NumInt(l, _) => current match { - case Some(c) => num(math.min(c.asLong, l)) - case None => num(l) - } - case NumDec(bd, _) => current match { - case Some(c) => num(bd.min(c.asBigDecimal)) - case None => num(bd) - } - case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})") + case NumDec(bd, _) => current match { + case Some(c) => num(bd.max(c.asBigDecimal)) + case None => num(bd) } - case AggregateType.Avg => - val v = value.asBigDecimal - current match { - case Some(c) => (v :: c.as[List[BigDecimal]]).json - case None => List(v).json - } - case AggregateType.Sum => value match { - case NumInt(l, _) => current match { - case Some(c) => num(c.asLong + l) - case None => num(l) - } - case NumDec(bd, _) => current match { - case Some(c) => num(bd + c.asBigDecimal) - case None => num(bd) - } - case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})") + case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})") + } + case AggregateType.Min => value match { + case NumInt(l, _) => current match { + case Some(c) => 
num(math.min(c.asLong, l)) + case None => num(l) + } + case NumDec(bd, _) => current match { + case Some(c) => num(bd.min(c.asBigDecimal)) + case None => num(bd) } - case AggregateType.Count => current match { - case Some(c) => num(c.asInt + 1) - case None => num(0) + case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})") + } + case AggregateType.Avg => + val v = value.asBigDecimal + current match { + case Some(c) => (v :: c.as[List[BigDecimal]]).json + case None => List(v).json } - case AggregateType.CountDistinct | AggregateType.ConcatDistinct => current match { - case Some(c) => (c.as[Set[Json]] + value).json - case None => Set(value).json + case AggregateType.Sum => value match { + case NumInt(l, _) => current match { + case Some(c) => num(c.asLong + l) + case None => num(l) } - case AggregateType.Concat => current match { - case Some(c) => (value :: c.as[List[Json]]).json - case None => List(value).json + case NumDec(bd, _) => current match { + case Some(c) => num(bd + c.asBigDecimal) + case None => num(bd) } - case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})") + case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})") + } + case AggregateType.Count => current match { + case Some(c) => num(c.asInt + 1) + case None => num(0) + } + case AggregateType.CountDistinct | AggregateType.ConcatDistinct => current match { + case Some(c) => (c.as[Set[Json]] + value).json + case None => Set(value).json + } + case AggregateType.Group => value + case AggregateType.Concat => current match { + case Some(c) => (value :: c.as[List[Json]]).json + case None => List(value).json } - map += f.name -> newValue + case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})") } + map += f.name -> newValue } groups += group -> map } diff --git 
a/core/src/test/scala/spec/AbstractBasicSpec.scala b/core/src/test/scala/spec/AbstractBasicSpec.scala index 1f672680..764424bd 100644 --- a/core/src/test/scala/spec/AbstractBasicSpec.scala +++ b/core/src/test/scala/spec/AbstractBasicSpec.scala @@ -19,32 +19,32 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => protected def filterBuilderSupported: Boolean = false protected def memoryOnly: Boolean = false - private val adam = Person("Adam", 21, Person.id("adam")) - private val brenda = Person("Brenda", 11, Person.id("brenda")) - private val charlie = Person("Charlie", 35, Person.id("charlie")) - private val diana = Person("Diana", 15, Person.id("diana")) - private val evan = Person("Evan", 53, Person.id("evan")) - private val fiona = Person("Fiona", 23, Person.id("fiona")) - private val greg = Person("Greg", 12, Person.id("greg")) - private val hanna = Person("Hanna", 62, Person.id("hanna")) - private val ian = Person("Ian", 89, Person.id("ian")) - private val jenna = Person("Jenna", 4, Person.id("jenna")) - private val kevin = Person("Kevin", 33, Person.id("kevin")) - private val linda = Person("Linda", 72, Person.id("linda")) - private val mike = Person("Mike", 42, Person.id("mike")) - private val nancy = Person("Nancy", 22, Person.id("nancy")) - private val oscar = Person("Oscar", 21, Person.id("oscar")) - private val penny = Person("Penny", 2, Person.id("penny")) - private val quintin = Person("Quintin", 99, Person.id("quintin")) - private val ruth = Person("Ruth", 102, Person.id("ruth")) - private val sam = Person("Sam", 81, Person.id("sam")) - private val tori = Person("Tori", 30, Person.id("tori")) - private val uba = Person("Uba", 21, Person.id("uba")) - private val veronica = Person("Veronica", 13, Person.id("veronica")) - private val wyatt = Person("Wyatt", 30, Person.id("wyatt")) - private val xena = Person("Xena", 63, Person.id("xena")) - private val yuri = Person("Yuri", 30, Person.id("yuri")) - private val zoey = 
Person("Zoey", 101, Person.id("zoey")) + private val adam = Person("Adam", 21, None, Person.id("adam")) + private val brenda = Person("Brenda", 11, None, Person.id("brenda")) + private val charlie = Person("Charlie", 35, None, Person.id("charlie")) + private val diana = Person("Diana", 15, None, Person.id("diana")) + private val evan = Person("Evan", 53, Some("Dallas"), Person.id("evan")) + private val fiona = Person("Fiona", 23, None, Person.id("fiona")) + private val greg = Person("Greg", 12, None, Person.id("greg")) + private val hanna = Person("Hanna", 62, None, Person.id("hanna")) + private val ian = Person("Ian", 89, None, Person.id("ian")) + private val jenna = Person("Jenna", 4, None, Person.id("jenna")) + private val kevin = Person("Kevin", 33, None, Person.id("kevin")) + private val linda = Person("Linda", 72, None, Person.id("linda")) + private val mike = Person("Mike", 42, None, Person.id("mike")) + private val nancy = Person("Nancy", 22, None, Person.id("nancy")) + private val oscar = Person("Oscar", 21, None, Person.id("oscar")) + private val penny = Person("Penny", 2, None, Person.id("penny")) + private val quintin = Person("Quintin", 99, None, Person.id("quintin")) + private val ruth = Person("Ruth", 102, None, Person.id("ruth")) + private val sam = Person("Sam", 81, None, Person.id("sam")) + private val tori = Person("Tori", 30, None, Person.id("tori")) + private val uba = Person("Uba", 21, None, Person.id("uba")) + private val veronica = Person("Veronica", 13, None, Person.id("veronica")) + private val wyatt = Person("Wyatt", 30, None, Person.id("wyatt")) + private val xena = Person("Xena", 63, None, Person.id("xena")) + private val yuri = Person("Yuri", 30, None, Person.id("yuri")) + private val zoey = Person("Zoey", 101, None, Person.id("zoey")) private val names = List( adam, brenda, charlie, diana, evan, fiona, greg, hanna, ian, jenna, kevin, linda, mike, nancy, oscar, penny, @@ -211,6 +211,20 @@ abstract class AbstractBasicSpec extends 
AnyWordSpec with Matchers { spec => } } } + "search where city is not set" in { + db.people.transaction { implicit transaction => + val people = db.people.query.filter(_.city === None).toList + people.map(_.name).toSet should be(Set("Tori", "Ruth", "Sam", "Nancy", "Jenna", "Hanna", "Wyatt", "Diana", "Ian", "Quintin", "Uba", "Oscar", "Kevin", "Penny", "Charlie", "Mike", "Brenda", "Zoey", "Allan", "Xena", "Fiona", "Greg", "Veronica")) + } + } + "search where city is set" in { + db.people.transaction { implicit transaction => + val people = db.people.query.filter(p => Filter.Builder() + .mustNot(p.city === None) + ).toList + people.map(_.name) should be(List("Evan")) + } + } "truncate the collection" in { db.people.transaction { implicit transaction => db.people.truncate() should be(24) @@ -245,13 +259,17 @@ abstract class AbstractBasicSpec extends AnyWordSpec with Matchers { spec => override def upgrades: List[DatabaseUpgrade] = List(InitialSetupUpgrade) } - case class Person(name: String, age: Int, _id: Id[Person] = Person.id()) extends Document[Person] + case class Person(name: String, + age: Int, + city: Option[String], + _id: Id[Person] = Person.id()) extends Document[Person] object Person extends DocumentModel[Person] with JsonConversion[Person] { implicit val rw: RW[Person] = RW.gen val name: F[String] = field("name", _.name) - val age: F[Int] = field.index("age", _.age) + val age: I[Int] = field.index("age", _.age) + val city: I[Option[String]] = field.index("city", _.city) val search: T = field.tokenized("search", doc => s"${doc.name} ${doc.age}") } diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala index 74c0bcb3..906f4285 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala @@ -61,7 +61,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: case _ => def addJson(json: Json): Unit 
= json match { case _ if field.rw.definition == DefType.Json => add(new StringField(field.name, JsonFormatter.Compact(json), fs)) - case Null => // Ignore null + case Null => add(new StringField(field.name, Field.NullString, fs)) case Str(s, _) => add(new StringField(field.name, s, fs)) case Bool(b, _) => add(new IntField(field.name, if (b) 1 else 0, fs)) case NumInt(l, _) => add(new LongField(field.name, l, fs)) @@ -144,10 +144,16 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: } } val topFieldDocs: TopFieldDocs = search(None) - val scoreDocs: List[ScoreDoc] = topFieldDocs - .scoreDocs - .toList - .drop(query.offset) + val scoreDocs: List[ScoreDoc] = { + val list = topFieldDocs + .scoreDocs + .toList + .drop(query.offset) + query.minDocScore match { + case Some(min) => list.filter(_.score.toDouble >= min) + case None => list + } + } val total: Int = topFieldDocs.totalHits.value.toInt val storedFields: StoredFields = indexSearcher.storedFields() val idsAndScores = scoreDocs.map(doc => Id[Doc](storedFields.document(doc.doc).get("_id")) -> doc.score.toDouble) @@ -276,7 +282,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](directory: case Bool(b, _) => IntPoint.newExactQuery(field.name, if (b) 1 else 0) case NumInt(l, _) => LongPoint.newExactQuery(field.name, l) case NumDec(bd, _) => DoublePoint.newExactQuery(field.name, bd.toDouble) - case Null if field.rw.definition.isOpt => new FieldExistsQuery(field.name) + case Null if field.rw.definition.isOpt => new TermQuery(new Term(field.name, Field.NullString)) case json => throw new RuntimeException(s"Unsupported equality check: $json (${field.rw.definition})") }