Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail shard if IndexShard#storeStats runs into an IOException #32241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ public StoreStats storeStats() {
try {
return store.stats();
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -112,6 +114,7 @@
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.ElasticsearchException;

import java.io.IOException;
import java.nio.charset.Charset;
Expand All @@ -138,6 +141,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -1162,6 +1166,81 @@ public void testShardStats() throws IOException {
closeShards(shard);
}


public void testShardStatsWithFailures() throws IOException {
allowShardFailures();
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());


ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, 1)
.build();

// Override two Directory methods to make them fail at our will
// We use AtomicReference here to inject failure in the middle of the test not immediately
// We use Supplier<IOException> instead of IOException to produce meaningful stacktrace
// (remember stack trace is filled when exception is instantiated)
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>();
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false);
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) {
//fileLength method is called during storeStats try block
//it's not called when store is marked as corrupted
@Override
public long fileLength(String name) throws IOException {
Supplier<IOException> ex = exceptionToThrow.get();
if (ex == null) {
return super.fileLength(name);
} else {
throw ex.get();
}
}

//listAll method is called when marking store as corrupted
@Override
public String[] listAll() throws IOException {
Supplier<IOException> ex = exceptionToThrow.get();
if (throwWhenMarkingStoreCorrupted.get() && ex != null) {
throw ex.get();
} else {
return super.listAll();
}
}
};

try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
null, new InternalEngineFactory(), () -> {
}, EMPTY_EVENT_LISTENER);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true));

recoverShardFromStore(shard);

final boolean corruptIndexException = randomBoolean();

if (corruptIndexException) {
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource"));
throwWhenMarkingStoreCorrupted.set(randomBoolean());
} else {
exceptionToThrow.set(() -> new IOException("Test IOException"));
}
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats);
assertTrue(failureCallbackTriggered.get());

if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) {
assertTrue(store.isMarkedCorrupted());
}
}
}

public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
Expand Down Expand Up @@ -1868,6 +1947,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
wrapper,
new InternalEngineFactory(),
() -> {},
Expand Down Expand Up @@ -2020,6 +2100,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
wrapper,
new InternalEngineFactory(),
() -> {},
Expand Down Expand Up @@ -2506,7 +2587,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
Expand Down Expand Up @@ -3005,7 +3086,7 @@ public void testFlushOnInactive() throws Exception {
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> {
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> {
}, new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
null,
new InternalEngineFactory(),
() -> {},
EMPTY_EVENT_LISTENER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ assert shardRoutings().stream()

public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
nodeId,
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);
shardId,
nodeId,
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);

final IndexShard newReplica =
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting),
() -> {}, EMPTY_EVENT_LISTENER);
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,20 @@ public Settings threadPoolSettings() {
return Settings.EMPTY;
}

private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
final ShardId shardId = shardPath.getShardId();

protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
}

protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return newFSDirectory(shardPath.resolveIndex());
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));

}

/**
Expand Down Expand Up @@ -284,29 +289,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
EMPTY_EVENT_LISTENER, listeners);
}

/**
* creates a new initializing shard.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index even listener
* @param listeners an optional set of listeners to add to the shard
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param store an optional custom store to use. If null a default file based store will be created
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
final Store store = createStore(indexSettings, shardPath);
if (store == null) {
store = createStore(indexSettings, shardPath);
}
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
Expand Down Expand Up @@ -357,6 +365,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
current.shardPath(),
current.indexSettings().getIndexMetaData(),
null,
null,
current.engineFactory,
current.getGlobalCheckpointSyncer(),
EMPTY_EVENT_LISTENER, listeners);
Expand Down