Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline top level reader close listeners and forbid general usage #14084

Merged
merged 11 commits into from
Oct 14, 2015
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand Down Expand Up @@ -76,4 +74,40 @@ public LeafReader wrap(LeafReader reader) {
}
}

/**
* Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
* otherwise we can't safely install the listener.
*
* @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
*/
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) {
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
if (elasticsearchDirectoryReader != null) {
assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey();
elasticsearchDirectoryReader.addReaderClosedListener(listener);
return;
}
throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
}

/**
* Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or <code>null</code> if no instance is found;
*/
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {
return (ElasticsearchDirectoryReader) reader;
} else {
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
}
}
return null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.index.shard.ShardId;

/**
Expand All @@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
*
* @param in specified base reader.
*/
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
public ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
super(in);
this.shardId = shardId;
}
Expand All @@ -59,4 +56,19 @@ public Object getCoreCacheKey() {
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
}

public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
if (reader instanceof FilterLeafReader) {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,13 @@ public IndexReader reader() {
return searcher.getIndexReader();
}

public DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as an directory reader");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/an/a/

}

public IndexSearcher searcher() {
return searcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -905,9 +906,10 @@ private IndexWriter createWriter(boolean create) throws IOException {
@Override
public void warm(LeafReader reader) throws IOException {
try {
assert isMergedSegment(reader);
LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId);
assert isMergedSegment(esLeafReader);
if (warmer != null) {
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null));
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
Expand Down Expand Up @@ -949,6 +951,9 @@ final static class SearchFactory extends EngineSearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we call this method with a LeafReader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the merge warmer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you leave a comment to explain this if statement?

return searcher;
}
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
Expand Down Expand Up @@ -986,10 +991,11 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
}

if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher)));
} catch (Throwable e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
Expand Down Expand Up @@ -237,9 +238,9 @@ IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, Mapp

public static interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {

IndexFieldData<FD> loadGlobal(IndexReader indexReader);
IndexFieldData<FD> loadGlobal(DirectoryReader indexReader);

IndexFieldData<FD> localGlobalDirect(IndexReader indexReader) throws Exception;
IndexFieldData<FD> localGlobalDirect(DirectoryReader indexReader) throws Exception;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;

Expand All @@ -33,7 +33,7 @@ public interface IndexFieldDataCache {

<FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(LeafReaderContext context, IFD indexFieldData) throws Exception;

<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception;
<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception;

/**
* Clears all the field data stored cached in on this index.
Expand Down Expand Up @@ -67,7 +67,7 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(Leaf

@Override
@SuppressWarnings("unchecked")
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception {
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception {
return (IFD) indexFieldData.localGlobalDirect(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global<AtomicOrdi
* potentially from a cache.
*/
@Override
IndexOrdinalsFieldData loadGlobal(IndexReader indexReader);
IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;


Expand All @@ -34,12 +35,12 @@ public interface IndexParentChildFieldData extends IndexFieldData.Global<AtomicP
* potentially from a cache.
*/
@Override
IndexParentChildFieldData loadGlobal(IndexReader indexReader);
IndexParentChildFieldData loadGlobal(DirectoryReader indexReader);

/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.fielddata.ordinals;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.Accountable;
Expand Down Expand Up @@ -59,12 +60,12 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand All @@ -76,7 +76,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

package org.elasticsearch.index.fielddata.plain;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.*;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -123,12 +119,12 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context)
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSetting
}

@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand Down Expand Up @@ -170,7 +170,7 @@ public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldDat
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
final long startTime = System.nanoTime();

long ramBytesUsed = 0;
Expand Down Expand Up @@ -347,15 +347,15 @@ public Collection<Accountable> getChildResources() {
}

@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) {
return this;
}
throw new IllegalStateException();
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.fielddata.plain;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public AtomicOrdinalsFieldData loadDirect(LeafReaderContext context) throws Exce
}

@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
Expand All @@ -78,7 +79,7 @@ public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
}

@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}
}
Loading