-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail shard if IndexShard#storeStats runs into an IOException #32241
Changes from 4 commits
4aaec57
10b83f9
8e1cbaa
6a32438
2576462
f76d924
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import org.apache.lucene.search.TermQuery; | ||
import org.apache.lucene.search.TopDocs; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.FilterDirectory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.util.Constants; | ||
import org.elasticsearch.Version; | ||
|
@@ -112,6 +114,7 @@ | |
import org.elasticsearch.test.FieldMaskingReader; | ||
import org.elasticsearch.test.VersionUtils; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.ElasticsearchException; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
|
@@ -138,6 +141,7 @@ | |
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.function.LongFunction; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
|
@@ -1162,6 +1166,81 @@ public void testShardStats() throws IOException { | |
closeShards(shard); | ||
} | ||
|
||
|
||
public void testShardStatsWithFailures() throws IOException { | ||
allowShardFailures(); | ||
final ShardId shardId = new ShardId("index", "_na_", 0); | ||
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING); | ||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); | ||
|
||
|
||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) | ||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) | ||
.build(); | ||
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName()) | ||
.settings(settings) | ||
.primaryTerm(0, 1) | ||
.build(); | ||
|
||
// Override two Directory methods to make them fail at our will | ||
// We use AtomicReference here to inject failure in the middle of the test not immediately | ||
// We use Supplier<IOException> instead of IOException to produce meaningful stacktrace | ||
// (remember stack trace is filled when exception is instantiated) | ||
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>(); | ||
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false); | ||
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) { | ||
//fileLength method is called during storeStats try block | ||
//it's not called when store is marked as corrupted | ||
@Override | ||
public long fileLength(String name) throws IOException { | ||
Supplier<IOException> ex = exceptionToThrow.get(); | ||
if (ex == null) { | ||
return super.fileLength(name); | ||
} else { | ||
throw ex.get(); | ||
} | ||
} | ||
|
||
//listAll method is called when marking store as corrupted | ||
@Override | ||
public String[] listAll() throws IOException { | ||
Supplier<IOException> ex = exceptionToThrow.get(); | ||
if (throwWhenMarkingStoreCorrupted.get() && ex != null) { | ||
throw ex.get(); | ||
} else { | ||
return super.listAll(); | ||
} | ||
} | ||
}; | ||
|
||
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) { | ||
IndexShard shard = newShard(shardRouting, shardPath, metaData, store, | ||
null, new InternalEngineFactory(), () -> { | ||
}, EMPTY_EVENT_LISTENER); | ||
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); | ||
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true)); | ||
|
||
recoverShardFromStore(shard); | ||
|
||
final boolean corruptIndexException = randomBoolean(); | ||
|
||
if (corruptIndexException) { | ||
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource")); | ||
throwWhenMarkingStoreCorrupted.set(randomBoolean()); | ||
} else { | ||
exceptionToThrow.set(() -> new IOException("Test IOException")); | ||
} | ||
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats); | ||
assertTrue(failureCallbackTriggered.get()); | ||
|
||
if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) { | ||
assertTrue(store.isMarkedCorrupted()); | ||
} | ||
} | ||
} | ||
|
||
public void testRefreshMetric() throws IOException { | ||
IndexShard shard = newStartedShard(); | ||
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery | ||
|
@@ -1868,6 +1947,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { | |
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
null, | ||
wrapper, | ||
new InternalEngineFactory(), | ||
() -> {}, | ||
|
@@ -2020,7 +2100,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { | |
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
wrapper, | ||
null, wrapper, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
new InternalEngineFactory(), | ||
() -> {}, | ||
EMPTY_EVENT_LISTENER); | ||
|
@@ -2506,7 +2586,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { | |
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) | ||
.build(); | ||
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, | ||
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
||
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); | ||
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); | ||
|
@@ -3005,7 +3085,7 @@ public void testFlushOnInactive() throws Exception { | |
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
AtomicBoolean markedInactive = new AtomicBoolean(); | ||
AtomicReference<IndexShard> primaryRef = new AtomicReference<>(); | ||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> { | ||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { | ||
}, new IndexEventListener() { | ||
@Override | ||
public void onShardInactive(IndexShard indexShard) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,7 +104,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { | |
shardRouting, | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
null, | ||
null, null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
new InternalEngineFactory(), | ||
() -> {}, | ||
EMPTY_EVENT_LISTENER); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,15 +163,20 @@ public Settings threadPoolSettings() { | |
return Settings.EMPTY; | ||
} | ||
|
||
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { | ||
final ShardId shardId = shardPath.getShardId(); | ||
|
||
protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { | ||
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); | ||
} | ||
|
||
protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException { | ||
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { | ||
@Override | ||
public Directory newDirectory() throws IOException { | ||
return newFSDirectory(shardPath.resolveIndex()); | ||
return directory; | ||
} | ||
}; | ||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); | ||
|
||
} | ||
|
||
/** | ||
|
@@ -284,29 +289,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, | |
final ShardId shardId = routing.shardId(); | ||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); | ||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, | ||
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, | ||
EMPTY_EVENT_LISTENER, listeners); | ||
} | ||
|
||
/** | ||
* creates a new initializing shard. | ||
* @param routing shard routing to use | ||
* @param shardPath path to use for shard data | ||
* @param indexMetaData indexMetaData for the shard, including any mapping | ||
* @param indexSearcherWrapper an optional wrapper to be used during searchers | ||
* @param globalCheckpointSyncer callback for syncing global checkpoints | ||
* @param indexEventListener index even listener | ||
* @param listeners an optional set of listeners to add to the shard | ||
* @param routing shard routing to use | ||
* @param shardPath path to use for shard data | ||
* @param indexMetaData indexMetaData for the shard, including any mapping | ||
* @param store an optional custom store to use. If null a default file based store will be created | ||
* @param indexSearcherWrapper an optional wrapper to be used during searchers | ||
* @param globalCheckpointSyncer callback for syncing global checkpoints | ||
* @param indexEventListener index event listener | ||
* @param listeners an optional set of listeners to add to the shard | ||
*/ | ||
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, | ||
@Nullable IndexSearcherWrapper indexSearcherWrapper, | ||
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper, | ||
@Nullable EngineFactory engineFactory, | ||
Runnable globalCheckpointSyncer, | ||
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException { | ||
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); | ||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); | ||
final IndexShard indexShard; | ||
final Store store = createStore(indexSettings, shardPath); | ||
if (store == null) { | ||
store = createStore(indexSettings, shardPath); | ||
} | ||
boolean success = false; | ||
try { | ||
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); | ||
|
@@ -356,7 +364,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index | |
routing, | ||
current.shardPath(), | ||
current.indexSettings().getIndexMetaData(), | ||
null, | ||
null, null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
current.engineFactory, | ||
current.getGlobalCheckpointSyncer(), | ||
EMPTY_EVENT_LISTENER, listeners); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should say
storeStats
, notstoreState
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more thing, looking at the implementation of
failShard
, we don't need to appende.getMessage()
here.