Skip to content

Commit

Permalink
Preliminary extraction of Lucene successful
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Apr 9, 2024
1 parent 051a3c7 commit 785d21c
Show file tree
Hide file tree
Showing 17 changed files with 487 additions and 173 deletions.
16 changes: 16 additions & 0 deletions core/src/main/scala/lightdb/index/IndexManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package lightdb.index

import cats.effect.IO
import lightdb.Document

trait IndexManager[D <: Document[D]] {
protected var _fields = List.empty[IndexedField[_, D]]

def fields: List[IndexedField[_, D]] = _fields

def count(): IO[Int]

protected[lightdb] def register[F](field: IndexedField[F, D]): Unit = synchronized {
_fields = field :: _fields
}
}
19 changes: 7 additions & 12 deletions core/src/main/scala/lightdb/index/IndexSupport.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package lightdb.index

import cats.effect.IO
import lightdb.query.Query
import lightdb.query.{PagedResults, Query, SearchContext}
import lightdb.{Collection, Document}

trait IndexSupport[D <: Document[D], IF[F] <: IndexedField[F, D]] extends Collection[D] {
trait IndexSupport[D <: Document[D]] extends Collection[D] {
lazy val query: Query[D] = Query(this)

protected def indexer: Indexer[D]

override def commit(): IO[Unit] = super.commit().flatMap(_ => indexer.commit())

def index: IndexManager[D, IF]
}
def index: IndexManager[D]

trait IndexManager[D <: Document[D], IF <: IndexedField[_, D]] {
protected var _fields = List.empty[IF]

def fields: List[IF] = _fields

protected[lightdb] def register[F, Field <: IF with IndexedField[F, D]](field: Field): Unit = synchronized {
_fields = field :: _fields
}
def doSearch(query: Query[D],
context: SearchContext[D],
offset: Int,
after: Option[PagedResults[D]]): IO[PagedResults[D]]
}
7 changes: 5 additions & 2 deletions core/src/main/scala/lightdb/index/Indexer.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package lightdb.index

import cats.effect.IO
import lightdb.{Collection, Document, Id}
import lightdb.query.SearchContext
import lightdb.{Collection, Document, Id, query}

trait Indexer[D <: Document[D]] {
def collection: Collection[D]
def indexSupport: IndexSupport[D]

private[lightdb] def delete(id: Id[D]): IO[Unit]

def commit(): IO[Unit]

def count(): IO[Int]

def withSearchContext[Return](f: SearchContext[D] => IO[Return]): IO[Return]
}
8 changes: 8 additions & 0 deletions core/src/main/scala/lightdb/query/IndexContext.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package lightdb.query

import cats.effect.IO
import lightdb.Document

trait IndexContext[D <: Document[D]] {
def nextPage(currentPage: PagedResults[D]): IO[Option[PagedResults[D]]]
}
10 changes: 3 additions & 7 deletions core/src/main/scala/lightdb/query/PagedResults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import cats.effect.IO
import cats.implicits.toTraverseOps
import lightdb.{Document, Id}

trait IndexContext[D <: Document[D]] {
def nextPage(): IO[Option[PagedResults[D]]]
}

case class PagedResults[D <: Document[D]](query: Query[D],
context: IndexContext[D],
offset: Int,
Expand All @@ -17,11 +13,11 @@ case class PagedResults[D <: Document[D]](query: Query[D],
lazy val pages: Int = math.ceil(total.toDouble / query.pageSize.toDouble).toInt

def stream: fs2.Stream[IO, D] = fs2.Stream(ids: _*)
.evalMap(id => query.collection(id))
.evalMap(id => query.indexSupport(id))

def docs: IO[List[D]] = ids.map(id => query.collection(id)).sequence
def docs: IO[List[D]] = ids.map(id => query.indexSupport(id)).sequence

def hasNext: Boolean = pages > (page + 1)

def next(): IO[Option[PagedResults[D]]] = context.nextPage()
def next(): IO[Option[PagedResults[D]]] = context.nextPage(this)
}
43 changes: 5 additions & 38 deletions core/src/main/scala/lightdb/query/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package lightdb.query

import cats.effect.IO
import lightdb.index.IndexSupport
import lightdb.{Collection, Document, Id}
import org.apache.lucene.index.StoredFields
import org.apache.lucene.search.{MatchAllDocsQuery, ScoreDoc, SortField, TopFieldDocs, Query => LuceneQuery, Sort => LuceneSort}
import lightdb.Document

case class Query[D <: Document[D]](filter: Option[Filter[D]] = None,
case class Query[D <: Document[D]](indexSupport: IndexSupport[D],
filter: Option[Filter[D]] = None,
sort: List[Sort] = Nil,
scoreDocs: Boolean = false,
pageSize: Int = 1_000) {
Expand All @@ -16,7 +15,8 @@ case class Query[D <: Document[D]](filter: Option[Filter[D]] = None,
def scoreDocs(b: Boolean): Query[D] = copy(scoreDocs = b)
def pageSize(size: Int): Query[D] = copy(pageSize = size)

def search()(implicit context: SearchContext[D]): IO[PagedResults[D]] = doSearch(
def search()(implicit context: SearchContext[D]): IO[PagedResults[D]] = indexSupport.doSearch(
query = this,
context = context,
offset = 0,
after = None
Expand All @@ -30,37 +30,4 @@ case class Query[D <: Document[D]](filter: Option[Filter[D]] = None,
fs2.Stream.force(io)
.flatMap(_.stream)
}

private[query] def doSearch(context: SearchContext[D],
offset: Int,
after: Option[ScoreDoc]): IO[PagedResults[D]] = IO {
val q = filter.map(_.asQuery).getOrElse(new MatchAllDocsQuery)
val sortFields = sort match {
case Nil => List(SortField.FIELD_SCORE)
case _ => sort.map(sort2SortField)
}
val s = new LuceneSort(sortFields: _*)
val topFieldDocs: TopFieldDocs = after match {
case Some(scoreDoc) => context.indexSearcher.searchAfter(scoreDoc, q, pageSize, s, this.scoreDocs)
case None => context.indexSearcher.search(q, pageSize, s, this.scoreDocs)
}
val scoreDocs: List[ScoreDoc] = topFieldDocs.scoreDocs.toList
val total: Int = topFieldDocs.totalHits.value.toInt
val storedFields: StoredFields = context.indexSearcher.storedFields()
val ids: List[Id[D]] = scoreDocs.map(doc => Id[D](storedFields.document(doc.doc).get("_id")))
PagedResults(
query = this,
context = context,
offset = offset,
total = total,
ids = ids,
lastScoreDoc = scoreDocs.lastOption
)
}

private[query] def sort2SortField(sort: Sort): SortField = sort match {
case Sort.BestMatch => SortField.FIELD_SCORE
case Sort.IndexOrder => SortField.FIELD_DOC
case Sort.ByField(field, reverse) => new SortField(field.fieldName, field.sortType, reverse)
}
}
6 changes: 6 additions & 0 deletions core/src/main/scala/lightdb/query/SearchContext.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package lightdb.query

import lightdb.index.IndexSupport
import lightdb.Document

case class SearchContext[D <: Document[D]](indexSupport: IndexSupport[D])
7 changes: 7 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneFilter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package lightdb.lucene

import lightdb.Document
import lightdb.query.Filter
import org.apache.lucene.search.{Query => LuceneQuery}

case class LuceneFilter[D <: Document[D]](asQuery: () => LuceneQuery) extends Filter[D]
20 changes: 20 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneIndexContext.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package lightdb.lucene

import cats.effect.IO
import lightdb.Document
import lightdb.query.{IndexContext, PagedResults, SearchContext}
import org.apache.lucene.search.ScoreDoc

case class LuceneIndexContext[D <: Document[D]](context: SearchContext[D],
lastScoreDoc: Option[ScoreDoc]) extends IndexContext[D] {
override def nextPage(currentPage: PagedResults[D]): IO[Option[PagedResults[D]]] = if (currentPage.hasNext) {
currentPage.query.indexSupport.doSearch(
query = currentPage.query,
context = context,
offset = currentPage.offset + currentPage.query.pageSize,
after = Some(currentPage)
).map(Some.apply)
} else {
IO.pure(None)
}
}
22 changes: 22 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneIndexManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package lightdb.lucene

import cats.effect.IO
import lightdb.Document
import lightdb.index.IndexManager
import lightdb.lucene.index._

case class LuceneIndexManager[D <: Document[D]](indexSupport: LuceneSupport[D]) extends IndexManager[D] {
def apply(name: String): IndexedFieldBuilder = IndexedFieldBuilder(name)

override def count(): IO[Int] = indexSupport.indexer.count()

case class IndexedFieldBuilder(fieldName: String) {
def tokenized(f: D => String): TokenizedField[D] = TokenizedField(fieldName, indexSupport, f)
def string(f: D => String, store: Boolean = false): StringField[D] = StringField(fieldName, indexSupport, f, store)
def int(f: D => Int): IntField[D] = IntField(fieldName, indexSupport, f)
def long(f: D => Long): LongField[D] = LongField(fieldName, indexSupport, f)
def float(f: D => Float): FloatField[D] = FloatField(fieldName, indexSupport, f)
def double(f: D => Double): DoubleField[D] = DoubleField(fieldName, indexSupport, f)
def bigDecimal(f: D => BigDecimal): BigDecimalField[D] = BigDecimalField(fieldName, indexSupport, f)
}
}
14 changes: 14 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneIndexedField.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package lightdb.lucene

import lightdb.Document
import lightdb.index.IndexedField
import org.apache.lucene.search.SortField
import org.apache.lucene.{document => ld}

trait LuceneIndexedField[F, D <: Document[D]] extends IndexedField[F, D] {
protected[lightdb] def createFields(doc: D): List[ld.Field]

protected[lightdb] def sortType: SortField.Type

collection.asInstanceOf[LuceneSupport[D]].index.register(this)
}
72 changes: 72 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneIndexer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package lightdb.lucene

import cats.effect.IO
import lightdb.index.{IndexSupport, Indexer}
import lightdb.query.SearchContext
import lightdb.{Document, Id}
import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, SearcherFactory, SearcherManager}
import org.apache.lucene.store.{ByteBuffersDirectory, FSDirectory}
import org.apache.lucene.document.{Document => LuceneDocument, Field => LuceneField}

import java.nio.file.{Files, Path}
import java.util.concurrent.ConcurrentHashMap

case class LuceneIndexer[D <: Document[D]](indexSupport: IndexSupport[D],
persistent: Boolean = true,
analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] {
private lazy val path: Option[Path] = if (persistent) {
val p = indexSupport.db.directory.resolve(indexSupport.collectionName).resolve("index")
Files.createDirectories(p)
Some(p)
} else {
None
}
private lazy val directory = path
.map(p => FSDirectory.open(p))
.getOrElse(new ByteBuffersDirectory())
private lazy val config = new IndexWriterConfig(analyzer)
private lazy val indexWriter = new IndexWriter(directory, config)

private lazy val searcherManager = new SearcherManager(indexWriter, new SearcherFactory)

private lazy val parser = new QueryParser("_id", analyzer)

private[lucene] val contextMapping = new ConcurrentHashMap[SearchContext[D], IndexSearcher]

override def withSearchContext[Return](f: SearchContext[D] => IO[Return]): IO[Return] = {
val indexSearcher = searcherManager.acquire()
val context = SearchContext(indexSupport)
contextMapping.put(context, indexSearcher)
f(context)
.guarantee(IO {
contextMapping.remove(context)
searcherManager.release(indexSearcher)
})
}

private[lightdb] def addDoc(id: Id[D], fields: List[LuceneField]): Unit = if (fields.length > 1) {
val document = new LuceneDocument
fields.foreach(document.add)
indexWriter.updateDocument(new Term("_id", id.value), document)
}

private[lightdb] def delete(id: Id[D]): IO[Unit] = IO {
indexWriter.deleteDocuments(parser.parse(s"_id:${id.value}"))
}

private def commitBlocking(): Unit = {
indexWriter.flush()
indexWriter.commit()
searcherManager.maybeRefreshBlocking()
}

override def commit(): IO[Unit] = IO(commitBlocking())

override def count(): IO[Int] = withSearchContext { context =>
IO(context.indexSupport.asInstanceOf[LuceneSupport[D]].indexSearcher(context).count(new MatchAllDocsQuery))
}
}
Loading

0 comments on commit 785d21c

Please sign in to comment.