Skip to content

Commit

Permalink
Continued working on aggregation functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 9, 2024
1 parent 8e21975 commit 3be5220
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 13 deletions.
102 changes: 102 additions & 0 deletions all/src/test/scala/spec/AggregationSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package spec

import cats.effect.testing.scalatest.AsyncIOSpec
import fabric.rw._
import lightdb.aggregate.AggregateType
import lightdb.{Document, Id, IndexedLinks, LightDB, StoredValue}
import lightdb.halo.HaloDBSupport
import lightdb.model.Collection
import lightdb.sqlite.SQLiteSupport
import lightdb.upgrade.DatabaseUpgrade
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec

import java.nio.file.{Path, Paths}

/**
 * Exercises the SQL aggregate functions (MAX, MIN, AVG, SUM, COUNT) exposed on
 * indexed fields, run against a HaloDB-backed LightDB with SQLite indexing.
 *
 * The nested `DB` / `Person` definitions are the self-contained database model
 * for this spec, following the project's convention of scoping test models
 * inside the spec class.
 */
class AggregationSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
  // Fixture data: twenty people with ages spanning 2 to 102 so the aggregates
  // have a meaningful spread to operate on.
  private val adam = Person("Adam", 21)
  private val brenda = Person("Brenda", 11)
  private val charlie = Person("Charlie", 35)
  private val diana = Person("Diana", 15)
  private val evan = Person("Evan", 53)
  private val fiona = Person("Fiona", 23)
  private val greg = Person("Greg", 12)
  private val hanna = Person("Hanna", 62)
  private val ian = Person("Ian", 89)
  private val jenna = Person("Jenna", 4)
  private val kevin = Person("Kevin", 33)
  private val linda = Person("Linda", 72)
  private val mike = Person("Mike", 42)
  private val nancy = Person("Nancy", 22)
  private val oscar = Person("Oscar", 21)
  private val penny = Person("Penny", 2)
  private val quintin = Person("Quintin", 99)
  private val ruth = Person("Ruth", 102)
  private val sam = Person("Sam", 81)
  private val tori = Person("Tori", 30)

  // Renamed from `names`: this list holds Person documents, not name strings.
  private val people = List(
    adam, brenda, charlie, diana, evan, fiona, greg, hanna, ian, jenna, kevin, linda, mike, nancy, oscar, penny,
    quintin, ruth, sam, tori
  )

  "Aggregation" should {
    "initialize the database" in {
      DB.init(truncate = true)
    }
    "insert people" in {
      Person.setAll(people).map { count =>
        count should be(20)
      }
    }
    "commit the changes" in {
      Person.commit()
    }
    "get a basic aggregation" in {
      // Each aggregate function defaults to a uniquely generated result-column
      // name, so several can be combined in a single query.
      val max = Person.age.max()
      val min = Person.age.min()
      val avg = Person.age.avg()
      val sum = Person.age.sum()
      val count = Person.age.count()

      Person.withSearchContext { implicit context =>
        Person.query
          // Range filter over age — per the expectations below it matches
          // Brenda (11), Greg (12), and Diana (15). Presumably SQL BETWEEN
          // semantics (inclusive bounds); no fixture sits exactly on 5 or 16,
          // so inclusivity at the endpoints is not pinned here — TODO confirm.
          .filter(Person.age <=> (5, 16))
          .aggregate(max, min, avg, sum, count)
          .compile
          .toList
          .map { list =>
            // A single materialized row carrying all five aggregate results.
            list.map(m => m(max)) should be(List(15))
            list.map(m => m(min)) should be(List(11))
            // 38 / 3 = 12.666666666666666
            list.map(m => m(avg)) should be(List(12.666666666666666))
            list.map(m => m(sum)) should be(List(38))
            list.map(m => m(count)) should be(List(3))
          }
      }
    }
    "dispose" in {
      DB.dispose()
    }
  }

  /** Test database: HaloDB store rooted at ./testdb with one user collection. */
  object DB extends LightDB with HaloDBSupport {
    override lazy val directory: Path = Paths.get("testdb")

    val startTime: StoredValue[Long] = stored[Long]("startTime", -1L)

    override lazy val userCollections: List[Collection[_]] = List(
      Person
    )

    override def upgrades: List[DatabaseUpgrade] = Nil
  }

  /** Minimal document under test: a name and an age. */
  case class Person(name: String, age: Int, _id: Id[Person] = Id()) extends Document[Person]

  /** Collection with SQLite-backed indexes on `name` and `age`. */
  object Person extends Collection[Person]("people", DB) with SQLiteSupport[Person] {
    override implicit val rw: RW[Person] = RW.gen

    val name: I[String] = index.one("name", _.name)
    val age: I[Int] = index.one("age", _.age)
  }
}
10 changes: 5 additions & 5 deletions all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche
}
"query with aggregate functions" in {
Person.withSearchContext { implicit context =>
val minAge = Person.age.agg(AggregateType.Min, "minAge")
val maxAge = Person.age.agg(AggregateType.Max, "maxAge")
val avgAge = Person.age.agg(AggregateType.Avg, "avgAge")
val sumAge = Person.age.agg(AggregateType.Sum, "sumAge")
val minAge = Person.age.min()
val maxAge = Person.age.max()
val avgAge = Person.age.avg()
val sumAge = Person.age.sum()
Person.query.aggregate(
minAge,
maxAge,
Expand Down Expand Up @@ -283,7 +283,7 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche
override implicit val rw: RW[Person] = RW.gen

val name: I[String] = index.one("name", _.name)
val age: I[Int] = index.one("age", _.age, materialize = true)
val age: I[Int] = index.one("age", _.age)
val ageLinks: IndexedLinks[Int, Person] = IndexedLinks[Int, Person]("age", _.age, _.toString, this)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package lightdb.aggregate
import fabric.rw.RW
import lightdb.Document

case class AggregateFunction[F, D <: Document[D]](name: String, fieldName: String, `type`: AggregateType, rw: RW[F])
case class AggregateFunction[F, D <: Document[D]](name: String, fieldName: String, `type`: AggregateType)(implicit val rw: RW[F])
1 change: 1 addition & 0 deletions core/src/main/scala/lightdb/aggregate/AggregateType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ object AggregateType {
case object Min extends AggregateType
case object Avg extends AggregateType
case object Sum extends AggregateType
case object Count extends AggregateType
}
10 changes: 8 additions & 2 deletions core/src/main/scala/lightdb/index/Index.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lightdb.index

import fabric.rw.{Convertible, RW}
import fabric.{Json, Null, NumDec, NumInt}
import lightdb.Document
import lightdb.{Document, Unique}
import lightdb.aggregate.{AggregateFunction, AggregateType}
import lightdb.model.{AbstractCollection, Collection}
import lightdb.query.Filter
Expand Down Expand Up @@ -65,5 +65,11 @@ trait Index[F, D <: Document[D]] {

indexSupport.index.register(this)

def agg(`type`: AggregateType, name: String): AggregateFunction[F, D] = AggregateFunction(name, fieldName, `type`, rw)
private def un: String = Unique(length = 8, characters = Unique.LettersLower)

def max(name: String = un): AggregateFunction[F, D] = AggregateFunction(name, fieldName, AggregateType.Max)
def min(name: String = un): AggregateFunction[F, D] = AggregateFunction(name, fieldName, AggregateType.Min)
def avg(name: String = un): AggregateFunction[Double, D] = AggregateFunction(name, fieldName, AggregateType.Avg)
def sum(name: String = un): AggregateFunction[F, D] = AggregateFunction(name, fieldName, AggregateType.Sum)
def count(name: String = un): AggregateFunction[Int, D] = AggregateFunction(name, fieldName, AggregateType.Count)
}
6 changes: 5 additions & 1 deletion core/src/main/scala/lightdb/index/Materialized.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import lightdb.aggregate.AggregateFunction
import lightdb.{Document, Id}

case class Materialized[D <: Document[D]](json: Json) {
private def get[F](name: String, rw: RW[F]): Option[F] = json.get(name).map(_.as[F](rw))
private def get[F](name: String, rw: RW[F]): Option[F] = try {
json.get(name).map(_.as[F](rw))
} catch {
case t: Throwable => throw new RuntimeException(s"Failed to materialize $name, JSON: $json", t)
}
def get[F](index: Index[F, D]): Option[F] = get(index.fieldName, index.rw)
def get[F](function: AggregateFunction[F, D]): Option[F] = get(function.name, function.rw)

Expand Down
6 changes: 3 additions & 3 deletions sql/src/main/scala/lightdb/sql/SQLIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ case class SQLIndexer[D <: Document[D]](indexSupport: SQLSupport[D]) extends Ind
f(context)
}

def apply[F](name: String, get: D => List[F], materialize: Boolean = false)
def apply[F](name: String, get: D => List[F])
(implicit rw: RW[F]): Index[F, D] = SQLIndex(
fieldName = name,
indexSupport = indexSupport,
get = doc => get(doc)
)

def one[F](name: String, get: D => F, materialize: Boolean = false)
(implicit rw: RW[F]): Index[F, D] = apply[F](name, doc => List(get(doc)), materialize = materialize)
def one[F](name: String, get: D => F)
(implicit rw: RW[F]): Index[F, D] = apply[F](name, doc => List(get(doc)))

override def truncate(): IO[Unit] = indexSupport.truncate()

Expand Down
3 changes: 2 additions & 1 deletion sql/src/main/scala/lightdb/sql/SQLSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {

override lazy val index: SQLIndexer[D] = SQLIndexer(this)

val _id: Index[Id[D], D] = index.one("_id", _._id, materialize = true)
val _id: Index[Id[D], D] = index.one("_id", _._id)

private[lightdb] lazy val backlog = new FlushingBacklog[Id[D], D](1_000, 10_000) {
override protected def write(list: List[D]): IO[Unit] = IO.blocking {
Expand Down Expand Up @@ -232,6 +232,7 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
case AggregateType.Min => "MIN"
case AggregateType.Avg => "AVG"
case AggregateType.Sum => "SUM"
case AggregateType.Count => "COUNT"
}
s"$af(${f.fieldName}) AS ${f.name}"
}.mkString(", ")
Expand Down

0 comments on commit 3be5220

Please sign in to comment.