Skip to content

Commit

Permalink
Merge pull request #14107 from s1monw/release_searcher_on_wrapper_fai…
Browse files Browse the repository at this point in the history
…lure

Ensure searcher is release if wrapping fails
  • Loading branch information
s1monw committed Oct 14, 2015
2 parents da12290 + c133bec commit 461ce98
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 15 deletions.
15 changes: 13 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -717,11 +719,20 @@ public void failShard(String reason, @Nullable Throwable e) {

public Engine.Searcher acquireSearcher(String source) {
readAllowed();
Engine engine = getEngine();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source);
boolean success = false;
try {
return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher);
assert wrappedSearcher != null;
success = true;
return wrappedSearcher;
} catch (IOException ex) {
throw new ElasticsearchException("failed to wrap searcher", ex);
} finally {
if (success == false) {
Releasables.close(success, searcher);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
Expand Down Expand Up @@ -911,10 +912,6 @@ public void testSearcherWrapperIsUsed() throws IOException {
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
}

ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
Expand All @@ -927,16 +924,8 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
}
};

IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
assertEquals(search.totalHits, 0);
Expand All @@ -948,7 +937,34 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
getResult.release();
} finally {
newShard.close("just do it", randomBoolean());
}
}

public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get();

IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new FieldMaskingReader("foo", reader);
}

@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};

IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
// test global ordinals are evicted
MappedFieldType foo = newShard.mapperService().indexName("foo");
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
Expand All @@ -970,7 +986,53 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
} finally {
newShard.close("just do it", randomBoolean());
}
}

public void testSearchIsReleaseIfWrapperFails() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
throw new RuntimeException("boom");
}

@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};

IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
newShard.acquireSearcher("test");
fail("exception expected");
} catch (RuntimeException ex) {
//
} finally {
newShard.close("just do it", randomBoolean());
}
// test will fail due to unclosed searchers if the searcher is not released
}

private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
return newShard;
}

private static class FieldMaskingReader extends FilterDirectoryReader {
Expand Down

0 comments on commit 461ce98

Please sign in to comment.