From 72e1fb64dd5fc5e998f8686786bd227da5b31429 Mon Sep 17 00:00:00 2001
From: Matt Hicks
Date: Sun, 24 Mar 2024 13:23:18 -0500
Subject: [PATCH] Cleaning up

---
 .../index/lucene/LuceneIndexerSupport.scala   | 88 +++++++++----------
 1 file changed, 41 insertions(+), 47 deletions(-)

diff --git a/lucene/src/main/scala/lightdb/index/lucene/LuceneIndexerSupport.scala b/lucene/src/main/scala/lightdb/index/lucene/LuceneIndexerSupport.scala
index 9629635d..1c50d146 100644
--- a/lucene/src/main/scala/lightdb/index/lucene/LuceneIndexerSupport.scala
+++ b/lucene/src/main/scala/lightdb/index/lucene/LuceneIndexerSupport.scala
@@ -7,11 +7,11 @@ import lightdb.index.{Indexer, SearchResult}
 import lightdb.query.{Filter, Query}
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.standard.StandardAnalyzer
-import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, StoredFields}
+import org.apache.lucene.index.{DirectoryReader, IndexReader, IndexWriter, IndexWriterConfig, StoredFields}
 import org.apache.lucene.store.{ByteBuffersDirectory, FSDirectory, MMapDirectory}
 import org.apache.lucene.document.{Field, IntField, TextField, Document => LuceneDocument}
 import org.apache.lucene.queryparser.classic.QueryParser
-import org.apache.lucene.search.{IndexSearcher, ScoreDoc}
+import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, ScoreDoc, SearcherFactory, SearcherManager}
 
 import java.nio.file.{Files, Path}
 import java.util.Comparator
@@ -25,7 +25,7 @@ trait LuceneIndexerSupport {
 
 case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
                                            autoCommit: Boolean = false,
-                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] {
+                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] { i =>
   private var disposed = false
   private lazy val path: Option[Path] = collection.db.directory.map(_.resolve(collection.collectionName))
   private lazy val directory = path
@@ -34,14 +34,14 @@
   private lazy val config = new IndexWriterConfig(analyzer)
   private lazy val indexWriter = new IndexWriter(directory, config)
 
-  private var _reader: LuceneReader = _
-  private def reader: LuceneReader = synchronized {
-    if (disposed) throw new RuntimeException("LuceneIndexer is already disposed")
-    if (_reader == null) {
-      _reader = new LuceneReader
+  private var _indexSearcher: IndexSearcher = _
+  private lazy val searcherManager = new SearcherManager(indexWriter, new SearcherFactory)
+  private def indexSearcher: IndexSearcher = synchronized {
+    if (_indexSearcher == null) {
+      _indexSearcher = searcherManager.acquire()
     }
-    _reader
+    _indexSearcher
   }
 
   override def put(value: D): IO[D] = IO {
@@ -65,13 +65,40 @@
   override def commit(): IO[Unit] = IO {
     indexWriter.flush()
     indexWriter.commit()
+    i.synchronized {
+      if (_indexSearcher != null) {
+        searcherManager.release(_indexSearcher)
+      }
+      _indexSearcher = null
+    }
   }
 
   override def count(): IO[Long] = IO {
-    reader.count()
+    indexSearcher.count(new MatchAllDocsQuery)
   }
 
-  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = reader.search(query)
+  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = {
+    val indexSearch = this.indexSearcher
+    val parser = new QueryParser("_id", analyzer)
+    val filters = query.filters.map {
+      case Filter.Equals(field, value) => s"${field.name}:$value"
+      case f => throw new UnsupportedOperationException(s"Unsupported filter: $f")
+    }
+    // TODO: Support filtering better
+    val filterString = filters match {
+      case f :: Nil => f
+      case list => list.mkString("(", " AND ", ")")
+    }
+    val q = parser.parse(filterString)
+    val topDocs = indexSearch.search(q, query.batchSize)
+    val hits = topDocs.scoreDocs
+    val total = topDocs.totalHits.value
+    val storedFields = indexSearch.storedFields()
+    fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
+      .map { sd =>
+        LuceneSearchResult(sd, total, query, storedFields)
+      }
+  }
 
   override def truncate(): IO[Unit] = for {
     _ <- close()
@@ -89,49 +116,16 @@
     indexWriter.flush()
     indexWriter.commit()
     indexWriter.close()
-    if (_reader != null) {
-      _reader.close()
+    if (_indexSearcher != null) {
+      searcherManager.release(_indexSearcher)
     }
+    searcherManager.close()
   }
 
   override def dispose(): IO[Unit] = close().map { _ =>
     disposed = true
   }
 
-  class LuceneReader {
-    private lazy val indexReader = DirectoryReader.open(directory)
-    private lazy val indexSearcher = new IndexSearcher(indexReader)
-
-    def count(): Int = indexReader.getDocCount("_id")
-
-    def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = {
-      val parser = new QueryParser("_id", analyzer)
-      val filters = query.filters.map {
-        case Filter.Equals(field, value) => s"${field.name}:$value"
-        case f => throw new UnsupportedOperationException(s"Unsupported filter: $f")
-      }
-      // TODO: Support filtering better
-      val filterString = filters match {
-        case f :: Nil => f
-        case list => list.mkString("(", " AND ", ")")
-      }
-      val q = parser.parse(filterString)
-      val topDocs = indexSearcher.search(q, query.batchSize)
-      val hits = topDocs.scoreDocs
-      val total = topDocs.totalHits.value
-      val storedFields = indexSearcher.storedFields()
-      fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
-        .map { sd =>
-          LuceneSearchResult(sd, total, query, storedFields)
-        }
-    }
-
-    def close(): Unit = {
-      indexReader.close()
-      _reader = null
-    }
-  }
-
   private case class LuceneSearchResult(scoreDoc: ScoreDoc, total: Long, query: Query[D],