Commit: Cleaning up

darkfrog26 committed Mar 24, 2024
1 parent e1a8b81 commit 72e1fb6
Showing 1 changed file with 41 additions and 47 deletions.
@@ -7,11 +7,11 @@ import lightdb.index.{Indexer, SearchResult}
 import lightdb.query.{Filter, Query}
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.standard.StandardAnalyzer
-import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, StoredFields}
+import org.apache.lucene.index.{DirectoryReader, IndexReader, IndexWriter, IndexWriterConfig, StoredFields}
 import org.apache.lucene.store.{ByteBuffersDirectory, FSDirectory, MMapDirectory}
 import org.apache.lucene.document.{Field, IntField, TextField, Document => LuceneDocument}
 import org.apache.lucene.queryparser.classic.QueryParser
-import org.apache.lucene.search.{IndexSearcher, ScoreDoc}
+import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, ScoreDoc, SearcherFactory, SearcherManager}

 import java.nio.file.{Files, Path}
 import java.util.Comparator
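
For context on the org.apache.lucene.store imports: ByteBuffersDirectory is an in-memory backend, while FSDirectory and MMapDirectory are on-disk ones. A minimal sketch of how such a backend choice typically looks; the openDirectory helper is illustrative, and the file's actual selection logic is collapsed under the fold below:

    import java.nio.file.Path
    import org.apache.lucene.store.{ByteBuffersDirectory, Directory, FSDirectory}

    object DirectoryChoice {
      // On-disk index when a path is configured; FSDirectory.open picks the
      // best implementation for the platform (usually MMapDirectory).
      // Otherwise fall back to a transient in-memory index.
      def openDirectory(path: Option[Path]): Directory =
        path.map(p => FSDirectory.open(p)).getOrElse(new ByteBuffersDirectory)
    }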
@@ -25,7 +25,7 @@ trait LuceneIndexerSupport {

 case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
                                            autoCommit: Boolean = false,
-                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] {
+                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] { i =>
   private var disposed = false
   private lazy val path: Option[Path] = collection.db.directory.map(_.resolve(collection.collectionName))
   private lazy val directory = path
@@ -34,14 +34,14 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],

   private lazy val config = new IndexWriterConfig(analyzer)
   private lazy val indexWriter = new IndexWriter(directory, config)
-  private var _reader: LuceneReader = _
-
-  private def reader: LuceneReader = synchronized {
-    if (disposed) throw new RuntimeException("LuceneIndexer is already disposed")
-    if (_reader == null) {
-      _reader = new LuceneReader
+  private var _indexSearcher: IndexSearcher = _
+  private lazy val searcherManager = new SearcherManager(indexWriter, new SearcherFactory)
+  private def indexSearcher: IndexSearcher = synchronized {
+    if (_indexSearcher == null) {
+      _indexSearcher = searcherManager.acquire()
     }
-    _reader
+    _indexSearcher
   }

   override def put(value: D): IO[D] = IO {
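
Since the heart of this change is replacing the hand-rolled LuceneReader with Lucene's SearcherManager, here is a minimal, self-contained sketch of the acquire/release discipline SearcherManager expects. The SearcherManagerDemo object and the in-memory setup are illustrative, not part of this repo:

    import org.apache.lucene.analysis.standard.StandardAnalyzer
    import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
    import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, SearcherFactory, SearcherManager}
    import org.apache.lucene.store.ByteBuffersDirectory

    object SearcherManagerDemo {
      def main(args: Array[String]): Unit = {
        val directory = new ByteBuffersDirectory
        val writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))

        // Built over the IndexWriter, the manager hands out near-real-time
        // searchers without reopening the index from scratch.
        val manager = new SearcherManager(writer, new SearcherFactory)

        // Every acquire() must be paired with a release(): searchers are
        // reference-counted and only closed once all borrowers return them.
        val searcher: IndexSearcher = manager.acquire()
        try {
          println(searcher.count(new MatchAllDocsQuery)) // 0 for this empty index
        } finally {
          manager.release(searcher)
        }

        manager.close()
        writer.close()
      }
    }

The caching in the new indexSearcher accessor amortizes acquire() across calls; the trade-off is that the cached reference must be explicitly released, which is what the new commit() and close() bookkeeping below handles.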
@@ -65,13 +65,40 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
   override def commit(): IO[Unit] = IO {
     indexWriter.flush()
     indexWriter.commit()
+    i.synchronized {
+      if (_indexSearcher != null) {
+        searcherManager.release(_indexSearcher)
+      }
+      _indexSearcher = null
+    }
   }

   override def count(): IO[Long] = IO {
-    reader.count()
+    indexSearcher.count(new MatchAllDocsQuery)
   }

-  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = reader.search(query)
+  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = {
+    val indexSearch = this.indexSearcher
+    val parser = new QueryParser("_id", analyzer)
+    val filters = query.filters.map {
+      case Filter.Equals(field, value) => s"${field.name}:$value"
+      case f => throw new UnsupportedOperationException(s"Unsupported filter: $f")
+    }
+    // TODO: Support filtering better
+    val filterString = filters match {
+      case f :: Nil => f
+      case list => list.mkString("(", " AND ", ")")
+    }
+    val q = parser.parse(filterString)
+    val topDocs = indexSearcher.search(q, query.batchSize)
+    val hits = topDocs.scoreDocs
+    val total = topDocs.totalHits.value
+    val storedFields = indexSearcher.storedFields()
+    fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
+      .map { sd =>
+        LuceneSearchResult(sd, total, query, storedFields)
+      }
+  }

   override def truncate(): IO[Unit] = for {
     _ <- close()
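
The new search builds a classic query string out of the typed filters before handing it to QueryParser. A standalone sketch of just that translation, using a hypothetical Equals stand-in for lightdb.query.Filter.Equals:

    import org.apache.lucene.analysis.standard.StandardAnalyzer
    import org.apache.lucene.queryparser.classic.QueryParser

    object FilterStringDemo {
      // Hypothetical stand-in for lightdb.query.Filter.Equals.
      final case class Equals(fieldName: String, value: String)

      def main(args: Array[String]): Unit = {
        val filters = List(Equals("name", "Alice"), Equals("city", "Berlin"))

        // Mirror the diff: each Equals becomes "field:value", and multiple
        // clauses are joined with AND inside parentheses.
        val clauses = filters.map(f => s"${f.fieldName}:${f.value}")
        val filterString = clauses match {
          case single :: Nil => single
          case list          => list.mkString("(", " AND ", ")")
        }

        val parser = new QueryParser("_id", new StandardAnalyzer)
        // AND clauses parse to required (MUST) terms, roughly
        // "+name:alice +city:berlin" after StandardAnalyzer lowercasing.
        println(parser.parse(filterString))
      }
    }

Raw interpolation breaks as soon as a value contains whitespace or QueryParser metacharacters such as ':' or '*'; QueryParser.escape exists for that case, and the TODO in the diff presumably covers moving to programmatic TermQuery/BooleanQuery construction instead.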
@@ -89,49 +89,16 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
     indexWriter.flush()
     indexWriter.commit()
     indexWriter.close()
-    if (_reader != null) {
-      _reader.close()
+    if (_indexSearcher != null) {
+      searcherManager.release(_indexSearcher)
     }
+    searcherManager.close()
   }

   override def dispose(): IO[Unit] = close().map { _ =>
     disposed = true
   }

-  class LuceneReader {
-    private lazy val indexReader = DirectoryReader.open(directory)
-    private lazy val indexSearcher = new IndexSearcher(indexReader)
-
-    def count(): Int = indexReader.getDocCount("_id")
-
-    def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = {
-      val parser = new QueryParser("_id", analyzer)
-      val filters = query.filters.map {
-        case Filter.Equals(field, value) => s"${field.name}:$value"
-        case f => throw new UnsupportedOperationException(s"Unsupported filter: $f")
-      }
-      // TODO: Support filtering better
-      val filterString = filters match {
-        case f :: Nil => f
-        case list => list.mkString("(", " AND ", ")")
-      }
-      val q = parser.parse(filterString)
-      val topDocs = indexSearcher.search(q, query.batchSize)
-      val hits = topDocs.scoreDocs
-      val total = topDocs.totalHits.value
-      val storedFields = indexSearcher.storedFields()
-      fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
-        .map { sd =>
-          LuceneSearchResult(sd, total, query, storedFields)
-        }
-    }
-
-    def close(): Unit = {
-      indexReader.close()
-      _reader = null
-    }
-  }
-
   private case class LuceneSearchResult(scoreDoc: ScoreDoc,
                                         total: Long,
                                         query: Query[D],
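One Lucene subtlety in the new lifecycle: acquire() returns the manager's current searcher reference, so dropping the cached searcher in commit() only yields a fresh view once the manager itself refreshes. A small sketch of the usual commit-then-refresh pattern; the helper object and names are illustrative, not from this file:

    import org.apache.lucene.index.IndexWriter
    import org.apache.lucene.search.{IndexSearcher, SearcherManager}

    object RefreshPattern {
      // Commit pending writes and make them visible to searchers acquired
      // from the manager afterwards; maybeRefresh() is a cheap no-op when
      // nothing has changed.
      def commitAndRefresh(writer: IndexWriter, manager: SearcherManager): Unit = {
        writer.commit()
        manager.maybeRefresh()
      }

      // Borrow/return discipline for one-off queries.
      def withSearcher[A](manager: SearcherManager)(f: IndexSearcher => A): A = {
        val s = manager.acquire()
        try f(s) finally manager.release(s)
      }
    }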
