Skip to content

Commit

Permalink
Fixes to Aggregator and improvements to AsyncQuery stream support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Aug 12, 2024
1 parent 289f224 commit 67ed41c
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 100 deletions.
117 changes: 102 additions & 15 deletions async/src/main/scala/lightdb/async/AsyncQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,24 @@ import lightdb.transaction.Transaction
import lightdb.util.GroupedIterator

case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model],
filter: Option[Filter[Doc]] = None,
sort: List[Sort] = Nil,
offset: Int = 0,
limit: Option[Int] = None,
countTotal: Boolean = false) { query =>
private[async] def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal)
filter: Option[Filter[Doc]] = None,
sort: List[Sort] = Nil,
offset: Int = 0,
limit: Option[Int] = None,
countTotal: Boolean = false,
scoreDocs: Boolean = false,
minDocScore: Option[Double] = None) { query =>
private[async] def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore)

/** Returns a copy of this query with `scoreDocs` switched on. */
def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true)

/**
 * Returns a copy of this query with `minDocScore` set to `min`.
 *
 * Scoring is switched on as well (`scoreDocs = true`).
 *
 * @param min the value stored as the query's minimum document score
 */
def minDocScore(min: Double): AsyncQuery[Doc, Model] = {
  val threshold: Option[Double] = Some(min)
  copy(scoreDocs = true, minDocScore = threshold)
}

/** Returns a copy of this query with no filter (`filter = None`). */
def clearFilters: AsyncQuery[Doc, Model] = copy(filter = None)

def filter(f: Model => Filter[Doc]): AsyncQuery[Doc, Model] = {
val filter = f(collection.model)
val combined = this.filter match {
Expand All @@ -31,12 +41,89 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect
}
copy(filter = Some(combined))
}

/** Returns a copy of this query with an empty sort list. */
def clearSort: AsyncQuery[Doc, Model] = copy(sort = Nil)

/** Returns a copy of this query with the given sorts appended after the sorts already present. */
def sort(sort: Sort*): AsyncQuery[Doc, Model] = copy(sort = this.sort ::: sort.toList)

/** Returns a copy of this query whose `offset` is replaced by the given value. */
def offset(offset: Int): AsyncQuery[Doc, Model] = copy(offset = offset)

/** Returns a copy of this query whose `limit` is set to `Some(limit)`. */
def limit(limit: Int): AsyncQuery[Doc, Model] = copy(limit = Some(limit))

/** Returns a copy of this query with no limit (`limit = None`). */
def clearLimit: AsyncQuery[Doc, Model] = copy(limit = None)

/** Returns a copy of this query with the `countTotal` flag set to `b`. */
def countTotal(b: Boolean): AsyncQuery[Doc, Model] = copy(countTotal = b)

/**
 * Streaming entry points for this query.
 *
 * Every method runs the corresponding `search` and exposes the results as an fs2 stream
 * rather than as search results wrapped in `IO`.
 */
object stream {
  /**
   * Streaming variants that emit each value paired with a `Double` taken from the results'
   * `scoredStream` (presumably the document score).
   */
  object scored {
    /** Runs the search with `conversion` and streams the `(value, Double)` pairs of its `scoredStream`. */
    def apply[V](conversion: Conversion[Doc, V])
                (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (V, Double)] =
      fs2.Stream.force(search(conversion).map(results => results.scoredStream))

    /** Streams whole documents, using `Conversion.Doc`. */
    def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Doc, Double)] =
      apply(Conversion.Doc())

    /** Streams the values of the single field selected by `f`. */
    def value[F](f: Model => Field[Doc, F])
                (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (F, Double)] = {
      val field = f(collection.model)
      apply(Conversion.Value(field))
    }

    /** Streams document ids by selecting the model's `_id` field. */
    def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, (Id[Doc], Double)] =
      value { model =>
        ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]
      }

    /** Streams JSON built from the fields selected by `f`. */
    def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Json, Double)] = {
      val fields: List[Field[Doc, _]] = f(collection.model)
      apply(Conversion.Json(fields))
    }

    /** Streams each document after passing it through `f`. */
    def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (T, Double)] = {
      val conversion = Conversion.Converted(f)
      apply(conversion)
    }

    /** Streams `MaterializedIndex` values for the fields selected by `f`. */
    def materialized(f: Model => List[Field[Doc, _]])
                    (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (MaterializedIndex[Doc, Model], Double)] = {
      val fields: List[Field[Doc, _]] = f(collection.model)
      apply(Conversion.Materialized[Doc, Model](fields))
    }

    /**
     * Streams `DistanceAndDoc` values using `Conversion.Distance`.
     *
     * @param f      selects the geo-point field handed to the conversion
     * @param from   the point handed to the conversion as its origin
     * @param sort   forwarded to the conversion unchanged (defaults to true)
     * @param radius forwarded to the conversion unchanged (defaults to None)
     */
    def distance(f: Model => Field[Doc, Option[GeoPoint]],
                 from: GeoPoint,
                 sort: Boolean = true,
                 radius: Option[Distance] = None)
                (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (DistanceAndDoc[Doc], Double)] = {
      val field: Field[Doc, Option[GeoPoint]] = f(collection.model)
      apply(Conversion.Distance(field, from, sort, radius))
    }
  }

  /** Runs the search with `conversion` and streams the values of its `stream`. */
  def apply[V](conversion: Conversion[Doc, V])
              (implicit transaction: Transaction[Doc]): fs2.Stream[IO, V] =
    fs2.Stream.force(search(conversion).map(results => results.stream))

  /** Streams whole documents, using `Conversion.Doc`. */
  def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] =
    apply(Conversion.Doc())

  /** Streams the values of the single field selected by `f`. */
  def value[F](f: Model => Field[Doc, F])
              (implicit transaction: Transaction[Doc]): fs2.Stream[IO, F] = {
    val field = f(collection.model)
    apply(Conversion.Value(field))
  }

  /** Streams document ids by selecting the model's `_id` field. */
  def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, Id[Doc]] =
    value { model =>
      ev(model)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]
    }

  /** Streams JSON built from the fields selected by `f`. */
  def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Json] = {
    val fields: List[Field[Doc, _]] = f(collection.model)
    apply(Conversion.Json(fields))
  }

  /** Streams each document after passing it through `f`. */
  def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, T] = {
    val conversion = Conversion.Converted(f)
    apply(conversion)
  }

  /** Streams `MaterializedIndex` values for the fields selected by `f`. */
  def materialized(f: Model => List[Field[Doc, _]])
                  (implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedIndex[Doc, Model]] = {
    val fields: List[Field[Doc, _]] = f(collection.model)
    apply(Conversion.Materialized[Doc, Model](fields))
  }

  /**
   * Streams `DistanceAndDoc` values using `Conversion.Distance`.
   *
   * @param f      selects the geo-point field handed to the conversion
   * @param from   the point handed to the conversion as its origin
   * @param sort   forwarded to the conversion unchanged (defaults to true)
   * @param radius forwarded to the conversion unchanged (defaults to None)
   */
  def distance(f: Model => Field[Doc, Option[GeoPoint]],
               from: GeoPoint,
               sort: Boolean = true,
               radius: Option[Distance] = None)
              (implicit transaction: Transaction[Doc]): fs2.Stream[IO, DistanceAndDoc[Doc]] = {
    val field: Field[Doc, Option[GeoPoint]] = f(collection.model)
    apply(Conversion.Distance(field, from, sort, radius))
  }
}

object search {
def apply[V](conversion: Conversion[Doc, V])
(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, V]] =
Expand All @@ -54,18 +141,24 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect
}

def docs(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Doc]] = apply(Conversion.Doc())

def value[F](f: Model => Field[Doc, F])
(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, F]] =
apply(Conversion.Value(f(collection.model)))

def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): IO[AsyncSearchResults[Doc, Id[Doc]]] =
value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]])

def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Json]] =
apply(Conversion.Json(f(collection.model)))

def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, T]] =
apply(Conversion.Converted(f))

def materialized(f: Model => List[Field[Doc, _]])
(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, MaterializedIndex[Doc, Model]]] =
apply(Conversion.Materialized(f(collection.model)))

def distance(f: Model => Field[Doc, Option[GeoPoint]],
from: GeoPoint,
sort: Boolean = true,
Expand All @@ -74,17 +167,11 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect
apply(Conversion.Distance(f(collection.model), from, sort, radius))
}

def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] =
fs2.Stream.force(search.docs.map(_.stream))

def scoredStream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Doc, Double)] =
fs2.Stream.force(search.docs.map(_.scoredStream))

def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.compile.toList
/** Collects every document emitted by `stream.docs` into a `List`. */
def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.docs.compile.toList

def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.take(1).compile.last
/** Takes at most one document from `stream.docs`; the result is `None` when the stream emits nothing. */
def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.docs.take(1).compile.last

def one(implicit transaction: Transaction[Doc]): IO[Doc] = stream.take(1).compile.lastOrError
/** Takes the first document from `stream.docs`; `lastOrError` makes the IO fail when the stream emits nothing. */
def one(implicit transaction: Transaction[Doc]): IO[Doc] = stream.docs.take(1).compile.lastOrError

/**
 * Reads the total reported by a search over this query.
 *
 * The search runs on a copy of the query with `countTotal = true` and `limit = Some(1)`, and the
 * `total` of the results is returned.
 *
 * The resulting IO fails with a `NoSuchElementException` if the results carry no total. The
 * exception type matches what a bare `Option.get` would raise, but now carries a message.
 */
def count(implicit transaction: Transaction[Doc]): IO[Int] = copy(limit = Some(1), countTotal = true)
  .search.docs
  .map { results =>
    results.total.getOrElse(
      throw new NoSuchElementException("Search results did not include a total even though countTotal was requested")
    )
  }
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/lightdb/Field.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ trait UniqueIndex[Doc, V] extends Indexed[Doc, V]
trait Tokenized[Doc] extends Indexed[Doc, String]

object Field {
/** Sentinel text that `string2Json` maps to a JSON `Null`, the same way it treats a `null` reference. */
val NullString: String = "||NULL||"

var MaxIn: Option[Int] = Some(1_000)

def apply[Doc, V](name: String, get: Doc => V)(implicit getRW: => RW[V]): Field[Doc, V] = new Field[Doc, V](
Expand Down Expand Up @@ -102,7 +104,7 @@ object Field {
}

def string2Json(name: String, s: String, definition: DefType): Json = definition match {
case _ if s == null => Null
case _ if s == null | s == NullString => Null
case DefType.Str => str(s)
case DefType.Int => num(s.toLong)
case DefType.Dec => num(BigDecimal(s))
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection:
offset: Int = 0,
limit: Option[Int] = None,
countTotal: Boolean = false,
scoreDocs: Boolean = false) { query =>
scoreDocs: Boolean = false,
minDocScore: Option[Double] = None) { query =>
/** Returns a copy of this query with `scoreDocs` switched on. */
def scored: Query[Doc, Model] = copy(scoreDocs = true)

/**
 * Returns a copy of this query with `minDocScore` set to `min`.
 *
 * Scoring is switched on as well (`scoreDocs = true`).
 *
 * @param min the value stored as the query's minimum document score
 */
def minDocScore(min: Double): Query[Doc, Model] = {
  val threshold: Option[Double] = Some(min)
  copy(scoreDocs = true, minDocScore = threshold)
}

/** Returns a copy of this query with no filter (`filter = None`). */
def clearFilters: Query[Doc, Model] = copy(filter = None)

def filter(f: Model => Filter[Doc]): Query[Doc, Model] = {
Expand Down
97 changes: 48 additions & 49 deletions core/src/main/scala/lightdb/util/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,63 +28,62 @@ object Aggregator {
query.functions.foreach { f =>
val current = map.get(f.name)
val value = m.value(_ => f.field)
if (f.`type` != AggregateType.Group) {
val newValue: Json = f.`type` match {
case AggregateType.Max => value match {
case NumInt(l, _) => current match {
case Some(c) => num(math.max(c.asLong, l))
case None => num(l)
}
case NumDec(bd, _) => current match {
case Some(c) => num(bd.max(c.asBigDecimal))
case None => num(bd)
}
case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})")
val newValue: Json = f.`type` match {
case AggregateType.Max => value match {
case NumInt(l, _) => current match {
case Some(c) => num(math.max(c.asLong, l))
case None => num(l)
}
case AggregateType.Min => value match {
case NumInt(l, _) => current match {
case Some(c) => num(math.min(c.asLong, l))
case None => num(l)
}
case NumDec(bd, _) => current match {
case Some(c) => num(bd.min(c.asBigDecimal))
case None => num(bd)
}
case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})")
case NumDec(bd, _) => current match {
case Some(c) => num(bd.max(c.asBigDecimal))
case None => num(bd)
}
case AggregateType.Avg =>
val v = value.asBigDecimal
current match {
case Some(c) => (v :: c.as[List[BigDecimal]]).json
case None => List(v).json
}
case AggregateType.Sum => value match {
case NumInt(l, _) => current match {
case Some(c) => num(c.asLong + l)
case None => num(l)
}
case NumDec(bd, _) => current match {
case Some(c) => num(bd + c.asBigDecimal)
case None => num(bd)
}
case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})")
case _ => throw new UnsupportedOperationException(s"Unsupported type for Max: $value (${f.field.name})")
}
case AggregateType.Min => value match {
case NumInt(l, _) => current match {
case Some(c) => num(math.min(c.asLong, l))
case None => num(l)
}
case NumDec(bd, _) => current match {
case Some(c) => num(bd.min(c.asBigDecimal))
case None => num(bd)
}
case AggregateType.Count => current match {
case Some(c) => num(c.asInt + 1)
case None => num(0)
case _ => throw new UnsupportedOperationException(s"Unsupported type for Min: $value (${f.field.name})")
}
case AggregateType.Avg =>
val v = value.asBigDecimal
current match {
case Some(c) => (v :: c.as[List[BigDecimal]]).json
case None => List(v).json
}
case AggregateType.CountDistinct | AggregateType.ConcatDistinct => current match {
case Some(c) => (c.as[Set[Json]] + value).json
case None => Set(value).json
case AggregateType.Sum => value match {
case NumInt(l, _) => current match {
case Some(c) => num(c.asLong + l)
case None => num(l)
}
case AggregateType.Concat => current match {
case Some(c) => (value :: c.as[List[Json]]).json
case None => List(value).json
case NumDec(bd, _) => current match {
case Some(c) => num(bd + c.asBigDecimal)
case None => num(bd)
}
case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})")
case _ => throw new UnsupportedOperationException(s"Unsupported type for Sum: $value (${f.field.name})")
}
case AggregateType.Count => current match {
case Some(c) => num(c.asInt + 1)
case None => num(0)
}
case AggregateType.CountDistinct | AggregateType.ConcatDistinct => current match {
case Some(c) => (c.as[Set[Json]] + value).json
case None => Set(value).json
}
case AggregateType.Group => value
case AggregateType.Concat => current match {
case Some(c) => (value :: c.as[List[Json]]).json
case None => List(value).json
}
map += f.name -> newValue
case _ => throw new UnsupportedOperationException(s"Unsupported type for ${f.`type`}: $value (${f.field.name})")
}
map += f.name -> newValue
}
groups += group -> map
}
Expand Down
Loading

0 comments on commit 67ed41c

Please sign in to comment.