Skip to content

Commit

Permalink
Preliminary support grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 7, 2024
1 parent 789d465 commit 924fde6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
8 changes: 8 additions & 0 deletions all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
people.map(_.name) should be(List("Jane Doe", "John Doe", "Bob Dole"))
}
}
"group by age" in {
Person.withSearchContext { implicit context =>
Person.query.grouped(Person.age).compile.toList.map { list =>
list.map(_._1) should be(List(19, 21, 123))
list.map(_._2.toList.map(_.name)) should be(List(List("Jane Doe"), List("John Doe"), List("Bob Dole")))
}
}
}
"sort by distance from Oklahoma City" in {
Person.query
.scoreDocs(true)
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/lightdb/query/PagedResults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ case class PagedResults[D <: Document[D], V](query: Query[D, V],

def idAndScoreStream: fs2.Stream[IO, (Id[D], Double)] = fs2.Stream(idsAndScores: _*)

def stream: fs2.Stream[IO, V] = idStream
def docStream: fs2.Stream[IO, D] = idStream
.evalMap { id =>
getter match {
case Some(g) => g(id)
case None => query.collection(id)
}
}
.evalMap(query.convert)

def stream: fs2.Stream[IO, V] = docStream.evalMap(query.convert)

def scoredStream: fs2.Stream[IO, (V, Double)] = idAndScoreStream
.evalMap {
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/lightdb/query/Query.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package lightdb.query

import cats.Eq
import cats.effect.IO
import lightdb.index.{IndexSupport, Index}
import lightdb.index.{Index, IndexSupport}
import lightdb.model.AbstractCollection
import lightdb.spatial.GeoPoint
import lightdb.util.DistanceCalculator
Expand Down Expand Up @@ -93,10 +94,18 @@ case class Query[D <: Document[D], V](indexSupport: IndexSupport[D],
fs2.Stream.force(io)
}

def docStream(implicit context: SearchContext[D]): fs2.Stream[IO, D] = pageStream.flatMap(_.docStream)

def idStream(implicit context: SearchContext[D]): fs2.Stream[IO, Id[D]] = pageStream.flatMap(_.idStream)

def stream(implicit context: SearchContext[D]): fs2.Stream[IO, V] = pageStream.flatMap(_.stream)

def grouped[F](index: Index[F, D],
direction: SortDirection = SortDirection.Ascending)
(implicit context: SearchContext[D]): fs2.Stream[IO, (F, fs2.Chunk[D])] = sort(Sort.ByField(index, direction))
.docStream
.groupAdjacentBy(doc => index.get(doc).head)(Eq.fromUniversalEquals)

object scored {
def stream(implicit context: SearchContext[D]): fs2.Stream[IO, (V, Double)] = pageStream.flatMap(_.scoredStream)

Expand Down

0 comments on commit 924fde6

Please sign in to comment.