diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java index e566f510f4dba..b394b50683e06 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java @@ -18,10 +18,8 @@ */ package org.elasticsearch.common.lucene.index; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.*; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -76,4 +74,38 @@ public LeafReader wrap(LeafReader reader) { } } + /** + * Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy + * otherwise we can't safely install the listener. + * + * @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy + */ + @SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener") + public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) { + ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader); + if (elasticsearchDirectoryReader != null) { + assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey(); + elasticsearchDirectoryReader.addReaderClosedListener(listener); + return; + } + throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader"); + } + + /** + * Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or null if no instance is found; + */ + public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) { + if (reader instanceof FilterDirectoryReader) { + if (reader instanceof ElasticsearchDirectoryReader) { + return (ElasticsearchDirectoryReader) reader; + } else { + // We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because + // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately + // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that + // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. + return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate()); + } + } + return null; + } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java index 60a956b1f338f..aff0fa69f0986 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java @@ -18,10 +18,7 @@ */ package org.elasticsearch.common.lucene.index; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.*; import org.elasticsearch.index.shard.ShardId; /** @@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader { * * @param in specified base reader. */ - ElasticsearchLeafReader(LeafReader in, ShardId shardId) { + public ElasticsearchLeafReader(LeafReader in, ShardId shardId) { super(in); this.shardId = shardId; } @@ -55,8 +52,18 @@ public Object getCoreCacheKey() { return in.getCoreCacheKey(); } - @Override - public Object getCombinedCoreAndDeletesKey() { - return in.getCombinedCoreAndDeletesKey(); + public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) { + if (reader instanceof FilterLeafReader) { + if (reader instanceof ElasticsearchLeafReader) { + return (ElasticsearchLeafReader) reader; + } else { + // We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because + // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately + // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that + // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. + return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate()); + } + } + return null; } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index a0cf923c5f10e..a7993384267dc 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.lucene.uid; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReader.CoreClosedListener; diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index c07be06448991..1431cbd4f9d17 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -596,6 +596,13 @@ public IndexReader reader() { return searcher.getIndexReader(); } + public DirectoryReader getDirectoryReader() { + if (reader() instanceof DirectoryReader) { + return (DirectoryReader) reader(); + } + throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader"); + } + public IndexSearcher searcher() { return searcher; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3973b47f3ac3a..75bcdfa552e9d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.settings.Settings; @@ -905,9 +906,10 @@ private IndexWriter createWriter(boolean create) throws IOException { @Override public void warm(LeafReader reader) throws IOException { try { - assert isMergedSegment(reader); + LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId); + assert isMergedSegment(esLeafReader); if (warmer != null) { - final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null)); + final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null)); final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher); warmer.warmNewReaders(context); } @@ -949,6 +951,12 @@ final static class SearchFactory extends EngineSearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { IndexSearcher searcher = super.newSearcher(reader, previousReader); + if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) { + // we call newSearcher from the IndexReaderWarmer which warms segments during merging + // in that case the reader is a LeafReader and all we need to do is to build a new Searcher + // and return it since it does it's own warming for that particular reader. + return searcher; + } if (warmer != null) { // we need to pass a custom searcher that does not release anything on Engine.Search Release, // we will release explicitly @@ -986,10 +994,11 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) } if (newSearcher != null) { - IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher)); + IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher)); warmer.warmNewReaders(context); } - warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher))); + assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass(); + warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher))); } catch (Throwable e) { if (isEngineClosed.get() == false) { logger.warn("failed to prepare/warm", e); diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index 0d88ab7d3fe47..e549eb32dd6a0 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSet; @@ -235,11 +236,11 @@ IndexFieldData build(Index index, @IndexSettings Settings indexSettings, Mapp CircuitBreakerService breakerService, MapperService mapperService); } - public static interface Global extends IndexFieldData { + interface Global extends IndexFieldData { - IndexFieldData loadGlobal(IndexReader indexReader); + IndexFieldData loadGlobal(DirectoryReader indexReader); - IndexFieldData localGlobalDirect(IndexReader indexReader) throws Exception; + IndexFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception; } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 6ea49650a851d..dc0db3032393a 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -19,10 +19,10 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; -import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.shard.ShardId; @@ -33,7 +33,7 @@ public interface IndexFieldDataCache { > FD load(LeafReaderContext context, IFD indexFieldData) throws Exception; - > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception; + > IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception; /** * Clears all the field data stored cached in on this index. @@ -67,7 +67,7 @@ public > FD load(Leaf @Override @SuppressWarnings("unchecked") - public > IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception { + public > IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception { return (IFD) indexFieldData.localGlobalDirect(indexReader); } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java index dd4e714dadac3..cb1471179c2d7 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global build(Index index, @IndexSettings Settings indexSetting } @Override - public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { + public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.leaves().size() <= 1) { // ordinals are already global return this; @@ -170,7 +170,7 @@ public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldDat } @Override - public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { final long startTime = System.nanoTime(); long ramBytesUsed = 0; @@ -347,7 +347,7 @@ public Collection getChildResources() { } @Override - public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { + public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) { return this; } @@ -355,7 +355,7 @@ public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { } @Override - public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { return loadGlobal(indexReader); } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java index 9d29b3b1a8a2c..a9f324b400f1e 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata.plain; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.elasticsearch.ElasticsearchException; @@ -61,7 +62,7 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce } @Override - public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) { + public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.leaves().size() <= 1) { // ordinals are already global return this; @@ -78,7 +79,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) { } @Override - public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger); } } diff --git a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java index 1b42dcd85d225..a7b7506ebae3e 100644 --- a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java +++ b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.index.query; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.MultiDocValues; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.join.JoinUtil; import org.apache.lucene.search.join.ScoreMode; @@ -287,12 +289,23 @@ public Query rewrite(IndexReader reader) throws IOException { if (getBoost() != 1.0F) { return super.rewrite(reader); } - String joinField = ParentFieldMapper.joinField(parentType); - IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal(indexSearcher.getIndexReader()); - MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType); - return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren); + if (reader instanceof DirectoryReader) { + String joinField = ParentFieldMapper.joinField(parentType); + IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader); + MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType); + return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren); + } else { + if (reader.leaves().isEmpty() && reader.numDocs() == 0) { + // asserting reader passes down a MultiReader during rewrite which makes this + // blow up since for this query to work we have to have a DirectoryReader otherwise + // we can't load global ordinals - for this to work we simply check if the reader has no leaves + // and rewrite to match nothing + return new MatchNoDocsQuery(); + } + throw new IllegalStateException("can't load global ordinals for reader of type: " + reader.getClass() + " must be a DirectoryReader"); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index c75f3c7995f0a..dff59e9b24431 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -20,26 +20,38 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineException; import java.io.IOException; /** * Extension point to add custom functionality at request time to the {@link DirectoryReader} - * and {@link IndexSearcher} managed by the {@link Engine}. + * and {@link IndexSearcher} managed by the {@link IndexShard}. */ -public interface IndexSearcherWrapper { +public class IndexSearcherWrapper { /** + * Wraps the given {@link DirectoryReader}. The wrapped reader can filter out document just like delete documents etc. but + * must not change any term or document content. + *

+ * NOTE: The wrapper has a per-request lifecycle, must delegate {@link IndexReader#getCoreCacheKey()} and must be an instance + * of {@link FilterDirectoryReader} that eventually exposes the original reader via {@link FilterDirectoryReader#getDelegate()}. + * The returned reader is closed once it goes out of scope. + *

* @param reader The provided directory reader to be wrapped to add custom functionality * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed * the provided directory reader */ - DirectoryReader wrap(DirectoryReader reader) throws IOException; + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return reader; + } /** * @param engineConfig The engine config which can be used to get the query cache and query cache policy from @@ -48,34 +60,87 @@ public interface IndexSearcherWrapper { * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed * the provided index searcher */ - IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException; - + protected IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException { + return searcher; + } /** * If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher * gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned. * * This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search) */ - default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException { - DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader()); - IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException { + final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader()); + if (elasticsearchDirectoryReader == null) { + throw new IllegalStateException("Can't wrap non elasticsearch directory reader"); + } + NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader()); + DirectoryReader reader = wrap(nonClosingReaderWrapper); + if (reader != nonClosingReaderWrapper) { + if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) { + throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" + + " to the original readers core cache key. Wrapped readers can't be used as cache keys since their are used only per request which would lead to subtle bugs"); + } + if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) { + // prevent that somebody wraps with a non-filter reader + throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't"); + } + } + + final IndexSearcher innerIndexSearcher = new IndexSearcher(reader); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); // TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times - IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher); - if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { + final IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher); + if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) { return engineSearcher; } else { return new Engine.Searcher(engineSearcher.source(), indexSearcher) { @Override public void close() throws ElasticsearchException { - engineSearcher.close(); + try { + reader().close(); + // we close the reader to make sure wrappers can release resources if needed.... + // our NonClosingReaderWrapper makes sure that our reader is not closed + } catch (IOException e) { + throw new ElasticsearchException("failed to close reader", e); + } finally { + engineSearcher.close(); + } + } }; } } + private static final class NonClosingReaderWrapper extends FilterDirectoryReader { + + private NonClosingReaderWrapper(DirectoryReader in) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return reader; + } + }); + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new NonClosingReaderWrapper(in); + } + + @Override + protected void doClose() throws IOException { + // don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have + } + + @Override + public Object getCoreCacheKey() { + return in.getCoreCacheKey(); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java index f0f871952fa42..8860bd4274c05 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java @@ -34,7 +34,7 @@ private ShardUtils() {} */ @Nullable public static ShardId extractShardId(LeafReader reader) { - final ElasticsearchLeafReader esReader = getElasticsearchLeafReader(reader); + final ElasticsearchLeafReader esReader = ElasticsearchLeafReader.getElasticsearchLeafReader(reader); if (esReader != null) { assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed"; return esReader.shardId(); @@ -47,45 +47,14 @@ public static ShardId extractShardId(LeafReader reader) { * will return null. */ @Nullable - public static ShardId extractShardId(IndexReader reader) { - final ElasticsearchDirectoryReader esReader = getElasticsearchDirectoryReader(reader); + public static ShardId extractShardId(DirectoryReader reader) { + final ElasticsearchDirectoryReader esReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader); if (esReader != null) { return esReader.shardId(); } - if (!reader.leaves().isEmpty()) { - return extractShardId(reader.leaves().get(0).reader()); - } - return null; + throw new IllegalArgumentException("can't extract shard ID, can't unwrap ElasticsearchDirectoryReader"); } - private static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) { - if (reader instanceof FilterLeafReader) { - if (reader instanceof ElasticsearchLeafReader) { - return (ElasticsearchLeafReader) reader; - } else { - // We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because - // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately - // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that - // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. - return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate()); - } - } - return null; - } - private static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) { - if (reader instanceof FilterDirectoryReader) { - if (reader instanceof ElasticsearchDirectoryReader) { - return (ElasticsearchDirectoryReader) reader; - } else { - // We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because - // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately - // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that - // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. - return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate()); - } - } - return null; - } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index 2a82774a612b3..4f6f238ef6d5d 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -180,6 +181,10 @@ public IndexReader reader() { return searcher.reader(); } + public DirectoryReader getDirectoryReader() { + return searcher.getDirectoryReader(); + } + @Override public String toString() { return "WarmerContext: " + searcher.reader(); diff --git a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java index 5fb70b61160bf..7c42aef478832 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -248,7 +249,7 @@ public void loadIntoContext(final ShardSearchRequest request, final SearchContex if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { - context.searcher().getIndexReader().addReaderClosedListener(cleanupKey); + ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getDirectoryReader(), cleanupKey); } } } else { diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 59421b4f96b7c..74940cfb5b759 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.fielddata.cache; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; @@ -31,6 +32,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -167,12 +169,12 @@ public > FD load(fina } @Override - public > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception { + public > IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception { final ShardId shardId = ShardUtils.extractShardId(indexReader); final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId); //noinspection unchecked final Accountable accountable = cache.computeIfAbsent(key, k -> { - indexReader.addReaderClosedListener(IndexFieldCache.this); + ElasticsearchDirectoryReader.addReaderCloseListener(indexReader, IndexFieldCache.this); for (Listener listener : this.listeners) { k.listeners.add(listener); } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 57cecfceb8f25..e797349466635 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -1016,7 +1016,7 @@ public void run() { try { final long start = System.nanoTime(); IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); - ifd.loadGlobal(context.reader()); + ifd.loadGlobal(context.getDirectoryReader()); if (indexShard.warmerService().logger().isTraceEnabled()) { indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java index ab6648d85824e..0464dc8c1d83c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java @@ -18,12 +18,7 @@ */ package org.elasticsearch.search.aggregations.support; -import org.apache.lucene.index.DocValues; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.RandomAccessOrds; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.*; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Scorer; import org.apache.lucene.util.Bits; @@ -146,7 +141,7 @@ public RandomAccessOrds ordinalsValues(LeafReaderContext context) { @Override public RandomAccessOrds globalOrdinalsValues(LeafReaderContext context) { - final IndexOrdinalsFieldData global = indexFieldData.loadGlobal(context.parent.reader()); + final IndexOrdinalsFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader()); final AtomicOrdinalsFieldData atomicFieldData = global.load(context); return atomicFieldData.getOrdinalsValues(); } @@ -162,7 +157,7 @@ public ParentChild(ParentChildIndexFieldData indexFieldData) { } public long globalMaxOrd(IndexSearcher indexSearcher, String type) { - IndexReader indexReader = indexSearcher.getIndexReader(); + DirectoryReader indexReader = (DirectoryReader) indexSearcher.getIndexReader(); if (indexReader.leaves().isEmpty()) { return 0; } else { @@ -175,7 +170,7 @@ public long globalMaxOrd(IndexSearcher indexSearcher, String type) { } public SortedDocValues globalOrdinalsValues(String type, LeafReaderContext context) { - final IndexParentChildFieldData global = indexFieldData.loadGlobal(context.parent.reader()); + final IndexParentChildFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader()); final AtomicParentChildFieldData atomicFieldData = global.load(context); return atomicFieldData.getOrdinalsValues(type); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index a23df0a00f127..0a9b860edb7c7 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.internal; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.index.TermContext; import org.apache.lucene.search.*; @@ -40,9 +41,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private AggregatedDfs aggregatedDfs; + private final Engine.Searcher engineSearcher; + public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) { super(searcher.reader()); in = searcher.searcher(); + engineSearcher = searcher; setSimilarity(searcher.searcher().getSimilarity(true)); setQueryCache(searchContext.getQueryCache()); setQueryCachingPolicy(searchContext.indexShard().getQueryCachingPolicy()); @@ -104,4 +108,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio } return collectionStatistics; } + + public DirectoryReader getDirectoryReader() { + return engineSearcher.getDirectoryReader(); + } } diff --git a/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java b/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java index 0307a3806c9a9..3c4a34d952f35 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java @@ -67,10 +67,6 @@ public void testCoreCacheKey() throws Exception { assertEquals(1, ir2.numDocs()); assertEquals(1, ir2.leaves().size()); assertSame(ir.leaves().get(0).reader().getCoreCacheKey(), ir2.leaves().get(0).reader().getCoreCacheKey()); - - // this is kind of stupid, but for now its here - assertNotSame(ir.leaves().get(0).reader().getCombinedCoreAndDeletesKey(), ir2.leaves().get(0).reader().getCombinedCoreAndDeletesKey()); - IOUtils.close(ir, ir2, iw, dir); } } diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index c620649797426..6c2397e092e51 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -80,7 +80,7 @@ public static DirectoryReader reopen(DirectoryReader reader, boolean newReaderEx public void testVersions() throws Exception { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); Document doc = new Document(); @@ -148,7 +148,7 @@ public void testNestedDocuments() throws IOException { docs.add(doc); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l)); assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l)); @@ -174,7 +174,7 @@ public void testBackwardCompatibility() throws IOException { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); Document doc = new Document(); @@ -286,7 +286,7 @@ public void testMergingOldIndices() throws Exception { // Force merge and check versions iw.forceMerge(1, true); - final LeafReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory())); + final LeafReader ir = SlowCompositeReaderWrapper.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw.getDirectory()), new ShardId("foo", 1))); final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME); assertThat(versions, notNullValue()); for (int i = 0; i < ir.maxDoc(); ++i) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index ff1e885100f6f..dd32b309a0f92 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -54,7 +54,7 @@ public void testOtherServiceBound() { assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta); } - public static final class Wrapper implements IndexSearcherWrapper { + public static final class Wrapper extends IndexSearcherWrapper { @Override public DirectoryReader wrap(DirectoryReader reader) { diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5f6e1db42b917..2a6150267a50f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1499,7 +1499,7 @@ protected Term newUid(String id) { @Test public void testExtractShardId() { try (Engine.Searcher test = this.engine.acquireSearcher("test")) { - ShardId shardId = ShardUtils.extractShardId(test.reader()); + ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); assertNotNull(shardId); assertEquals(shardId, engine.config().getShardId()); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 2c6ee40b86eb9..7dadafb8a0bb0 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -855,7 +855,7 @@ public void testFailEngineOnCorruption() { @Test public void testExtractShardId() { try (Engine.Searcher test = replicaEngine.acquireSearcher("test")) { - ShardId shardId = ShardUtils.extractShardId(test.reader()); + ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); assertNotNull(shardId); assertEquals(shardId, replicaEngine.config().getShardId()); } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java index 94178f959a0ba..9200873e1c8f9 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.search.Filter; import org.apache.lucene.store.RAMDirectory; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -35,6 +36,7 @@ import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.After; @@ -52,7 +54,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { protected MapperService mapperService; protected IndexWriter writer; protected LeafReaderContext readerContext; - protected IndexReader topLevelReader; + protected DirectoryReader topLevelReader; protected IndicesFieldDataCache indicesFieldDataCache; protected abstract FieldDataType getFieldDataType(); @@ -112,7 +114,7 @@ protected final LeafReaderContext refreshReader() throws Exception { if (readerContext != null) { readerContext.reader().close(); } - topLevelReader = DirectoryReader.open(writer, true); + topLevelReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader); readerContext = reader.getContext(); return readerContext; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java new file mode 100644 index 0000000000000..e8a7a75b1d694 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -0,0 +1,277 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.*; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.similarities.DefaultSimilarity; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class IndexSearcherWrapperTests extends ESTestCase { + private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null); + + public void testReaderCloseListenerIsCalled() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + final AtomicInteger closeCalls = new AtomicInteger(0); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("field", reader, closeCalls); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + + }; + final int sourceRefCount = open.getRefCount(); + final AtomicInteger count = new AtomicInteger(); + final AtomicInteger outerCount = new AtomicInteger(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); + assertEquals(1, wrap.reader().getRefCount()); + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { + if (reader == open) { + count.incrementAndGet(); + } + outerCount.incrementAndGet(); + }); + assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits); + wrap.close(); + assertFalse("wrapped reader is closed", wrap.reader().tryIncRef()); + assertEquals(sourceRefCount, open.getRefCount()); + } + assertEquals(1, closeCalls.get()); + + IOUtils.close(open, writer, dir); + assertEquals(1, outerCount.get()); + assertEquals(1, count.get()); + assertEquals(0, open.getRefCount()); + assertEquals(1, closeCalls.get()); + } + + public void testIsCacheable() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + final AtomicInteger closeCalls = new AtomicInteger(0); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("field", reader, closeCalls); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try (final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher)) { + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { + cache.remove(reader.getCoreCacheKey()); + }); + TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1); + cache.put(wrap.reader().getCoreCacheKey(), search); + } + } + assertEquals(1, closeCalls.get()); + + assertEquals(1, cache.size()); + IOUtils.close(open, writer, dir); + assertEquals(0, cache.size()); + assertEquals(1, closeCalls.get()); + } + + public void testNoWrap() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); + assertSame(wrap, engineSearcher); + } + IOUtils.close(open, writer, dir); + } + + public void testWrappedReaderMustDelegateCoreCacheKey() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new BrokenWrapper(reader, false); + } + }; + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try { + wrapper.wrap(ENGINE_CONFIG, engineSearcher); + fail("reader must delegate cache key"); + } catch (IllegalStateException ex) { + // all is well + } + } + wrapper = new IndexSearcherWrapper() { + @Override + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new BrokenWrapper(reader, true); + } + }; + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try { + wrapper.wrap(ENGINE_CONFIG, engineSearcher); + fail("reader must delegate cache key"); + } catch (IllegalStateException ex) { + // all is well + } + } + IOUtils.close(open, writer, dir); + } + + private static class FieldMaskingReader extends FilterDirectoryReader { + private final String field; + private final AtomicInteger closeCalls; + + public FieldMaskingReader(String field, DirectoryReader in, AtomicInteger closeCalls) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new FieldFilterLeafReader(reader, Collections.singleton(field), true); + } + }); + this.closeCalls = closeCalls; + this.field = field; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new FieldMaskingReader(field, in, closeCalls); + } + + @Override + public Object getCoreCacheKey() { + return in.getCoreCacheKey(); + } + + @Override + protected void doClose() throws IOException { + super.doClose(); + closeCalls.incrementAndGet(); + } + } + + private static class BrokenWrapper extends FilterDirectoryReader { + + private final boolean hideDelegate; + + public BrokenWrapper(DirectoryReader in, boolean hideDelegate) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return reader; + } + }); + this.hideDelegate = hideDelegate; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new BrokenWrapper(in, hideDelegate); + } + + @Override + public DirectoryReader getDelegate() { + if (hideDelegate) { + try { + return ElasticsearchDirectoryReader.wrap(super.getDelegate(), new ShardId("foo", 1)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return super.getDelegate(); + } + + @Override + public Object getCoreCacheKey() { + if (hideDelegate == false) { + return super.getCoreCacheKey(); + } else { + return in.getCoreCacheKey(); + } + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b891219d63688..f7b14192b4936 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -62,13 +63,12 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.fielddata.FieldDataStats; +import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.indexing.IndexingOperationListener; import org.elasticsearch.index.indexing.ShardIndexingService; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -88,6 +88,7 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.BrokenBarrierException; @@ -897,7 +898,7 @@ public void testSearcherWrapperIsUsed() throws IOException { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexService("test"); IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(randomBoolean()).get(); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get(); Engine.GetResult getResult = shard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); @@ -928,26 +929,48 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); - - ShardRoutingHelper.reinit(routing); - newShard.updateRoutingEntry(routing, false); - DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); - assertTrue(newShard.recoverFromStore(routing, localNode)); - routing = new ShardRouting(routing); - ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); - try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { - TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); - assertEquals(search.totalHits, 0); - search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); - assertEquals(search.totalHits, 1); + try { + ShardRoutingHelper.reinit(routing); + newShard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); + routing = new ShardRouting(routing); + ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); + assertEquals(search.totalHits, 0); + search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); + assertEquals(search.totalHits, 1); + } + getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); + assertTrue(getResult.exists()); + assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader + assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); + getResult.release(); + + // test global ordinals are evicted + MappedFieldType foo = newShard.mapperService().indexName("foo"); + IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); + FieldDataStats before = shard.fieldData().stats("foo"); + FieldDataStats after = null; + try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { + assumeTrue("we have to have more than one segment", searcher.getDirectoryReader().leaves().size() > 1); + IndexFieldData indexFieldData = ifd.loadGlobal(searcher.getDirectoryReader()); + after = shard.fieldData().stats("foo"); + assertEquals(after.getEvictions(), before.getEvictions()); + assertTrue(indexFieldData.toString(), after.getMemorySizeInBytes() > before.getMemorySizeInBytes()); + } + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), after.getMemorySizeInBytes()); + newShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + newShard.refresh("test"); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + } finally { + newShard.close("just do it", randomBoolean()); } - getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); - assertTrue(getResult.exists()); - assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader - assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); - getResult.release(); - newShard.close("just do it", randomBoolean()); + } private static class FieldMaskingReader extends FilterDirectoryReader { @@ -958,17 +981,7 @@ public FieldMaskingReader(String field, DirectoryReader in) throws IOException { private final String filteredField = field; @Override public LeafReader wrap(LeafReader reader) { - return new FilterLeafReader(reader) { - @Override - public Fields fields() throws IOException { - return new FilterFields(super.fields()) { - @Override - public Terms terms(String field) throws IOException { - return filteredField.equals(field) ? null : super.terms(field); - } - }; - } - }; + return new FieldFilterLeafReader(reader, Collections.singleton(field), true); } }); this.field = field; @@ -979,5 +992,10 @@ public Terms terms(String field) throws IOException { protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { return new FieldMaskingReader(field, in); } + + @Override + public Object getCoreCacheKey() { + return in.getCoreCacheKey(); + } } } diff --git a/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java b/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java index fec406a78411e..0187d4ac0356b 100644 --- a/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java +++ b/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.engine; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.common.logging.ESLogger; @@ -33,7 +34,6 @@ class AssertingSearcher extends Engine.Searcher { private final Engine.Searcher wrappedSearcher; private final ShardId shardId; - private final IndexSearcher indexSearcher; private RuntimeException firstReleaseStack; private final Object lock = new Object(); private final int initialRefCount; @@ -50,7 +50,6 @@ class AssertingSearcher extends Engine.Searcher { this.logger = logger; this.shardId = shardId; initialRefCount = wrappedSearcher.reader().getRefCount(); - this.indexSearcher = indexSearcher; assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; } @@ -82,16 +81,6 @@ public void close() { } } - @Override - public IndexReader reader() { - return indexSearcher.getIndexReader(); - } - - @Override - public IndexSearcher searcher() { - return indexSearcher; - } - public ShardId shardId() { return shardId; } diff --git a/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java b/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java index 3649a7b108038..ab570afdd9db2 100644 --- a/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java +++ b/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java @@ -169,11 +169,6 @@ public Object getCoreCacheKey() { return in.getCoreCacheKey(); } - @Override - public Object getCombinedCoreAndDeletesKey() { - return in.getCombinedCoreAndDeletesKey(); - } - } public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) { diff --git a/dev-tools/src/main/resources/forbidden/all-signatures.txt b/dev-tools/src/main/resources/forbidden/all-signatures.txt index f9fba0ab3f793..447e994f50716 100644 --- a/dev-tools/src/main/resources/forbidden/all-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/all-signatures.txt @@ -90,3 +90,6 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av @defaultMessage Do not violate java's access system java.lang.reflect.AccessibleObject#setAccessible(boolean) java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean) + +@defaultMessage this should not have been added to lucene in the first place +org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey() \ No newline at end of file diff --git a/dev-tools/src/main/resources/forbidden/core-signatures.txt b/dev-tools/src/main/resources/forbidden/core-signatures.txt index 3a925e64d3cb0..08c548f1dcc98 100644 --- a/dev-tools/src/main/resources/forbidden/core-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/core-signatures.txt @@ -36,6 +36,10 @@ org.apache.lucene.index.IndexReader#decRef() org.apache.lucene.index.IndexReader#incRef() org.apache.lucene.index.IndexReader#tryIncRef() +@defaultMessage Close listeners can only installed via ElasticsearchDirectoryReader#addReaderCloseListener +org.apache.lucene.index.IndexReader#addReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener) +org.apache.lucene.index.IndexReader#removeReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener) + @defaultMessage Pass the precision step from the mappings explicitly instead org.apache.lucene.search.NumericRangeQuery#newDoubleRange(java.lang.String,java.lang.Double,java.lang.Double,boolean,boolean) org.apache.lucene.search.NumericRangeQuery#newFloatRange(java.lang.String,java.lang.Float,java.lang.Float,boolean,boolean)