Skip to content

Commit

Permalink
Lots of work on spatial support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed May 24, 2024
1 parent e39be7f commit 1ca51e2
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 22 deletions.
50 changes: 45 additions & 5 deletions all/src/test/scala/spec/SimpleHaloAndLuceneSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import lightdb.halo.HaloDBSupport
import lightdb.lucene.{LuceneIndex, LuceneSupport}
import lightdb.model.Collection
import lightdb.query.Sort
import lightdb.spatial.GeoPoint
import lightdb.upgrade.DatabaseUpgrade
import lightdb.util.DistanceCalculator
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scribe.{Level, Logger}
Expand All @@ -19,8 +21,27 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
private val id1 = Id[Person]("john")
private val id2 = Id[Person]("jane")

private val p1 = Person("John Doe", 21, Set("dog", "cat"), id1)
private val p2 = Person("Jane Doe", 19, Set("cat"), id2)
private val newYorkCity = GeoPoint(40.7142, -74.0119)
private val chicago = GeoPoint(41.8119, -87.6873)
private val jeffersonValley = GeoPoint(41.3385, -73.7947)
private val noble = GeoPoint(35.1417, -97.3409)
private val oklahomaCity = GeoPoint(35.5514, -97.4075)
private val yonkers = GeoPoint(40.9461, -73.8669)

private val p1 = Person(
name = "John Doe",
age = 21,
tags = Set("dog", "cat"),
point = newYorkCity,
_id = id1
)
private val p2 = Person(
name = "Jane Doe",
age = 19,
tags = Set("cat"),
point = noble,
_id = id2
)

"Simple database" should {
"initialize the database" in {
Expand Down Expand Up @@ -162,6 +183,21 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
people.map(_.name) should be(List("Jane Doe", "John Doe"))
}
}
"sort by distance from Oklahoma City" in {
Person.query
.scoreDocs(true)
.sort(Sort.ByDistance(Person.point, oklahomaCity))
.scored
.toList
.map { peopleAndScores =>
val people = peopleAndScores.map(_._1)
val scores = peopleAndScores.map(_._2)
people.map(_.name) should be(List("Jane Doe", "John Doe"))
scores should be(List(1.0, 1.0))
val calculated = people.map(p => DistanceCalculator(oklahomaCity, p.point).toUsMiles)
calculated should be(List(28.555228128634383, 1316.1223938032729))
}
}
"delete John" in {
Person.delete(id1).map { deleted =>
deleted should be(id1)
Expand Down Expand Up @@ -190,7 +226,7 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
}
}
"replace Jane Doe" in {
Person.set(Person("Jan Doe", 20, Set("cat", "bear"), id2)).map { p =>
Person.set(Person("Jan Doe", 20, Set("cat", "bear"), chicago, id2)).map { p =>
p._id should be(id2)
}
}
Expand Down Expand Up @@ -225,11 +261,11 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche

object DB extends LightDB with HaloDBSupport {
override lazy val directory: Path = Paths.get("testdb")
// override protected def autoCommit: Boolean = true
// override protected def autoCommit: Boolean = true

val startTime: StoredValue[Long] = stored[Long]("startTime", -1L)

// val people: Collection[Person] = collection("people", Person)
// val people: Collection[Person] = collection("people", Person)

override lazy val collections: List[Collection[_]] = List(
Person
Expand All @@ -241,6 +277,7 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
/**
 * Test document stored in the "people" collection.
 *
 * @param name  display name of the person
 * @param age   age of the person
 * @param tags  set of free-form tags attached to the person
 * @param point geographic location of the person
 * @param _id   document identifier; defaults to the value produced by `Id()`
 */
case class Person(name: String,
age: Int,
tags: Set[String],
point: GeoPoint,
_id: Id[Person] = Id()) extends Document[Person]

object Person extends Collection[Person]("people", DB) with LuceneSupport[Person] {
Expand All @@ -250,11 +287,14 @@ class SimpleHaloAndLuceneSpec extends AsyncWordSpec with AsyncIOSpec with Matche
val age: LuceneIndex[Int, Person] = index.one("age", _.age)
val ageLinks: IndexedLinks[Int, Person] = indexedLinks[Int]("age", _.toString, _.age)
val tag: LuceneIndex[String, Person] = index("tag", _.tags.toList)
val point: LuceneIndex[GeoPoint, Person] = index.one("point", _.point, sorted = true)
}

object InitialSetupUpgrade extends DatabaseUpgrade {
override def applyToNew: Boolean = true

override def blockStartup: Boolean = true

override def alwaysRun: Boolean = false

override def upgrade(ldb: LightDB): IO[Unit] = DB.startTime.set(System.currentTimeMillis()).map(_ => ())
Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ val scribeVersion: String = "3.13.5"
val luceneVersion: String = "9.10.0"
val sqliteVersion: String = "3.45.3.0"
val keysemaphoreVersion: String = "0.3.0-M1"
val squantsVersion: String = "1.8.3"

val scalaTestVersion: String = "3.2.18"
val catsEffectTestingVersion: String = "1.5.0"
Expand All @@ -79,6 +80,7 @@ lazy val core = project.in(file("core"))
"co.fs2" %% "fs2-core" % fs2Version,
"com.outr" %% "scribe-slf4j" % scribeVersion,
"io.chrisdavenport" %% "keysemaphore" % keysemaphoreVersion,
"org.typelevel" %% "squants" % squantsVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
),
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/lightdb/model/DocumentModel.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package lightdb.model

import cats.effect.IO
import cats.implicits.{catsSyntaxApplicativeByName, toTraverseOps}
import fabric.Json
import fabric.rw.RW
import cats.implicits._
import lightdb.index.IndexedField
import lightdb.{Document, Id, IndexedLinks}
import lightdb.{Document, Id, IndexedLinks, Unique}

trait DocumentModel[D <: Document[D]] {
type Field[F] = IndexedField[F, D]

private[lightdb] var _indexedLinks = List.empty[IndexedLinks[_, D]]

def id(value: String = Unique()): Id[D] = Id(value)

def indexedLinks: List[IndexedLinks[_, D]] = _indexedLinks

protected[lightdb] def initModel(collection: AbstractCollection[D]): Unit = {
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/lightdb/model/RecordDocumentModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import lightdb.RecordDocument
trait RecordDocumentModel[D <: RecordDocument[D]] extends DocumentModel[D] {
override protected[lightdb] def initModel(collection: AbstractCollection[D]): Unit = {
super.initModel(collection)
collection.preSetJson.add(new DocumentListener[D, Json] {
override def apply(action: DocumentAction, json: Json, collection: AbstractCollection[D]): IO[Option[Json]] = IO {
Some(json.modify("modified")(_ => System.currentTimeMillis()))
}
collection.preSetJson.add((_: DocumentAction, json: Json, _: AbstractCollection[D]) => IO {
Some(json.modify("modified")(_ => System.currentTimeMillis()))
})
}
}
17 changes: 15 additions & 2 deletions core/src/main/scala/lightdb/query/PagedResults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ case class PagedResults[D <: Document[D]](query: Query[D],
context: PageContext[D],
offset: Int,
total: Int,
ids: List[Id[D]],
idsAndScores: List[(Id[D], Double)],
getter: Option[Id[D] => IO[D]] = None) {
lazy val page: Int = offset / query.pageSize
lazy val pages: Int = math.ceil(total.toDouble / query.pageSize.toDouble).toInt

lazy val ids: List[Id[D]] = idsAndScores.map(_._1)
lazy val scores: List[Double] = idsAndScores.map(_._2)

def idStream: fs2.Stream[IO, Id[D]] = fs2.Stream(ids: _*)

def idAndScoreStream: fs2.Stream[IO, (Id[D], Double)] = fs2.Stream(idsAndScores: _*)

def stream: fs2.Stream[IO, D] = idStream
.evalMap { id =>
getter match {
Expand All @@ -23,7 +28,15 @@ case class PagedResults[D <: Document[D]](query: Query[D],
}
}

def docs: IO[List[D]] = ids.map(id => query.collection(id)).sequence
/**
 * Streams each document of this page together with the score recorded for its id.
 *
 * Documents are loaded through `getter` when one was supplied for the page, and through
 * `query.collection` otherwise. Order follows `idsAndScores`.
 */
def scoredStream: fs2.Stream[IO, (D, Double)] = idAndScoreStream.evalMap { case (id, score) =>
  // The fallback is passed by name, so the collection lookup only happens when no getter exists.
  val loaded = getter.fold[IO[D]](query.collection(id))(load => load(id))
  loaded.map(doc => doc -> score)
}

def docs: IO[List[D]] = ids.map(query.collection(_)).sequence

def hasNext: Boolean = pages > (page + 1)

Expand Down
7 changes: 7 additions & 0 deletions core/src/main/scala/lightdb/query/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ case class Query[D <: Document[D]](indexSupport: IndexSupport[D],
def idStream(implicit context: SearchContext[D]): fs2.Stream[IO, Id[D]] = pageStream.flatMap(_.idStream)
def stream(implicit context: SearchContext[D]): fs2.Stream[IO, D] = pageStream.flatMap(_.stream)

/**
 * Variants of the result accessors that keep the score reported for each document.
 */
object scored {
  /** Streams (document, score) pairs page by page using the supplied search context. */
  def stream(implicit context: SearchContext[D]): fs2.Stream[IO, (D, Double)] =
    pageStream.flatMap(page => page.scoredStream)

  /** Collects every (document, score) pair inside `indexSupport.withSearchContext`. */
  def toList: IO[List[(D, Double)]] = indexSupport.withSearchContext { context =>
    // Pass the context explicitly to `stream`, then propagate it implicitly from there.
    stream(context).compile.toList
  }
}

def toIdList: IO[List[Id[D]]] = indexSupport.withSearchContext { implicit context =>
idStream.compile.toList
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/lightdb/query/Sort.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package lightdb.query

import lightdb.Document
import lightdb.index.IndexedField
import lightdb.spatial.GeoPoint

sealed trait Sort
trait Sort

/**
 * The sort criteria that can be attached to a query.
 */
object Sort {
/** NOTE(review): presumably orders by relevance score; exact semantics are decided by the index implementation - confirm. */
case object BestMatch extends Sort
/** NOTE(review): presumably keeps the index's own document order; exact semantics are decided by the index implementation - confirm. */
case object IndexOrder extends Sort
/**
 * Sorts by the value of an indexed field.
 *
 * @param field   the indexed field whose value is used for ordering
 * @param reverse when true the ordering is reversed; defaults to false
 */
case class ByField[D <: Document[D], F](field: IndexedField[F, D], reverse: Boolean = false) extends Sort
/**
 * Sorts by distance between an indexed [[GeoPoint]] field and a reference point.
 *
 * @param field the indexed field holding each document's location
 * @param from  the reference point that distances are measured from
 */
case class ByDistance[D <: Document[D]](field: IndexedField[GeoPoint, D],
from: GeoPoint) extends Sort
}
9 changes: 9 additions & 0 deletions core/src/main/scala/lightdb/spatial/GeoPoint.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package lightdb.spatial

import fabric.rw.RW

/**
 * A latitude / longitude pair.
 *
 * No range validation is performed here; the values are stored exactly as supplied.
 *
 * @param latitude  latitude component (assumed to be decimal degrees - TODO confirm)
 * @param longitude longitude component (assumed to be decimal degrees - TODO confirm)
 */
case class GeoPoint(latitude: Double, longitude: Double)

object GeoPoint {
// fabric reader/writer for GeoPoint, derived by RW.gen
implicit val rw: RW[GeoPoint] = RW.gen
}
27 changes: 27 additions & 0 deletions core/src/main/scala/lightdb/util/DistanceCalculator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package lightdb.util

import lightdb.spatial.GeoPoint
import squants.space.Length
import squants.space.LengthConversions.LengthConversions

import scala.math._

/**
 * Great-circle distance calculations based on the haversine formula.
 */
object DistanceCalculator {
  /** Earth's radius in meters; used as the sphere radius for every calculation. */
  val EarthRadiusMeters: Int = 6371000

  /**
   * Calculates the haversine distance between two coordinates.
   *
   * @param lat1 latitude of the first point, in degrees
   * @param lon1 longitude of the first point, in degrees
   * @param lat2 latitude of the second point, in degrees
   * @param lon2 longitude of the second point, in degrees
   * @return the distance in meters along a sphere of radius [[EarthRadiusMeters]]
   */
  def haversineDistance(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = toRadians(lat2 - lat1)
    val dLon = toRadians(lon2 - lon1)
    val sinHalfDLat = sin(dLat / 2)
    val sinHalfDLon = sin(dLon / 2)
    val unclamped = sinHalfDLat * sinHalfDLat +
      cos(toRadians(lat1)) * cos(toRadians(lat2)) * sinHalfDLon * sinHalfDLon
    // Rounding can push the haversine term marginally outside [0, 1] for (near-)antipodal
    // points, which would turn sqrt(1 - a) into NaN. Clamping leaves in-range values untouched.
    val a = min(1.0, max(0.0, unclamped))
    val c = 2 * atan2(sqrt(a), sqrt(1 - a))
    EarthRadiusMeters * c
  }

  /**
   * Calculates the haversine distance between two points.
   *
   * @param p1 the first point
   * @param p2 the second point
   * @return the distance as a squants `Length`, constructed from the value in meters
   */
  def apply(p1: GeoPoint, p2: GeoPoint): Length = haversineDistance(
    lat1 = p1.latitude,
    lon1 = p1.longitude,
    lat2 = p2.latitude,
    lon2 = p2.longitude
  ).meters
}
27 changes: 26 additions & 1 deletion lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import fabric.define.DefType
import fabric.rw._
import lightdb.Document
import lightdb.index.{IndexSupport, IndexedField}
import lightdb.spatial.GeoPoint
import org.apache.lucene.index.Term
import org.apache.lucene.search._
import org.apache.lucene.document._
Expand All @@ -13,8 +14,16 @@ case class LuceneIndex[F, D <: Document[D]](fieldName: String,
indexSupport: IndexSupport[D],
get: D => List[F],
store: Boolean,
sorted: Boolean,
tokenized: Boolean)
(implicit val rw: RW[F]) extends IndexedField[F, D] {
/**
 * Name of the field used when sorting on this index.
 *
 * Indexes whose value class is `lightdb.spatial.GeoPoint` use a separate `<fieldName>Sort`
 * field; every other index reuses `fieldName`.
 */
lazy val fieldSortName: String =
  if (rw.definition.className.contains("lightdb.spatial.GeoPoint")) s"${fieldName}Sort"
  else fieldName

def ===(value: F): LuceneFilter[D] = is(value)
def is(value: F): LuceneFilter[D] = LuceneFilter(() => value.json match {
case Str(s, _) => new TermQuery(new Term(fieldName, s))
Expand Down Expand Up @@ -46,14 +55,30 @@ case class LuceneIndex[F, D <: Document[D]](fieldName: String,
} else {
def fs: Field.Store = if (store) Field.Store.YES else Field.Store.NO

getJson(doc).flatMap {
val filterField = getJson(doc).flatMap {
case Null => None
case Str(s, _) => Some(new StringField(fieldName, s, fs))
case Bool(b, _) => Some(new StringField(fieldName, b.toString, fs))
case NumInt(l, _) => Some(new LongField(fieldName, l, fs))
case NumDec(bd, _) => Some(new DoubleField(fieldName, bd.toDouble, fs))
case obj: Obj if obj.reference.nonEmpty => obj.reference.get match {
case GeoPoint(latitude, longitude) => Some(new LatLonPoint(fieldName, latitude, longitude))
case ref => throw new RuntimeException(s"Unsupported object reference: $ref for JSON: $obj")
}
case json => throw new RuntimeException(s"Unsupported JSON: $json (${rw.definition})")
}
val sortField = if (sorted) {
getJson(doc).flatMap {
case obj: Obj if obj.reference.nonEmpty => obj.reference.get match {
case GeoPoint(latitude, longitude) => Some(new LatLonDocValuesField(fieldSortName, latitude, longitude))
case _ => None
}
case _ => None
}
} else {
Nil
}
filterField ::: sortField
}

protected[lightdb] def sortType: SortField.Type = rw.definition match {
Expand Down
5 changes: 4 additions & 1 deletion lucene/src/main/scala/lightdb/lucene/LuceneIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ case class LuceneIndexer[D <: Document[D]](indexSupport: IndexSupport[D],
/**
 * Builds a [[LuceneIndex]] for a field that can yield any number of values per document.
 *
 * Every argument is forwarded unchanged to the `LuceneIndex` constructor, together with this
 * indexer's `indexSupport`.
 *
 * @param name      used as the index's `fieldName`
 * @param get       extracts the list of values to index from a document
 * @param store     forwarded as `store`; defaults to false
 * @param sorted    forwarded as `sorted`; defaults to false
 * @param tokenized forwarded as `tokenized`; defaults to false
 * @param rw        fabric reader/writer for the value type
 */
def apply[F](name: String,
get: D => List[F],
store: Boolean = false,
sorted: Boolean = false,
tokenized: Boolean = false)
(implicit rw: RW[F]): LuceneIndex[F, D] = LuceneIndex(
fieldName = name,
indexSupport = indexSupport,
get = get,
store = store,
sorted = sorted,
tokenized = tokenized
)

def one[F](name: String,
get: D => F,
store: Boolean = false,
sorted: Boolean = false,
tokenized: Boolean = false)
(implicit rw: RW[F]): LuceneIndex[F, D] = apply[F](name, doc => List(get(doc)), store, tokenized)
(implicit rw: RW[F]): LuceneIndex[F, D] = apply[F](name, doc => List(get(doc)), store, sorted, tokenized)

override def commit(): IO[Unit] = IO(commitBlocking())

Expand Down
8 changes: 6 additions & 2 deletions lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import fabric.define.DefType
import lightdb._
import lightdb.index.{IndexSupport, IndexedField}
import lightdb.query.{Filter, PageContext, PagedResults, Query, SearchContext, Sort}
import org.apache.lucene.document.LatLonDocValuesField
import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, ScoreDoc, SearcherFactory, SearcherManager, SortField, SortedNumericSortField, TopFieldDocs, Query => LuceneQuery, Sort => LuceneSort}
import org.apache.lucene.index.StoredFields

Expand All @@ -25,6 +26,9 @@ trait LuceneSupport[D <: Document[D]] extends IndexSupport[D] {
case DefType.Str => new SortField(field.fieldName, f.sortType, reverse)
case d => throw new RuntimeException(s"Unsupported sort definition: $d")
}
case Sort.ByDistance(field, from) =>
val f = field.asInstanceOf[LuceneIndex[_, D]]
LatLonDocValuesField.newDistanceSort(f.fieldSortName, from.latitude, from.longitude)
}

override def doSearch(query: Query[D],
Expand All @@ -47,7 +51,7 @@ trait LuceneSupport[D <: Document[D]] extends IndexSupport[D] {
val scoreDocs: List[ScoreDoc] = topFieldDocs.scoreDocs.toList
val total: Int = topFieldDocs.totalHits.value.toInt
val storedFields: StoredFields = indexSearcher.storedFields()
val ids: List[Id[D]] = scoreDocs.map(doc => Id[D](storedFields.document(doc.doc).get("_id")))
val idsAndScores = scoreDocs.map(doc => Id[D](storedFields.document(doc.doc).get("_id")) -> doc.score.toDouble)
val indexContext = LucenePageContext(
context = context,
lastScoreDoc = scoreDocs.lastOption
Expand All @@ -57,7 +61,7 @@ trait LuceneSupport[D <: Document[D]] extends IndexSupport[D] {
context = indexContext,
offset = offset,
total = total,
ids = ids
idsAndScores = idsAndScores
)
}

Expand Down
2 changes: 1 addition & 1 deletion sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
context = SQLPageContext(context),
offset = offset,
total = total,
ids = data.ids,
idsAndScores = data.ids.map(id => id -> 0.0),
getter = data.lookup
)
} finally {
Expand Down

0 comments on commit 1ca51e2

Please sign in to comment.