From d3436ff59243f2a100583a110df4f1553d418748 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 12 Oct 2015 16:47:12 +0200 Subject: [PATCH 01/11] Streamline top level reader close listeners and forbid general usage IndexReader#addReaderCloseListener is very error prone when it comes to caching and reader wrapping. The listeners are not delegated to the sub readers nor can it's implementation change since it's final in the base class. This commit only allows installing close listeners on the top level ElasticsearchDirecotryReader which is known to work an has a defined lifetime which corresponds to its subreader. This ensure that cachesa re cleared once the reader goes out of scope. --- .../index/ElasticsearchDirectoryReader.java | 40 +++- .../lucene/index/ElasticsearchLeafReader.java | 15 ++ .../index/fielddata/IndexFieldDataCache.java | 1 - .../index/shard/IndexSearcherWrapper.java | 65 ++++++- .../elasticsearch/index/shard/ShardUtils.java | 32 +--- .../cache/request/IndicesRequestCache.java | 3 +- .../cache/IndicesFieldDataCache.java | 3 +- .../common/lucene/uid/VersionsTests.java | 8 +- .../elasticsearch/index/IndexModuleTests.java | 2 +- .../fielddata/AbstractFieldDataTestCase.java | 4 +- .../shard/IndexSearcherWrapperTests.java | 172 ++++++++++++++++++ .../index/shard/IndexShardTests.java | 3 +- .../resources/forbidden/all-signatures.txt | 1 + .../resources/forbidden/core-signatures.txt | 4 + 14 files changed, 300 insertions(+), 53 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java index e566f510f4dba..8c016062681fe 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java @@ -18,10 +18,8 @@ */ package org.elasticsearch.common.lucene.index; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.*; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -76,4 +74,38 @@ public LeafReader wrap(LeafReader reader) { } } + @SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener") + public static void addReaderCloseListener(IndexReader reader, IndexReader.ReaderClosedListener listener) { + ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader); + if (elasticsearchDirectoryReader == null && reader instanceof LeafReader) { + ElasticsearchLeafReader leafReader = ElasticsearchLeafReader.getElasticsearchLeafReader((LeafReader) reader); + if (leafReader != null) { + assert reader.getCoreCacheKey() == leafReader.getCoreCacheKey(); + leafReader.addReaderClosedListener(listener); + return; + } + } else { + assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey(); + elasticsearchDirectoryReader.addReaderClosedListener(listener); + return; + } + throw new IllegalStateException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader"); + } + + public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) { + if (reader instanceof FilterDirectoryReader) { + if (reader instanceof ElasticsearchDirectoryReader) { + return (ElasticsearchDirectoryReader) reader; + } else { + // We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because + // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately + // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that + // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. + return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate()); + } + } + return null; + } + + } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java index 60a956b1f338f..e666b91d82494 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java @@ -59,4 +59,19 @@ public Object getCoreCacheKey() { public Object getCombinedCoreAndDeletesKey() { return in.getCombinedCoreAndDeletesKey(); } + + public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) { + if (reader instanceof FilterLeafReader) { + if (reader instanceof ElasticsearchLeafReader) { + return (ElasticsearchLeafReader) reader; + } else { + // We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because + // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately + // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that + // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. + return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate()); + } + } + return null; + } } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 6ea49650a851d..a2b0467731c69 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -22,7 +22,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; -import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.shard.ShardId; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index c75f3c7995f0a..914c5398dbf98 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -20,11 +20,14 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineException; import java.io.IOException; @@ -32,14 +35,16 @@ * Extension point to add custom functionality at request time to the {@link DirectoryReader} * and {@link IndexSearcher} managed by the {@link Engine}. */ -public interface IndexSearcherWrapper { +public class IndexSearcherWrapper { /** * @param reader The provided directory reader to be wrapped to add custom functionality * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed * the provided directory reader */ - DirectoryReader wrap(DirectoryReader reader) throws IOException; + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return reader; + } /** * @param engineConfig The engine config which can be used to get the query cache and query cache policy from @@ -48,17 +53,22 @@ public interface IndexSearcherWrapper { * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed * the provided index searcher */ - IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException; - + protected IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException { + return searcher; + } /** * If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher * gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned. * * This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search) */ - default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException { - DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader()); - IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException { + final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.reader()); + if (elasticsearchDirectoryReader == null) { + throw new IllegalStateException("Can't wrap non elasticsearch directory reader"); + } + DirectoryReader reader = wrap((DirectoryReader)engineSearcher.reader()); + IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader)); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); @@ -72,10 +82,47 @@ default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSe return new Engine.Searcher(engineSearcher.source(), indexSearcher) { @Override public void close() throws ElasticsearchException { - engineSearcher.close(); + try { + reader().close(); + } catch (IOException e) { + throw new ElasticsearchException("failed to close reader", e); + } finally { + engineSearcher.close(); + } + } }; } } + final class CacheFriendlyReaderWrapper extends FilterDirectoryReader { + private final ElasticsearchDirectoryReader elasticsearchReader; + + private CacheFriendlyReaderWrapper(DirectoryReader in, ElasticsearchDirectoryReader elasticsearchReader) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return reader; + } + }); + this.elasticsearchReader = elasticsearchReader; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new CacheFriendlyReaderWrapper(in, elasticsearchReader); + } + + @Override + protected void doClose() throws IOException { + // don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have + } + + @Override + public Object getCoreCacheKey() { + // this is important = we always use the ES reader core cache key on top level + return elasticsearchReader.getCoreCacheKey(); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java index f0f871952fa42..84b5f69ab1db1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java @@ -34,7 +34,7 @@ private ShardUtils() {} */ @Nullable public static ShardId extractShardId(LeafReader reader) { - final ElasticsearchLeafReader esReader = getElasticsearchLeafReader(reader); + final ElasticsearchLeafReader esReader = ElasticsearchLeafReader.getElasticsearchLeafReader(reader); if (esReader != null) { assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed"; return esReader.shardId(); @@ -48,7 +48,7 @@ public static ShardId extractShardId(LeafReader reader) { */ @Nullable public static ShardId extractShardId(IndexReader reader) { - final ElasticsearchDirectoryReader esReader = getElasticsearchDirectoryReader(reader); + final ElasticsearchDirectoryReader esReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader); if (esReader != null) { return esReader.shardId(); } @@ -58,34 +58,6 @@ public static ShardId extractShardId(IndexReader reader) { return null; } - private static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) { - if (reader instanceof FilterLeafReader) { - if (reader instanceof ElasticsearchLeafReader) { - return (ElasticsearchLeafReader) reader; - } else { - // We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because - // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately - // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that - // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. - return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate()); - } - } - return null; - } - private static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) { - if (reader instanceof FilterDirectoryReader) { - if (reader instanceof ElasticsearchDirectoryReader) { - return (ElasticsearchDirectoryReader) reader; - } else { - // We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because - // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately - // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that - // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. - return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate()); - } - } - return null; - } } diff --git a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java index 5fb70b61160bf..cc242a8a174ca 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -248,7 +249,7 @@ public void loadIntoContext(final ShardSearchRequest request, final SearchContex if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { - context.searcher().getIndexReader().addReaderClosedListener(cleanupKey); + ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getIndexReader(), cleanupKey); } } } else { diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 59421b4f96b7c..50b00820f79ba 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -172,7 +173,7 @@ public > IFD l final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId); //noinspection unchecked final Accountable accountable = cache.computeIfAbsent(key, k -> { - indexReader.addReaderClosedListener(IndexFieldCache.this); + ElasticsearchDirectoryReader.addReaderCloseListener(indexReader, IndexFieldCache.this); for (Listener listener : this.listeners) { k.listeners.add(listener); } diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index c620649797426..6c2397e092e51 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -80,7 +80,7 @@ public static DirectoryReader reopen(DirectoryReader reader, boolean newReaderEx public void testVersions() throws Exception { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); Document doc = new Document(); @@ -148,7 +148,7 @@ public void testNestedDocuments() throws IOException { docs.add(doc); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l)); assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l)); @@ -174,7 +174,7 @@ public void testBackwardCompatibility() throws IOException { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); - DirectoryReader directoryReader = DirectoryReader.open(writer, true); + DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); Document doc = new Document(); @@ -286,7 +286,7 @@ public void testMergingOldIndices() throws Exception { // Force merge and check versions iw.forceMerge(1, true); - final LeafReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory())); + final LeafReader ir = SlowCompositeReaderWrapper.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw.getDirectory()), new ShardId("foo", 1))); final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME); assertThat(versions, notNullValue()); for (int i = 0; i < ir.maxDoc(); ++i) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index ff1e885100f6f..dd32b309a0f92 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -54,7 +54,7 @@ public void testOtherServiceBound() { assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta); } - public static final class Wrapper implements IndexSearcherWrapper { + public static final class Wrapper extends IndexSearcherWrapper { @Override public DirectoryReader wrap(DirectoryReader reader) { diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java index 94178f959a0ba..bdeacc5768317 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.search.Filter; import org.apache.lucene.store.RAMDirectory; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -35,6 +36,7 @@ import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.After; @@ -112,7 +114,7 @@ protected final LeafReaderContext refreshReader() throws Exception { if (readerContext != null) { readerContext.reader().close(); } - topLevelReader = DirectoryReader.open(writer, true); + topLevelReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader); readerContext = reader.getContext(); return readerContext; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java new file mode 100644 index 0000000000000..cdc696fea4f4a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.*; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.similarities.DefaultSimilarity; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class IndexSearcherWrapperTests extends ESTestCase { + private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null); + + public void testReaderCloseListenerIsCalled() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("field", reader); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + final int sourceRefCount = open.getRefCount(); + final AtomicInteger count = new AtomicInteger(); + final AtomicInteger outerCount = new AtomicInteger(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); + assertEquals(1, wrap.reader().getRefCount()); + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> { + if (reader == open) { + count.incrementAndGet(); + } + outerCount.incrementAndGet(); + }); + assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits); + wrap.close(); + assertFalse("wrapped reader is closed", wrap.reader().tryIncRef()); + assertEquals(sourceRefCount, open.getRefCount()); + } + + IOUtils.close(open, writer, dir); + assertEquals(1, outerCount.get()); + assertEquals(1, count.get()); + assertEquals(0, open.getRefCount()); + } + + public void testIsCacheable() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("field", reader); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try (final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher)) { + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> { + cache.remove(reader.getCoreCacheKey()); + }); + TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1); + cache.put(wrap.reader().getCoreCacheKey(), search); + } + } + + assertEquals(1, cache.size()); + IOUtils.close(open, writer, dir); + assertEquals(0, cache.size()); + } + + public void testNoWrap() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper(); + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); + assertSame(wrap, engineSearcher); + } + IOUtils.close(open, writer, dir); + } + + private static class FieldMaskingReader extends FilterDirectoryReader { + private final String field; + public FieldMaskingReader(String field, DirectoryReader in) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new FieldFilterLeafReader(reader, Collections.singleton(field), true); + } + }); + this.field = field; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new FieldMaskingReader(field, in); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b891219d63688..29240b8d08f1a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -945,7 +945,8 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader - assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); + assertTrue(getResult.searcher().reader() instanceof FilterDirectoryReader); + assertTrue(((FilterDirectoryReader)getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader); getResult.release(); newShard.close("just do it", randomBoolean()); } diff --git a/dev-tools/src/main/resources/forbidden/all-signatures.txt b/dev-tools/src/main/resources/forbidden/all-signatures.txt index f9fba0ab3f793..3023416d54936 100644 --- a/dev-tools/src/main/resources/forbidden/all-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/all-signatures.txt @@ -90,3 +90,4 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av @defaultMessage Do not violate java's access system java.lang.reflect.AccessibleObject#setAccessible(boolean) java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean) + diff --git a/dev-tools/src/main/resources/forbidden/core-signatures.txt b/dev-tools/src/main/resources/forbidden/core-signatures.txt index 3a925e64d3cb0..08c548f1dcc98 100644 --- a/dev-tools/src/main/resources/forbidden/core-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/core-signatures.txt @@ -36,6 +36,10 @@ org.apache.lucene.index.IndexReader#decRef() org.apache.lucene.index.IndexReader#incRef() org.apache.lucene.index.IndexReader#tryIncRef() +@defaultMessage Close listeners can only installed via ElasticsearchDirectoryReader#addReaderCloseListener +org.apache.lucene.index.IndexReader#addReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener) +org.apache.lucene.index.IndexReader#removeReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener) + @defaultMessage Pass the precision step from the mappings explicitly instead org.apache.lucene.search.NumericRangeQuery#newDoubleRange(java.lang.String,java.lang.Double,java.lang.Double,boolean,boolean) org.apache.lucene.search.NumericRangeQuery#newFloatRange(java.lang.String,java.lang.Float,java.lang.Float,boolean,boolean) From bd5ac9ce1a80cd1468ec800e5986641ed372974d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 14:56:16 +0200 Subject: [PATCH 02/11] enforce DirectoryReader when globals are loaded --- .../lucene/index/ElasticsearchDirectoryReader.java | 13 +++---------- .../elasticsearch/common/lucene/uid/Versions.java | 1 + .../java/org/elasticsearch/index/engine/Engine.java | 6 ++++-- .../index/fielddata/IndexFieldData.java | 5 +++-- .../index/fielddata/IndexFieldDataCache.java | 5 +++-- .../index/fielddata/IndexOrdinalsFieldData.java | 5 +++-- .../index/fielddata/IndexParentChildFieldData.java | 5 +++-- .../ordinals/GlobalOrdinalsIndexFieldData.java | 5 +++-- .../plain/AbstractIndexOrdinalsFieldData.java | 4 ++-- .../index/fielddata/plain/IndexIndexFieldData.java | 10 +++------- .../fielddata/plain/ParentChildIndexFieldData.java | 8 ++++---- .../plain/SortedSetDVOrdinalsIndexFieldData.java | 5 +++-- .../index/query/HasChildQueryBuilder.java | 3 ++- .../org/elasticsearch/index/shard/ShardUtils.java | 7 ++----- .../org/elasticsearch/indices/IndicesWarmer.java | 3 ++- .../indices/cache/request/IndicesRequestCache.java | 2 +- .../fielddata/cache/IndicesFieldDataCache.java | 3 ++- .../search/aggregations/support/ValuesSource.java | 13 ++++--------- .../search/internal/ContextIndexSearcher.java | 8 ++++++++ .../index/fielddata/AbstractFieldDataTestCase.java | 2 +- .../test/engine/AssertingSearcher.java | 13 +------------ 21 files changed, 58 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java index 8c016062681fe..c6e78aab20413 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java @@ -75,16 +75,9 @@ public LeafReader wrap(LeafReader reader) { } @SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener") - public static void addReaderCloseListener(IndexReader reader, IndexReader.ReaderClosedListener listener) { + public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) { ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader); - if (elasticsearchDirectoryReader == null && reader instanceof LeafReader) { - ElasticsearchLeafReader leafReader = ElasticsearchLeafReader.getElasticsearchLeafReader((LeafReader) reader); - if (leafReader != null) { - assert reader.getCoreCacheKey() == leafReader.getCoreCacheKey(); - leafReader.addReaderClosedListener(listener); - return; - } - } else { + if (elasticsearchDirectoryReader != null) { assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey(); elasticsearchDirectoryReader.addReaderClosedListener(listener); return; @@ -92,7 +85,7 @@ public static void addReaderCloseListener(IndexReader reader, IndexReader.Reader throw new IllegalStateException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader"); } - public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) { + public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) { if (reader instanceof FilterDirectoryReader) { if (reader instanceof ElasticsearchDirectoryReader) { return (ElasticsearchDirectoryReader) reader; diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index a0cf923c5f10e..a7993384267dc 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.lucene.uid; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReader.CoreClosedListener; diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index c07be06448991..90218431d0dc0 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -579,10 +579,12 @@ public static class Searcher implements Releasable { private final String source; private final IndexSearcher searcher; + private final DirectoryReader reader; public Searcher(String source, IndexSearcher searcher) { this.source = source; this.searcher = searcher; + this.reader = (DirectoryReader) searcher.getIndexReader(); } /** @@ -592,8 +594,8 @@ public String source() { return source; } - public IndexReader reader() { - return searcher.getIndexReader(); + public DirectoryReader reader() { + return reader; } public IndexSearcher searcher() { diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index 0d88ab7d3fe47..75218016d7c58 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSet; @@ -237,9 +238,9 @@ IndexFieldData build(Index index, @IndexSettings Settings indexSettings, Mapp public static interface Global extends IndexFieldData { - IndexFieldData loadGlobal(IndexReader indexReader); + IndexFieldData loadGlobal(DirectoryReader indexReader); - IndexFieldData localGlobalDirect(IndexReader indexReader) throws Exception; + IndexFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception; } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index a2b0467731c69..dc0db3032393a 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; @@ -32,7 +33,7 @@ public interface IndexFieldDataCache { > FD load(LeafReaderContext context, IFD indexFieldData) throws Exception; - > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception; + > IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception; /** * Clears all the field data stored cached in on this index. @@ -66,7 +67,7 @@ public > FD load(Leaf @Override @SuppressWarnings("unchecked") - public > IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception { + public > IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception { return (IFD) indexFieldData.localGlobalDirect(indexReader); } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java index dd4e714dadac3..cb1471179c2d7 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexOrdinalsFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global build(Index index, @IndexSettings Settings indexSetting } @Override - public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { + public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.leaves().size() <= 1) { // ordinals are already global return this; @@ -170,7 +170,7 @@ public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldDat } @Override - public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { final long startTime = System.nanoTime(); long ramBytesUsed = 0; @@ -347,7 +347,7 @@ public Collection getChildResources() { } @Override - public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { + public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) { return this; } @@ -355,7 +355,7 @@ public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { } @Override - public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { return loadGlobal(indexReader); } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java index 9d29b3b1a8a2c..a9f324b400f1e 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.fielddata.plain; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.elasticsearch.ElasticsearchException; @@ -61,7 +62,7 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce } @Override - public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) { + public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) { if (indexReader.leaves().size() <= 1) { // ordinals are already global return this; @@ -78,7 +79,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) { } @Override - public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception { + public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception { return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger); } } diff --git a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java index 1b42dcd85d225..692f6456a2d99 100644 --- a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java +++ b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.query; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.MultiDocValues; import org.apache.lucene.search.IndexSearcher; @@ -290,7 +291,7 @@ public Query rewrite(IndexReader reader) throws IOException { String joinField = ParentFieldMapper.joinField(parentType); IndexSearcher indexSearcher = new IndexSearcher(reader); indexSearcher.setQueryCache(null); - IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal(indexSearcher.getIndexReader()); + IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader); MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType); return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java index 84b5f69ab1db1..8860bd4274c05 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java @@ -47,15 +47,12 @@ public static ShardId extractShardId(LeafReader reader) { * will return null. */ @Nullable - public static ShardId extractShardId(IndexReader reader) { + public static ShardId extractShardId(DirectoryReader reader) { final ElasticsearchDirectoryReader esReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader); if (esReader != null) { return esReader.shardId(); } - if (!reader.leaves().isEmpty()) { - return extractShardId(reader.leaves().get(0).reader()); - } - return null; + throw new IllegalArgumentException("can't extract shard ID, can't unwrap ElasticsearchDirectoryReader"); } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index 2a82774a612b3..6d8d407e9bc8e 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -176,7 +177,7 @@ public Engine.Searcher searcher() { return searcher; } - public IndexReader reader() { + public DirectoryReader reader() { return searcher.reader(); } diff --git a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java index cc242a8a174ca..7c42aef478832 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java @@ -249,7 +249,7 @@ public void loadIntoContext(final ShardSearchRequest request, final SearchContex if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { - ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getIndexReader(), cleanupKey); + ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getDirectoryReader(), cleanupKey); } } } else { diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 50b00820f79ba..74940cfb5b759 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.fielddata.cache; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; @@ -168,7 +169,7 @@ public > FD load(fina } @Override - public > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception { + public > IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception { final ShardId shardId = ShardUtils.extractShardId(indexReader); final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId); //noinspection unchecked diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java index ab6648d85824e..0464dc8c1d83c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java @@ -18,12 +18,7 @@ */ package org.elasticsearch.search.aggregations.support; -import org.apache.lucene.index.DocValues; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.RandomAccessOrds; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.*; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Scorer; import org.apache.lucene.util.Bits; @@ -146,7 +141,7 @@ public RandomAccessOrds ordinalsValues(LeafReaderContext context) { @Override public RandomAccessOrds globalOrdinalsValues(LeafReaderContext context) { - final IndexOrdinalsFieldData global = indexFieldData.loadGlobal(context.parent.reader()); + final IndexOrdinalsFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader()); final AtomicOrdinalsFieldData atomicFieldData = global.load(context); return atomicFieldData.getOrdinalsValues(); } @@ -162,7 +157,7 @@ public ParentChild(ParentChildIndexFieldData indexFieldData) { } public long globalMaxOrd(IndexSearcher indexSearcher, String type) { - IndexReader indexReader = indexSearcher.getIndexReader(); + DirectoryReader indexReader = (DirectoryReader) indexSearcher.getIndexReader(); if (indexReader.leaves().isEmpty()) { return 0; } else { @@ -175,7 +170,7 @@ public long globalMaxOrd(IndexSearcher indexSearcher, String type) { } public SortedDocValues globalOrdinalsValues(String type, LeafReaderContext context) { - final IndexParentChildFieldData global = indexFieldData.loadGlobal(context.parent.reader()); + final IndexParentChildFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader()); final AtomicParentChildFieldData atomicFieldData = global.load(context); return atomicFieldData.getOrdinalsValues(type); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index a23df0a00f127..7a3944ffbfd77 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.internal; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.index.TermContext; import org.apache.lucene.search.*; @@ -40,9 +41,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private AggregatedDfs aggregatedDfs; + private final DirectoryReader directoryReader; + public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) { super(searcher.reader()); in = searcher.searcher(); + directoryReader = searcher.reader(); setSimilarity(searcher.searcher().getSimilarity(true)); setQueryCache(searchContext.getQueryCache()); setQueryCachingPolicy(searchContext.indexShard().getQueryCachingPolicy()); @@ -104,4 +108,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio } return collectionStatistics; } + + public DirectoryReader getDirectoryReader() { + return directoryReader; + } } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java index bdeacc5768317..9200873e1c8f9 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java @@ -54,7 +54,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { protected MapperService mapperService; protected IndexWriter writer; protected LeafReaderContext readerContext; - protected IndexReader topLevelReader; + protected DirectoryReader topLevelReader; protected IndicesFieldDataCache indicesFieldDataCache; protected abstract FieldDataType getFieldDataType(); diff --git a/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java b/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java index fec406a78411e..0187d4ac0356b 100644 --- a/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java +++ b/core/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.engine; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.common.logging.ESLogger; @@ -33,7 +34,6 @@ class AssertingSearcher extends Engine.Searcher { private final Engine.Searcher wrappedSearcher; private final ShardId shardId; - private final IndexSearcher indexSearcher; private RuntimeException firstReleaseStack; private final Object lock = new Object(); private final int initialRefCount; @@ -50,7 +50,6 @@ class AssertingSearcher extends Engine.Searcher { this.logger = logger; this.shardId = shardId; initialRefCount = wrappedSearcher.reader().getRefCount(); - this.indexSearcher = indexSearcher; assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; } @@ -82,16 +81,6 @@ public void close() { } } - @Override - public IndexReader reader() { - return indexSearcher.getIndexReader(); - } - - @Override - public IndexSearcher searcher() { - return indexSearcher; - } - public ShardId shardId() { return shardId; } From ba8de124295cc6bebcb33dc982c6955a202517bb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 14:57:51 +0200 Subject: [PATCH 03/11] remove unnecessary cast --- .../org/elasticsearch/index/shard/IndexSearcherWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index 914c5398dbf98..a0f98b370e079 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -67,7 +67,7 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng if (elasticsearchDirectoryReader == null) { throw new IllegalStateException("Can't wrap non elasticsearch directory reader"); } - DirectoryReader reader = wrap((DirectoryReader)engineSearcher.reader()); + DirectoryReader reader = wrap(engineSearcher.reader()); IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader)); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); From dac179930563f8338b89aabeabb31dcdcc12b9ac Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 15:54:26 +0200 Subject: [PATCH 04/11] Cast DirectoryReader only when really requested --- .../java/org/elasticsearch/index/engine/Engine.java | 13 +++++++++---- .../index/shard/IndexSearcherWrapper.java | 4 ++-- .../org/elasticsearch/indices/IndicesWarmer.java | 6 +++++- .../org/elasticsearch/search/SearchService.java | 2 +- .../search/internal/ContextIndexSearcher.java | 6 +++--- .../index/engine/InternalEngineTests.java | 2 +- .../index/engine/ShadowEngineTests.java | 2 +- .../index/shard/IndexSearcherWrapperTests.java | 4 ++-- 8 files changed, 24 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 90218431d0dc0..69ac57a3b47be 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -579,12 +579,10 @@ public static class Searcher implements Releasable { private final String source; private final IndexSearcher searcher; - private final DirectoryReader reader; public Searcher(String source, IndexSearcher searcher) { this.source = source; this.searcher = searcher; - this.reader = (DirectoryReader) searcher.getIndexReader(); } /** @@ -594,8 +592,15 @@ public String source() { return source; } - public DirectoryReader reader() { - return reader; + public IndexReader reader() { + return searcher.getIndexReader(); + } + + public DirectoryReader getDirectoryReader() { + if (reader() instanceof DirectoryReader) { + return (DirectoryReader) reader(); + } + throw new IllegalStateException("Can't use " + reader().getClass() + " as an directory reader"); } public IndexSearcher searcher() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index a0f98b370e079..41352b5c9fd09 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -63,11 +63,11 @@ protected IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) * This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search) */ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException { - final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.reader()); + final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader()); if (elasticsearchDirectoryReader == null) { throw new IllegalStateException("Can't wrap non elasticsearch directory reader"); } - DirectoryReader reader = wrap(engineSearcher.reader()); + DirectoryReader reader = wrap(engineSearcher.getDirectoryReader()); IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader)); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index 6d8d407e9bc8e..4f6f238ef6d5d 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -177,10 +177,14 @@ public Engine.Searcher searcher() { return searcher; } - public DirectoryReader reader() { + public IndexReader reader() { return searcher.reader(); } + public DirectoryReader getDirectoryReader() { + return searcher.getDirectoryReader(); + } + @Override public String toString() { return "WarmerContext: " + searcher.reader(); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 57cecfceb8f25..e797349466635 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -1016,7 +1016,7 @@ public void run() { try { final long start = System.nanoTime(); IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); - ifd.loadGlobal(context.reader()); + ifd.loadGlobal(context.getDirectoryReader()); if (indexShard.warmerService().logger().isTraceEnabled()) { indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 7a3944ffbfd77..0a9b860edb7c7 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -41,12 +41,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private AggregatedDfs aggregatedDfs; - private final DirectoryReader directoryReader; + private final Engine.Searcher engineSearcher; public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) { super(searcher.reader()); in = searcher.searcher(); - directoryReader = searcher.reader(); + engineSearcher = searcher; setSimilarity(searcher.searcher().getSimilarity(true)); setQueryCache(searchContext.getQueryCache()); setQueryCachingPolicy(searchContext.indexShard().getQueryCachingPolicy()); @@ -110,6 +110,6 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio } public DirectoryReader getDirectoryReader() { - return directoryReader; + return engineSearcher.getDirectoryReader(); } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5f6e1db42b917..2a6150267a50f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1499,7 +1499,7 @@ protected Term newUid(String id) { @Test public void testExtractShardId() { try (Engine.Searcher test = this.engine.acquireSearcher("test")) { - ShardId shardId = ShardUtils.extractShardId(test.reader()); + ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); assertNotNull(shardId); assertEquals(shardId, engine.config().getShardId()); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 2c6ee40b86eb9..7dadafb8a0bb0 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -855,7 +855,7 @@ public void testFailEngineOnCorruption() { @Test public void testExtractShardId() { try (Engine.Searcher test = replicaEngine.acquireSearcher("test")) { - ShardId shardId = ShardUtils.extractShardId(test.reader()); + ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); assertNotNull(shardId); assertEquals(shardId, replicaEngine.config().getShardId()); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index cdc696fea4f4a..3411f15a3594a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -75,7 +75,7 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); assertEquals(1, wrap.reader().getRefCount()); - ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> { + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { if (reader == open) { count.incrementAndGet(); } @@ -119,7 +119,7 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr final ConcurrentHashMap cache = new ConcurrentHashMap<>(); try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { try (final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher)) { - ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> { + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { cache.remove(reader.getCoreCacheKey()); }); TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1); From e3f00e302c4a2e623385c9528342d2c97e7e5bbd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 17:17:17 +0200 Subject: [PATCH 05/11] Add more pickyness to index warming and searcher wrappping this commit also fixes a bug where we wramed a leaf reader in a top level context which caused atomic segment readers to be used in our top level caches. --- .../index/ElasticsearchDirectoryReader.java | 11 ++++++++- .../lucene/index/ElasticsearchLeafReader.java | 7 ++---- .../index/engine/InternalEngine.java | 14 +++++++---- .../index/query/HasChildQueryBuilder.java | 24 ++++++++++++++----- .../index/shard/IndexSearcherWrapper.java | 5 +++- .../shard/IndexSearcherWrapperTests.java | 3 ++- 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java index c6e78aab20413..8cded639eb6dd 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java @@ -74,6 +74,12 @@ public LeafReader wrap(LeafReader reader) { } } + /** + * Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy + * otherwise we can't safely install the listener. + * + * @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy + */ @SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener") public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) { ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader); @@ -82,9 +88,12 @@ public static void addReaderCloseListener(DirectoryReader reader, IndexReader.Re elasticsearchDirectoryReader.addReaderClosedListener(listener); return; } - throw new IllegalStateException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader"); + throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader"); } + /** + * Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or null if no instance is found; + */ public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) { if (reader instanceof FilterDirectoryReader) { if (reader instanceof ElasticsearchDirectoryReader) { diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java index e666b91d82494..68c20689efc84 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java @@ -18,10 +18,7 @@ */ package org.elasticsearch.common.lucene.index; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.*; import org.elasticsearch.index.shard.ShardId; /** @@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader { * * @param in specified base reader. */ - ElasticsearchLeafReader(LeafReader in, ShardId shardId) { + public ElasticsearchLeafReader(LeafReader in, ShardId shardId) { super(in); this.shardId = shardId; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3973b47f3ac3a..a55fa0ce66050 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.settings.Settings; @@ -905,9 +906,10 @@ private IndexWriter createWriter(boolean create) throws IOException { @Override public void warm(LeafReader reader) throws IOException { try { - assert isMergedSegment(reader); + LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId); + assert isMergedSegment(esLeafReader); if (warmer != null) { - final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null)); + final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null)); final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher); warmer.warmNewReaders(context); } @@ -949,6 +951,9 @@ final static class SearchFactory extends EngineSearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { IndexSearcher searcher = super.newSearcher(reader, previousReader); + if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) { + return searcher; + } if (warmer != null) { // we need to pass a custom searcher that does not release anything on Engine.Search Release, // we will release explicitly @@ -986,10 +991,11 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) } if (newSearcher != null) { - IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher)); + IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher)); warmer.warmNewReaders(context); } - warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher))); + assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass(); + warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher))); } catch (Throwable e) { if (isEngineClosed.get() == false) { logger.warn("failed to prepare/warm", e); diff --git a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java index 692f6456a2d99..a7b7506ebae3e 100644 --- a/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java +++ b/core/src/main/java/org/elasticsearch/index/query/HasChildQueryBuilder.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.MultiDocValues; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.join.JoinUtil; import org.apache.lucene.search.join.ScoreMode; @@ -288,12 +289,23 @@ public Query rewrite(IndexReader reader) throws IOException { if (getBoost() != 1.0F) { return super.rewrite(reader); } - String joinField = ParentFieldMapper.joinField(parentType); - IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader); - MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType); - return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren); + if (reader instanceof DirectoryReader) { + String joinField = ParentFieldMapper.joinField(parentType); + IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader); + MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType); + return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren); + } else { + if (reader.leaves().isEmpty() && reader.numDocs() == 0) { + // asserting reader passes down a MultiReader during rewrite which makes this + // blow up since for this query to work we have to have a DirectoryReader otherwise + // we can't load global ordinals - for this to work we simply check if the reader has no leaves + // and rewrite to match nothing + return new MatchNoDocsQuery(); + } + throw new IllegalStateException("can't load global ordinals for reader of type: " + reader.getClass() + " must be a DirectoryReader"); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index 41352b5c9fd09..5aab48ece1b35 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -79,7 +79,7 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { return engineSearcher; } else { - return new Engine.Searcher(engineSearcher.source(), indexSearcher) { + Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) { @Override public void close() throws ElasticsearchException { try { @@ -92,6 +92,9 @@ public void close() throws ElasticsearchException { } }; + // TODO should this be a real exception? this checks that our wrapper doesn't wrap in it's own ElasticsearchDirectoryReader + assert ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(newSearcher.getDirectoryReader()) == elasticsearchDirectoryReader : "Wrapper hides actual ElasticsearchDirectoryReader but shouldn't"; + return newSearcher; } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index 3411f15a3594a..451acb7d184fd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -73,7 +73,8 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr final AtomicInteger count = new AtomicInteger(); final AtomicInteger outerCount = new AtomicInteger(); try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { - final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); + // sometimes double wrap.... + final Engine.Searcher wrap = randomBoolean() ? wrapper.wrap(ENGINE_CONFIG, engineSearcher) : wrapper.wrap(ENGINE_CONFIG, wrapper.wrap(ENGINE_CONFIG, engineSearcher)); assertEquals(1, wrap.reader().getRefCount()); ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { if (reader == open) { From 1dca0e8f9bea675e7d40055ec2b9cbec42b540d9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 20:51:10 +0200 Subject: [PATCH 06/11] Add simple tests to ensure we can load and unload global ordinal with a wrapped reader --- .../index/fielddata/IndexFieldData.java | 2 +- .../index/shard/IndexShardTests.java | 36 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index 75218016d7c58..e549eb32dd6a0 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -236,7 +236,7 @@ IndexFieldData build(Index index, @IndexSettings Settings indexSettings, Mapp CircuitBreakerService breakerService, MapperService mapperService); } - public static interface Global extends IndexFieldData { + interface Global extends IndexFieldData { IndexFieldData loadGlobal(DirectoryReader indexReader); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 29240b8d08f1a..ad8173a6eb88a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -62,13 +63,12 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.fielddata.FieldDataStats; +import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.indexing.IndexingOperationListener; import org.elasticsearch.index.indexing.ShardIndexingService; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -897,7 +897,7 @@ public void testSearcherWrapperIsUsed() throws IOException { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexService("test"); IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(randomBoolean()).get(); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get(); Engine.GetResult getResult = shard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); @@ -946,9 +946,31 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader assertTrue(getResult.searcher().reader() instanceof FilterDirectoryReader); - assertTrue(((FilterDirectoryReader)getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader); + assertTrue(((FilterDirectoryReader) getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader); getResult.release(); - newShard.close("just do it", randomBoolean()); + try { + // test global ordinals are evicted + MappedFieldType foo = newShard.mapperService().indexName("foo"); + IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); + FieldDataStats before = shard.fieldData().stats("foo"); + FieldDataStats after = null; + try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { + assumeTrue("we have to have more than one segment", searcher.getDirectoryReader().leaves().size() > 1); + IndexFieldData indexFieldData = ifd.loadGlobal(searcher.getDirectoryReader()); + after = shard.fieldData().stats("foo"); + assertEquals(after.getEvictions(), before.getEvictions()); + assertTrue(indexFieldData.toString(), after.getMemorySizeInBytes() > before.getMemorySizeInBytes()); + } + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), after.getMemorySizeInBytes()); + newShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + newShard.refresh("test"); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + } finally { + newShard.close("just do it", randomBoolean()); + } + } private static class FieldMaskingReader extends FilterDirectoryReader { From 0ead0faa1b4d4e62247db6b6edf76577b2129d2d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 20:51:43 +0200 Subject: [PATCH 07/11] fix typo --- core/src/main/java/org/elasticsearch/index/engine/Engine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 69ac57a3b47be..1431cbd4f9d17 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -600,7 +600,7 @@ public DirectoryReader getDirectoryReader() { if (reader() instanceof DirectoryReader) { return (DirectoryReader) reader(); } - throw new IllegalStateException("Can't use " + reader().getClass() + " as an directory reader"); + throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader"); } public IndexSearcher searcher() { From ec60018e34865041d113e8a24e9c024f3378ae9a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 21:06:20 +0200 Subject: [PATCH 08/11] add comment why and when we have a leaf reader in the warmer --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a55fa0ce66050..75bcdfa552e9d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -952,6 +952,9 @@ final static class SearchFactory extends EngineSearcherFactory { public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { IndexSearcher searcher = super.newSearcher(reader, previousReader); if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) { + // we call newSearcher from the IndexReaderWarmer which warms segments during merging + // in that case the reader is a LeafReader and all we need to do is to build a new Searcher + // and return it since it does it's own warming for that particular reader. return searcher; } if (warmer != null) { From cac073dafac97156165bb23c870816afa7918795 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Oct 2015 23:31:25 +0200 Subject: [PATCH 09/11] enforce that wrappers delegate core cache key and ban getCombinedCoreAndDeletesKey() entirely --- .../index/ElasticsearchDirectoryReader.java | 2 - .../lucene/index/ElasticsearchLeafReader.java | 5 - .../index/shard/IndexSearcherWrapper.java | 36 +++--- .../lucene/index/ESDirectoryReaderTests.java | 4 - .../shard/IndexSearcherWrapperTests.java | 116 +++++++++++++++++- .../index/shard/IndexShardTests.java | 60 +++++---- .../test/engine/MockEngineSupport.java | 5 - .../resources/forbidden/all-signatures.txt | 2 + 8 files changed, 163 insertions(+), 67 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java index 8cded639eb6dd..b394b50683e06 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java @@ -108,6 +108,4 @@ public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(Direc } return null; } - - } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java index 68c20689efc84..aff0fa69f0986 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java @@ -52,11 +52,6 @@ public Object getCoreCacheKey() { return in.getCoreCacheKey(); } - @Override - public Object getCombinedCoreAndDeletesKey() { - return in.getCombinedCoreAndDeletesKey(); - } - public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) { if (reader instanceof FilterLeafReader) { if (reader instanceof ElasticsearchLeafReader) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index 5aab48ece1b35..7772a0dbc3041 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -24,7 +24,6 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -67,8 +66,20 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng if (elasticsearchDirectoryReader == null) { throw new IllegalStateException("Can't wrap non elasticsearch directory reader"); } - DirectoryReader reader = wrap(engineSearcher.getDirectoryReader()); - IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader)); + NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader()); + DirectoryReader reader = wrap(nonClosingReaderWrapper); + if (reader != nonClosingReaderWrapper) { + if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) { + throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" + + " to the original readers core cache key. Wrapped readers can't used as cache keys since their are used only per request which would lead to subtile bugs"); + } + if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) { + // prevent that somebody wraps with a non-filter reader + throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't"); + } + } + + IndexSearcher innerIndexSearcher = new IndexSearcher(reader); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); @@ -76,14 +87,16 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher); - if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { + if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) { return engineSearcher; } else { - Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) { + final Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) { @Override public void close() throws ElasticsearchException { try { reader().close(); + // we close the reader to make sure wrappers can release resources if needed.... + // our NonClosingReaderWrapper makes sure that our reader is not closed } catch (IOException e) { throw new ElasticsearchException("failed to close reader", e); } finally { @@ -92,28 +105,24 @@ public void close() throws ElasticsearchException { } }; - // TODO should this be a real exception? this checks that our wrapper doesn't wrap in it's own ElasticsearchDirectoryReader - assert ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(newSearcher.getDirectoryReader()) == elasticsearchDirectoryReader : "Wrapper hides actual ElasticsearchDirectoryReader but shouldn't"; return newSearcher; } } - final class CacheFriendlyReaderWrapper extends FilterDirectoryReader { - private final ElasticsearchDirectoryReader elasticsearchReader; + final class NonClosingReaderWrapper extends FilterDirectoryReader { - private CacheFriendlyReaderWrapper(DirectoryReader in, ElasticsearchDirectoryReader elasticsearchReader) throws IOException { + private NonClosingReaderWrapper(DirectoryReader in) throws IOException { super(in, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { return reader; } }); - this.elasticsearchReader = elasticsearchReader; } @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new CacheFriendlyReaderWrapper(in, elasticsearchReader); + return new NonClosingReaderWrapper(in); } @Override @@ -123,8 +132,7 @@ protected void doClose() throws IOException { @Override public Object getCoreCacheKey() { - // this is important = we always use the ES reader core cache key on top level - return elasticsearchReader.getCoreCacheKey(); + return in.getCoreCacheKey(); } } diff --git a/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java b/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java index 0307a3806c9a9..3c4a34d952f35 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/index/ESDirectoryReaderTests.java @@ -67,10 +67,6 @@ public void testCoreCacheKey() throws Exception { assertEquals(1, ir2.numDocs()); assertEquals(1, ir2.leaves().size()); assertSame(ir.leaves().get(0).reader().getCoreCacheKey(), ir2.leaves().get(0).reader().getCoreCacheKey()); - - // this is kind of stupid, but for now its here - assertNotSame(ir.leaves().get(0).reader().getCombinedCoreAndDeletesKey(), ir2.leaves().get(0).reader().getCombinedCoreAndDeletesKey()); - IOUtils.close(ir, ir2, iw, dir); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index 451acb7d184fd..e8a7a75b1d694 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -58,23 +58,24 @@ public void testReaderCloseListenerIsCalled() throws IOException { DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); IndexSearcher searcher = new IndexSearcher(open); assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + final AtomicInteger closeCalls = new AtomicInteger(0); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { - return new FieldMaskingReader("field", reader); + return new FieldMaskingReader("field", reader, closeCalls); } @Override public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { return searcher; } + }; final int sourceRefCount = open.getRefCount(); final AtomicInteger count = new AtomicInteger(); final AtomicInteger outerCount = new AtomicInteger(); try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { - // sometimes double wrap.... - final Engine.Searcher wrap = randomBoolean() ? wrapper.wrap(ENGINE_CONFIG, engineSearcher) : wrapper.wrap(ENGINE_CONFIG, wrapper.wrap(ENGINE_CONFIG, engineSearcher)); + final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher); assertEquals(1, wrap.reader().getRefCount()); ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> { if (reader == open) { @@ -87,11 +88,13 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr assertFalse("wrapped reader is closed", wrap.reader().tryIncRef()); assertEquals(sourceRefCount, open.getRefCount()); } + assertEquals(1, closeCalls.get()); IOUtils.close(open, writer, dir); assertEquals(1, outerCount.get()); assertEquals(1, count.get()); assertEquals(0, open.getRefCount()); + assertEquals(1, closeCalls.get()); } public void testIsCacheable() throws IOException { @@ -106,10 +109,11 @@ public void testIsCacheable() throws IOException { IndexSearcher searcher = new IndexSearcher(open); assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); searcher.setSimilarity(iwc.getSimilarity()); + final AtomicInteger closeCalls = new AtomicInteger(0); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { - return new FieldMaskingReader("field", reader); + return new FieldMaskingReader("field", reader, closeCalls); } @Override @@ -127,10 +131,12 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr cache.put(wrap.reader().getCoreCacheKey(), search); } } + assertEquals(1, closeCalls.get()); assertEquals(1, cache.size()); IOUtils.close(open, writer, dir); assertEquals(0, cache.size()); + assertEquals(1, closeCalls.get()); } public void testNoWrap() throws IOException { @@ -153,21 +159,119 @@ public void testNoWrap() throws IOException { IOUtils.close(open, writer, dir); } + public void testWrappedReaderMustDelegateCoreCacheKey() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1)); + IndexSearcher searcher = new IndexSearcher(open); + assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits); + searcher.setSimilarity(iwc.getSimilarity()); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new BrokenWrapper(reader, false); + } + }; + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try { + wrapper.wrap(ENGINE_CONFIG, engineSearcher); + fail("reader must delegate cache key"); + } catch (IllegalStateException ex) { + // all is well + } + } + wrapper = new IndexSearcherWrapper() { + @Override + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new BrokenWrapper(reader, true); + } + }; + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try { + wrapper.wrap(ENGINE_CONFIG, engineSearcher); + fail("reader must delegate cache key"); + } catch (IllegalStateException ex) { + // all is well + } + } + IOUtils.close(open, writer, dir); + } + private static class FieldMaskingReader extends FilterDirectoryReader { private final String field; - public FieldMaskingReader(String field, DirectoryReader in) throws IOException { + private final AtomicInteger closeCalls; + + public FieldMaskingReader(String field, DirectoryReader in, AtomicInteger closeCalls) throws IOException { super(in, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { return new FieldFilterLeafReader(reader, Collections.singleton(field), true); } }); + this.closeCalls = closeCalls; this.field = field; } @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new FieldMaskingReader(field, in); + return new FieldMaskingReader(field, in, closeCalls); + } + + @Override + public Object getCoreCacheKey() { + return in.getCoreCacheKey(); + } + + @Override + protected void doClose() throws IOException { + super.doClose(); + closeCalls.incrementAndGet(); + } + } + + private static class BrokenWrapper extends FilterDirectoryReader { + + private final boolean hideDelegate; + + public BrokenWrapper(DirectoryReader in, boolean hideDelegate) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return reader; + } + }); + this.hideDelegate = hideDelegate; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new BrokenWrapper(in, hideDelegate); + } + + @Override + public DirectoryReader getDelegate() { + if (hideDelegate) { + try { + return ElasticsearchDirectoryReader.wrap(super.getDelegate(), new ShardId("foo", 1)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return super.getDelegate(); + } + + @Override + public Object getCoreCacheKey() { + if (hideDelegate == false) { + return super.getCoreCacheKey(); + } else { + return in.getCoreCacheKey(); + } } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ad8173a6eb88a..c7e63893bbe90 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -88,6 +88,7 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.BrokenBarrierException; @@ -928,27 +929,26 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); - - ShardRoutingHelper.reinit(routing); - newShard.updateRoutingEntry(routing, false); - DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); - assertTrue(newShard.recoverFromStore(routing, localNode)); - routing = new ShardRouting(routing); - ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); - try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { - TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); - assertEquals(search.totalHits, 0); - search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); - assertEquals(search.totalHits, 1); - } - getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); - assertTrue(getResult.exists()); - assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader - assertTrue(getResult.searcher().reader() instanceof FilterDirectoryReader); - assertTrue(((FilterDirectoryReader) getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader); - getResult.release(); try { + ShardRoutingHelper.reinit(routing); + newShard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); + routing = new ShardRouting(routing); + ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); + assertEquals(search.totalHits, 0); + search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); + assertEquals(search.totalHits, 1); + } + getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); + assertTrue(getResult.exists()); + assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader + assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); + getResult.release(); + // test global ordinals are evicted MappedFieldType foo = newShard.mapperService().indexName("foo"); IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); @@ -967,6 +967,9 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr newShard.refresh("test"); assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + } catch (Throwable t) { + t.printStackTrace(); + throw t; } finally { newShard.close("just do it", randomBoolean()); } @@ -981,17 +984,7 @@ public FieldMaskingReader(String field, DirectoryReader in) throws IOException { private final String filteredField = field; @Override public LeafReader wrap(LeafReader reader) { - return new FilterLeafReader(reader) { - @Override - public Fields fields() throws IOException { - return new FilterFields(super.fields()) { - @Override - public Terms terms(String field) throws IOException { - return filteredField.equals(field) ? null : super.terms(field); - } - }; - } - }; + return new FieldFilterLeafReader(reader, Collections.singleton(field), true); } }); this.field = field; @@ -1002,5 +995,10 @@ public Terms terms(String field) throws IOException { protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { return new FieldMaskingReader(field, in); } + + @Override + public Object getCoreCacheKey() { + return in.getCoreCacheKey(); + } } } diff --git a/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java b/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java index 3649a7b108038..ab570afdd9db2 100644 --- a/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java +++ b/core/src/test/java/org/elasticsearch/test/engine/MockEngineSupport.java @@ -169,11 +169,6 @@ public Object getCoreCacheKey() { return in.getCoreCacheKey(); } - @Override - public Object getCombinedCoreAndDeletesKey() { - return in.getCombinedCoreAndDeletesKey(); - } - } public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) { diff --git a/dev-tools/src/main/resources/forbidden/all-signatures.txt b/dev-tools/src/main/resources/forbidden/all-signatures.txt index 3023416d54936..447e994f50716 100644 --- a/dev-tools/src/main/resources/forbidden/all-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/all-signatures.txt @@ -91,3 +91,5 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av java.lang.reflect.AccessibleObject#setAccessible(boolean) java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean) +@defaultMessage this should not have been added to lucene in the first place +org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey() \ No newline at end of file From 7999027bf5f6fb1eaf86c85ce97a9fb7f82daa8b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 14 Oct 2015 09:19:02 +0200 Subject: [PATCH 10/11] apply review comments --- .../org/elasticsearch/index/shard/IndexSearcherWrapper.java | 4 ++-- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index 7772a0dbc3041..0e7c4772b138f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -71,7 +71,7 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng if (reader != nonClosingReaderWrapper) { if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) { throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" + - " to the original readers core cache key. Wrapped readers can't used as cache keys since their are used only per request which would lead to subtile bugs"); + " to the original readers core cache key. Wrapped readers can't be used as cache keys since their are used only per request which would lead to subtle bugs"); } if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) { // prevent that somebody wraps with a non-filter reader @@ -109,7 +109,7 @@ public void close() throws ElasticsearchException { } } - final class NonClosingReaderWrapper extends FilterDirectoryReader { + private static final class NonClosingReaderWrapper extends FilterDirectoryReader { private NonClosingReaderWrapper(DirectoryReader in) throws IOException { super(in, new SubReaderWrapper() { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c7e63893bbe90..f7b14192b4936 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -967,9 +967,6 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr newShard.refresh("test"); assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); - } catch (Throwable t) { - t.printStackTrace(); - throw t; } finally { newShard.close("just do it", randomBoolean()); } From db710c576f0e825eb1b2a6aab8754bc8086e94cd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 14 Oct 2015 09:28:05 +0200 Subject: [PATCH 11/11] add javadocs --- .../index/shard/IndexSearcherWrapper.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index 0e7c4772b138f..dff59e9b24431 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.ElasticsearchException; @@ -32,11 +33,18 @@ /** * Extension point to add custom functionality at request time to the {@link DirectoryReader} - * and {@link IndexSearcher} managed by the {@link Engine}. + * and {@link IndexSearcher} managed by the {@link IndexShard}. */ public class IndexSearcherWrapper { /** + * Wraps the given {@link DirectoryReader}. The wrapped reader can filter out document just like delete documents etc. but + * must not change any term or document content. + *

+ * NOTE: The wrapper has a per-request lifecycle, must delegate {@link IndexReader#getCoreCacheKey()} and must be an instance + * of {@link FilterDirectoryReader} that eventually exposes the original reader via {@link FilterDirectoryReader#getDelegate()}. + * The returned reader is closed once it goes out of scope. + *

* @param reader The provided directory reader to be wrapped to add custom functionality * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed * the provided directory reader @@ -79,18 +87,18 @@ public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher eng } } - IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + final IndexSearcher innerIndexSearcher = new IndexSearcher(reader); innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); // TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times - IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher); + final IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher); if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) { return engineSearcher; } else { - final Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) { + return new Engine.Searcher(engineSearcher.source(), indexSearcher) { @Override public void close() throws ElasticsearchException { try { @@ -105,7 +113,6 @@ public void close() throws ElasticsearchException { } }; - return newSearcher; } }