Skip to content

Commit

Permalink
Testing multi-indexing
Browse files — browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Apr 6, 2024
1 parent b2de848 commit edc80d0
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 43 deletions.
51 changes: 24 additions & 27 deletions all/src/test/scala/spec/SimpleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,32 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
}
}
"search by name for positive result" in {
db.people.query.filter(Person.name is "Jane Doe").stream().compile.toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id2)
doc(Person.name) should be("Jane Doe")
doc(Person.age) should be(19)
db.people.query.filter(Person.name is "Jane Doe").withStream { stream =>
stream.compile.toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id2)
doc(Person.name) should be("Jane Doe")
doc(Person.age) should be(19)
}
}
}
"search by age for positive result" in {
db.people.query.filter(Person.age is 19).stream().compile.toList.map { results =>
db.people.query.filter(Person.age is 19).toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id2)
doc(Person.name) should be("Jane Doe")
doc(Person.age) should be(19)
doc._id should be(id2)
doc.name should be("Jane Doe")
doc.age should be(19)
}
}
"search by id for John" in {
db.people.query.filter(Person._id is id1).stream().compile.toList.map { results =>
db.people.query.filter(Person._id is id1).toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id1)
doc(Person.name) should be("John Doe")
doc(Person.age) should be(21)
doc._id should be(id1)
doc.name should be("John Doe")
doc.age should be(21)
}
}
"delete John" in {
Expand All @@ -112,17 +114,12 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
}
}
"list all documents" in {
db.people.query.stream().compile.toList.flatMap { results =>
db.people.query.toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id2)
doc(Person.name) should be("Jane Doe")
doc(Person.age) should be(19)
doc.get().map { person =>
person._id should be(id2)
person.name should be("Jane Doe")
person.age should be(19)
}
doc._id should be(id2)
doc.name should be("Jane Doe")
doc.age should be(19)
}
}
// TODO: search for an item by name and by age range
Expand All @@ -142,12 +139,12 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
db.people.commit()
}
"list new documents" in {
db.people.query.stream().compile.toList.map { results =>
db.people.query.toList.map { results =>
results.length should be(1)
val doc = results.head
doc.id should be(id2)
doc(Person.name) should be("Jan Doe")
doc(Person.age) should be(20)
doc._id should be(id2)
doc.name should be("Jan Doe")
doc.age should be(20)
}
}
"verify start time has been set" in {
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/index/Indexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ trait Indexer[D <: Document[D]] {

// Total number of documents currently in the index. `()` retained because implementations may perform I/O.
def count(): IO[Int]

def search(query: Query[D]): IO[SearchResults[D]]
/** Runs `query` and lends the [[SearchResults]] to `f` (loan pattern); the caller
  * only interacts with the results inside `f`'s IO.
  * NOTE(review): whether index resources are held open for the duration of `f` is
  * implementation-specific — confirm per indexer.
  */
def search[Return](query: Query[D])(f: SearchResults[D] => IO[Return]): IO[Return]

// Clears the index — presumably removes every indexed entry; see concrete implementations.
def truncate(): IO[Unit]

Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/lightdb/index/NullIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ case class NullIndexer[D <: Document[D]](collection: Collection[D]) extends Inde

// Null object: always reports an empty index.
override def count(): IO[Int] = IO.pure(0)

override def search(query: Query[D]): IO[SearchResults[D]] = IO.pure(SearchResults(query, 0, fs2.Stream.empty))
/** No-op search: immediately invokes `f` with an empty result set (total 0, empty stream). */
override def search[Return](query: Query[D])(f: SearchResults[D] => IO[Return]): IO[Return] =
f(SearchResults(query, 0, fs2.Stream.empty))

// Nothing is ever indexed, so there is nothing to truncate.
override def truncate(): IO[Unit] = IO.unit

Expand Down
24 changes: 20 additions & 4 deletions core/shared/src/main/scala/lightdb/query/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,25 @@ case class Query[D <: Document[D]](collection: Collection[D],
def sort(sort: Sort*): Query[D] = copy(sort = this.sort ::: sort.toList)
def limit(limit: Int): Query[D] = copy(limit = limit)
def scoreDocs(b: Boolean = true): Query[D] = copy(scoreDocs = b)
def search(): IO[SearchResults[D]] = collection.indexer.search(this)
def stream(): fs2.Stream[IO, SearchResult[D]] = fs2.Stream.force(search().map(_.stream))
def documentsStream(): fs2.Stream[IO, D] = stream().evalMap(_.get())
/** Executes this query via the collection's indexer, lending the [[SearchResults]] to `f`. */
def search[Return](f: SearchResults[D] => IO[Return]): IO[Return] = collection.indexer.search(this)(f)
/** Like [[search]], but lends only the stream of individual [[SearchResult]]s to `f`. */
def withStream[Return](f: fs2.Stream[IO, SearchResult[D]] => IO[Return]): IO[Return] = search { results =>
f(results.stream)
}
/** Like [[withStream]], but resolves each hit to its full document (one `get()` per result). */
def withDocumentsStream[Return](f: fs2.Stream[IO, D] => IO[Return]): IO[Return] = withStream { stream =>
f(stream.evalMap(_.get()))
}
/** Materializes every matching document into an in-memory list. */
def toList: IO[List[D]] = withDocumentsStream(_.compile.toList)
/** Runs the query and materializes it: the indexer-reported total plus every resolved document. */
def toResults: IO[MaterializedResults[D]] = search { sr =>
  sr.documentsStream()
    .compile
    .toList
    .map(docs => MaterializedResults(sr.query, sr.total, docs))
}

/** True when `document` satisfies this query's filter; a query with no filter matches everything. */
def matches(document: D): Boolean = filter.forall(_.matches(document))
}
}

/** Fully materialized query results, safe to use outside any search loan scope.
  *
  * @param query the query that produced these results
  * @param total hit count as reported by the indexer (HaloIndexer reports -1 — unknown)
  * @param list  every resolved document
  */
case class MaterializedResults[D <: Document[D]](query: Query[D],
total: Int,
list: List[D])
5 changes: 3 additions & 2 deletions halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ case class HaloIndexer[D <: Document[D]](collection: Collection[D]) extends Inde
// No-op: Halo keeps no separate index structure (search scans the store directly), so there is nothing to delete.
override def delete(id: Id[D]): IO[Unit] = IO.unit
// No-op: nothing is buffered by this indexer, so there is nothing to flush.
override def commit(): IO[Unit] = IO.unit
// Counts by streaming every stored document — O(n) in collection size.
override def count(): IO[Int] = collection.store.all().compile.count.map(_.toInt)
override def search(query: Query[D]): IO[SearchResults[D]] = IO {
override def search[Return](query: Query[D])(f: SearchResults[D] => IO[Return]): IO[Return] = {
val stream = collection.store
.all[D]()
.map { t =>
Expand All @@ -26,8 +26,9 @@ case class HaloIndexer[D <: Document[D]](collection: Collection[D]) extends Inde
override def apply[F](field: Field[D, F]): F = field.getter(document)
}
}
SearchResults(query, -1, stream)
f(SearchResults(query, -1, stream))
}

override def truncate(): IO[Unit] = IO.unit
override def dispose(): IO[Unit] = IO.unit
}
15 changes: 7 additions & 8 deletions lucene/src/main/scala/lightdb/index/lucene/LuceneIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
case _ => throw new RuntimeException(s"Unsupported filter: $filter")
}

override def search(query: Query[D]): IO[SearchResults[D]] = IO {
override def search[Return](query: Query[D])(f: SearchResults[D] => IO[Return]): IO[Return] = {
val q = query.filter.map(filter2Query).getOrElse(new MatchAllDocsQuery)
val sortFields = if (query.sort.isEmpty) {
List(SortField.FIELD_SCORE)
Expand All @@ -161,18 +161,17 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
}
}
// TODO: Offset
val (hits, storedFields, total) = withIndexSearcher { indexSearcher =>
searchManager { indexSearcher =>
val topDocs = indexSearcher.search(q, query.limit, new Sort(sortFields: _*), query.scoreDocs)
val hits = topDocs.scoreDocs
val total = topDocs.totalHits.value.toInt
val storedFields = indexSearcher.storedFields()
(hits, storedFields, total)
val stream = fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
.map { sd =>
LuceneSearchResult(sd, collection, storedFields)
}
f(SearchResults(query, total, stream))
}
val stream = fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
.map { sd =>
LuceneSearchResult(sd, collection, storedFields)
}
SearchResults(query, total, stream)
}

override def truncate(): IO[Unit] = IO {
Expand Down

0 comments on commit edc80d0

Please sign in to comment.