Skip to content

Commit

Permalink
Merge pull request #14084 from s1monw/close_the_wrapper_only
Browse files Browse the repository at this point in the history
Streamline top level reader close listeners and forbid general usage
  • Loading branch information
s1monw committed Oct 14, 2015
2 parents 2e445d3 + db710c5 commit 5828796
Show file tree
Hide file tree
Showing 35 changed files with 573 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand Down Expand Up @@ -76,4 +74,38 @@ public LeafReader wrap(LeafReader reader) {
}
}

/**
* Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
* otherwise we can't safely install the listener.
*
* @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
*/
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) {
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
if (elasticsearchDirectoryReader != null) {
assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey();
elasticsearchDirectoryReader.addReaderClosedListener(listener);
return;
}
throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
}

/**
* Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or <code>null</code> if no instance is found;
*/
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {
return (ElasticsearchDirectoryReader) reader;
} else {
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.index.shard.ShardId;

/**
Expand All @@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
*
* @param in specified base reader.
*/
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
public ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
super(in);
this.shardId = shardId;
}
Expand All @@ -55,8 +52,18 @@ public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}

@Override
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
if (reader instanceof FilterLeafReader) {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,13 @@ public IndexReader reader() {
return searcher.getIndexReader();
}

public DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}

public IndexSearcher searcher() {
return searcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -905,9 +906,10 @@ private IndexWriter createWriter(boolean create) throws IOException {
@Override
public void warm(LeafReader reader) throws IOException {
try {
assert isMergedSegment(reader);
LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId);
assert isMergedSegment(esLeafReader);
if (warmer != null) {
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null));
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
Expand Down Expand Up @@ -949,6 +951,12 @@ final static class SearchFactory extends EngineSearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) {
// we call newSearcher from the IndexReaderWarmer which warms segments during merging
// in that case the reader is a LeafReader and all we need to do is to build a new Searcher
// and return it since it does it's own warming for that particular reader.
return searcher;
}
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
Expand Down Expand Up @@ -986,10 +994,11 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
}

if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher)));
} catch (Throwable e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
Expand Down Expand Up @@ -235,11 +236,11 @@ IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, Mapp
CircuitBreakerService breakerService, MapperService mapperService);
}

public static interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {
interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {

IndexFieldData<FD> loadGlobal(IndexReader indexReader);
IndexFieldData<FD> loadGlobal(DirectoryReader indexReader);

IndexFieldData<FD> localGlobalDirect(IndexReader indexReader) throws Exception;
IndexFieldData<FD> localGlobalDirect(DirectoryReader indexReader) throws Exception;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;

Expand All @@ -33,7 +33,7 @@ public interface IndexFieldDataCache {

<FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(LeafReaderContext context, IFD indexFieldData) throws Exception;

<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception;
<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception;

/**
* Clears all the field data stored cached in on this index.
Expand Down Expand Up @@ -67,7 +67,7 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(Leaf

@Override
@SuppressWarnings("unchecked")
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception {
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception {
return (IFD) indexFieldData.localGlobalDirect(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global<AtomicOrdi
* potentially from a cache.
*/
@Override
IndexOrdinalsFieldData loadGlobal(IndexReader indexReader);
IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -34,12 +35,12 @@ public interface IndexParentChildFieldData extends IndexFieldData.Global<AtomicP
* potentially from a cache.
*/
@Override
IndexParentChildFieldData loadGlobal(IndexReader indexReader);
IndexParentChildFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.fielddata.ordinals;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.Accountable;
Expand Down Expand Up @@ -59,12 +60,12 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand All @@ -76,7 +76,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

package org.elasticsearch.index.fielddata.plain;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.*;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -123,12 +119,12 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context)
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSetting
}

@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand Down Expand Up @@ -170,7 +170,7 @@ public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldDat
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
final long startTime = System.nanoTime();

long ramBytesUsed = 0;
Expand Down Expand Up @@ -347,15 +347,15 @@ public Collection<Accountable> getChildResources() {
}

@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) {
return this;
}
throw new IllegalStateException();
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata.plain;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand All @@ -78,7 +79,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}
}
Loading

0 comments on commit 5828796

Please sign in to comment.