Skip to content

Commit

Permalink
Merge pull request #14084 from s1monw/close_the_wrapper_only
Browse files Browse the repository at this point in the history
Streamline top level reader close listeners and forbid general usage
  • Loading branch information
s1monw committed Oct 14, 2015
1 parent a1bed55 commit a5cb0bc
Show file tree
Hide file tree
Showing 43 changed files with 719 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand Down Expand Up @@ -76,4 +74,38 @@ public LeafReader wrap(LeafReader reader) {
}
}

/**
* Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
* otherwise we can't safely install the listener.
*
* @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
*/
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) {
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
if (elasticsearchDirectoryReader != null) {
assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey();
elasticsearchDirectoryReader.addReaderClosedListener(listener);
return;
}
throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
}

/**
* Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or <code>null</code> if no instance is found;
*/
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {
return (ElasticsearchDirectoryReader) reader;
} else {
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.index.shard.ShardId;

/**
Expand All @@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
*
* @param in specified base reader.
*/
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
public ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
super(in);
this.shardId = shardId;
}
Expand All @@ -55,8 +52,18 @@ public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}

@Override
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
if (reader instanceof FilterLeafReader) {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,13 @@ public IndexReader reader() {
return searcher.getIndexReader();
}

public DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}

public IndexSearcher searcher() {
return searcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;

import java.io.IOException;

/**
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
* and {@link IndexSearcher} managed by the {@link Engine}.
Expand All @@ -33,7 +35,7 @@ public interface IndexSearcherWrapper {
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
* the provided directory reader
*/
DirectoryReader wrap(DirectoryReader reader);
DirectoryReader wrap(DirectoryReader reader) throws IOException;

/**
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.engine.Engine.Searcher;

import java.io.IOException;
import java.util.Set;

/**
Expand Down Expand Up @@ -60,35 +64,86 @@ public IndexSearcherWrappingService(Set<IndexSearcherWrapper> wrappers) {

/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
* gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned.
* gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
*
* This is invoked each time a {@link Searcher} is requested to do an operation. (for example search)
* This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
*/
public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException {
public final Engine.Searcher wrap(EngineConfig engineConfig, final Engine.Searcher engineSearcher) throws IOException {
final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader());
if (elasticsearchDirectoryReader == null) {
throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
}
if (wrapper == null) {
return engineSearcher;
}
NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());
DirectoryReader reader = wrapper.wrap(nonClosingReaderWrapper);
if (reader != nonClosingReaderWrapper) {
if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) {
throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" +
" to the original readers core cache key. Wrapped readers can't be used as cache keys since their are used only per request which would lead to subtle bugs");
}
if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {
// prevent that somebody wraps with a non-filter reader
throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");
}
}

DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
final IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
IndexSearcher indexSearcher = wrapper.wrap(engineConfig, innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
final IndexSearcher indexSearcher = wrapper.wrap(engineConfig, innerIndexSearcher);
if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {

@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
try {
reader().close();
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
} catch (IOException e) {
throw new ElasticsearchException("failed to close reader", e);
} finally {
engineSearcher.close();
}

}
};
}
}

private static final class NonClosingReaderWrapper extends FilterDirectoryReader {

private NonClosingReaderWrapper(DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return reader;
}
});
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new NonClosingReaderWrapper(in);
}

@Override
protected void doClose() throws IOException {
// don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have
}

@Override
public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -1038,9 +1039,10 @@ private IndexWriter createWriter(boolean create) throws IOException {
@Override
public void warm(LeafReader reader) throws IOException {
try {
assert isMergedSegment(reader);
LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId);
assert isMergedSegment(esLeafReader);
if (warmer != null) {
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null));
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
Expand Down Expand Up @@ -1081,6 +1083,12 @@ final static class SearchFactory extends EngineSearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) {
// we call newSearcher from the IndexReaderWarmer which warms segments during merging
// in that case the reader is a LeafReader and all we need to do is to build a new Searcher
// and return it since it does it's own warming for that particular reader.
return searcher;
}
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
Expand Down Expand Up @@ -1118,10 +1126,11 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
}

if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher)));
} catch (Throwable e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.*;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.util.BitDocIdSet;
Expand Down Expand Up @@ -231,11 +231,11 @@ IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, Mapp
CircuitBreakerService breakerService, MapperService mapperService);
}

public static interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {
interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {

IndexFieldData<FD> loadGlobal(IndexReader indexReader);
IndexFieldData<FD> loadGlobal(DirectoryReader indexReader);

IndexFieldData<FD> localGlobalDirect(IndexReader indexReader) throws Exception;
IndexFieldData<FD> localGlobalDirect(DirectoryReader indexReader) throws Exception;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
Expand All @@ -31,7 +32,7 @@ public interface IndexFieldDataCache {

<FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(LeafReaderContext context, IFD indexFieldData) throws Exception;

<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception;
<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception;

/**
* Clears all the field data stored cached in on this index.
Expand Down Expand Up @@ -61,7 +62,7 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(Leaf

@Override
@SuppressWarnings("unchecked")
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception {
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception {
return (IFD) indexFieldData.localGlobalDirect(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global<AtomicOrdi
* potentially from a cache.
*/
@Override
IndexOrdinalsFieldData loadGlobal(IndexReader indexReader);
IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -34,12 +35,12 @@ public interface IndexParentChildFieldData extends IndexFieldData.Global<AtomicP
* potentially from a cache.
*/
@Override
IndexParentChildFieldData loadGlobal(IndexReader indexReader);
IndexParentChildFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Loading

0 comments on commit a5cb0bc

Please sign in to comment.