From 924fde6f12cc187cd0c60ce2c161eadf0417e5af Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Fri, 7 Jun 2024 17:39:47 -0500 Subject: [PATCH] Preliminary support grouping --- all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala | 8 ++++++++ core/src/main/scala/lightdb/query/PagedResults.scala | 5 +++-- core/src/main/scala/lightdb/query/Query.scala | 11 ++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala b/all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala index 7027d920..d7ff7e76 100644 --- a/all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala +++ b/all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala @@ -215,6 +215,14 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche people.map(_.name) should be(List("Jane Doe", "John Doe", "Bob Dole")) } } + "group by age" in { + Person.withSearchContext { implicit context => + Person.query.grouped(Person.age).compile.toList.map { list => + list.map(_._1) should be(List(19, 21, 123)) + list.map(_._2.toList.map(_.name)) should be(List(List("Jane Doe"), List("John Doe"), List("Bob Dole"))) + } + } + } "sort by distance from Oklahoma City" in { Person.query .scoreDocs(true) diff --git a/core/src/main/scala/lightdb/query/PagedResults.scala b/core/src/main/scala/lightdb/query/PagedResults.scala index ad396715..28838b39 100644 --- a/core/src/main/scala/lightdb/query/PagedResults.scala +++ b/core/src/main/scala/lightdb/query/PagedResults.scala @@ -24,14 +24,15 @@ case class PagedResults[D <: Document[D], V](query: Query[D, V], def idAndScoreStream: fs2.Stream[IO, (Id[D], Double)] = fs2.Stream(idsAndScores: _*) - def stream: fs2.Stream[IO, V] = idStream + def docStream: fs2.Stream[IO, D] = idStream .evalMap { id => getter match { case Some(g) => g(id) case None => query.collection(id) } } - .evalMap(query.convert) + + def stream: fs2.Stream[IO, V] = docStream.evalMap(query.convert) def scoredStream: fs2.Stream[IO, (V, Double)] = idAndScoreStream .evalMap { diff --git a/core/src/main/scala/lightdb/query/Query.scala b/core/src/main/scala/lightdb/query/Query.scala index 82da34b3..aab3412d 100644 --- a/core/src/main/scala/lightdb/query/Query.scala +++ b/core/src/main/scala/lightdb/query/Query.scala @@ -1,7 +1,8 @@ package lightdb.query +import cats.Eq import cats.effect.IO -import lightdb.index.{IndexSupport, Index} +import lightdb.index.{Index, IndexSupport} import lightdb.model.AbstractCollection import lightdb.spatial.GeoPoint import lightdb.util.DistanceCalculator @@ -93,10 +94,18 @@ case class Query[D <: Document[D], V](indexSupport: IndexSupport[D], fs2.Stream.force(io) } + def docStream(implicit context: SearchContext[D]): fs2.Stream[IO, D] = pageStream.flatMap(_.docStream) + def idStream(implicit context: SearchContext[D]): fs2.Stream[IO, Id[D]] = pageStream.flatMap(_.idStream) def stream(implicit context: SearchContext[D]): fs2.Stream[IO, V] = pageStream.flatMap(_.stream) + def grouped[F](index: Index[F, D], + direction: SortDirection = SortDirection.Ascending) + (implicit context: SearchContext[D]): fs2.Stream[IO, (F, fs2.Chunk[D])] = sort(Sort.ByField(index, direction)) + .docStream + .groupAdjacentBy(doc => index.get(doc).head)(Eq.fromUniversalEquals) + object scored { def stream(implicit context: SearchContext[D]): fs2.Stream[IO, (V, Double)] = pageStream.flatMap(_.scoredStream)