Fail shard if IndexShard#storeStats runs into an IOException. Closes #29008 #29078

Closed
wants to merge 4 commits into from

milan15
Contributor

@milan15 milan15 commented Mar 15, 2018

  • Have you signed the contributor license agreement?
  • Have you followed the contributor guidelines?
  • If submitting code, have you built your formula locally prior to submission with gradle check?
  • If submitting code, is your pull request against master? Unless there is a good reason otherwise, we prefer pull requests against master and will backport as needed.
  • If submitting code, have you checked that your submission is for an OS that we support?
  • If you are submitting this code for a class then read our policy for that.

@elasticmachine
Collaborator

Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?

@cbuescher cbuescher added review :Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. labels Mar 15, 2018
@milan15
Contributor Author

milan15 commented Mar 20, 2018

@bleskes Sorry to take up your time. I just wanted to check whether you have had a chance to look at this.

@bleskes
Contributor

bleskes commented Mar 21, 2018

@milan15 It turned out more complicated than I hoped, but here's a patch that will give you a test you can start tweaking:

Index: test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java	(revision 602145a562cf5e4dc4bd3f72d084da6b3c968797)
+++ test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java	(date 1521626048000)
@@ -25,7 +25,6 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -46,6 +45,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
@@ -140,12 +140,15 @@
         return Settings.EMPTY;
     }
 
-    private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
-        final ShardId shardId = shardPath.getShardId();
+    protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
+        return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
+    }
+
+    protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
         final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
             @Override
             public Directory newDirectory() throws IOException {
-                return newFSDirectory(shardPath.resolveIndex());
+                return directory;
             }
         };
         return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
@@ -260,28 +263,30 @@
         final ShardId shardId = routing.shardId();
         final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
         ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
-        return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners);
+        return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners);
     }
 
     /**
      * creates a new initializing shard.
-     *
-     * @param routing                shard routing to use
+     *  @param routing                shard routing to use
      * @param shardPath              path to use for shard data
      * @param indexMetaData          indexMetaData for the shard, including any mapping
+     * @param store                  an optional custom store to use. If null a default file based store will be created
      * @param indexSearcherWrapper   an optional wrapper to be used during searchers
      * @param globalCheckpointSyncer callback for syncing global checkpoints
      * @param listeners              an optional set of listeners to add to the shard
      */
     protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
-                                  @Nullable IndexSearcherWrapper indexSearcherWrapper,
+                                  @Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
                                   @Nullable EngineFactory engineFactory,
                                   Runnable globalCheckpointSyncer,
                                   IndexingOperationListener... listeners) throws IOException {
         final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
         final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
         final IndexShard indexShard;
-        final Store store = createStore(indexSettings, shardPath);
+        if (store == null) {
+            store = createStore(indexSettings, shardPath);
+        }
         boolean success = false;
         try {
             IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
@@ -332,7 +337,7 @@
                 routing,
                 current.shardPath(),
                 current.indexSettings().getIndexMetaData(),
-                null,
+                null, null,
                 current.engineFactory,
                 current.getGlobalCheckpointSyncer(),
                 listeners);
Index: server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java	(revision 602145a562cf5e4dc4bd3f72d084da6b3c968797)
+++ server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java	(date 1521621909000)
@@ -261,7 +261,7 @@
                 RecoverySource.PeerRecoverySource.INSTANCE);
 
             final IndexShard newReplica =
-                    newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {});
+                    newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting), () -> {});
             replicas.add(newReplica);
             updateAllocationIDsOnPrimary();
             return newReplica;
Index: server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java	(revision 602145a562cf5e4dc4bd3f72d084da6b3c968797)
+++ server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java	(date 1521626048000)
@@ -27,9 +27,11 @@
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Constants;
-import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -70,6 +72,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
@@ -1171,6 +1174,45 @@
         closeShards(shard);
     }
 
+    public void testShardStatsWithFailures() throws IOException {
+        final ShardRouting shardRouting = newShardRouting(new ShardId("index", "_na_", 0), "node", true, ShardRoutingState.INITIALIZING);
+        final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
+        final ShardId shardId = shardRouting.shardId();
+        ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
+            .settings(settings)
+            .primaryTerm(0, 0)
+            .build();
+        AtomicReference<IOException> exceptionToThrow = new AtomicReference<>();
+        Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) {
+            @Override
+            public String[] listAll() throws IOException {
+                if (exceptionToThrow.get() != null) {
+                    throw exceptionToThrow.get();
+                }
+                return super.listAll();
+            }
+        };
+        Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory);
+        IndexShard shard = newShard(shardRouting, shardPath, metaData, store, null, null, () -> {});
+        recoverShardFromStore(shard);
+        if (randomBoolean()) {
+            exceptionToThrow.set(new CorruptIndexException("test", "test"));
+        } else {
+            exceptionToThrow.set(new IOException("test"));
+        }
+        ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats);
+        assertThat(e.getMessage(), containsString("Some message"));
+        assertThat(shard.state(), equalTo(IndexShardState.CLOSED));
+        if (exceptionToThrow.get() instanceof CorruptIndexException) {
+            assertTrue(store.isMarkedCorrupted());
+        }
+    }
+
     public void testRefreshMetric() throws IOException {
         IndexShard shard = newStartedShard();
         assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
@@ -1856,7 +1898,7 @@
         closeShards(shard);
         IndexShard newShard = newShard(
             ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
-            shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
+            shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, wrapper, null, () -> {});
 
         recoverShardFromStore(newShard);
 
@@ -2002,7 +2044,7 @@
         closeShards(shard);
         IndexShard newShard = newShard(
             ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
-            shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
+            shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, wrapper, null, () -> {});
 
         recoverShardFromStore(newShard);
 
@@ -2485,7 +2527,7 @@
                 .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
             .build();
         final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
-            null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
+            null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
 
         Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
         assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
Index: server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java	(revision 602145a562cf5e4dc4bd3f72d084da6b3c968797)
+++ server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java	(date 1521626048000)
@@ -20,13 +20,13 @@
 package org.elasticsearch.repositories.blobstore;
 
 import org.apache.lucene.store.Directory;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.index.shard.IndexShard;
@@ -97,7 +97,7 @@
 
             // build a new shard using the same store directory as the closed shard
             ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
-            shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {});
+            shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, null, () -> {});
 
             // restore the shard
             recoverShardFromSnapshot(shard, snapshot, repository);
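
The fault-injection idea in `testShardStatsWithFailures` can be tried in isolation. The following is a minimal sketch using only the JDK, not the Lucene or Elasticsearch classes: `FileLister`, `FailingFileLister`, and `FaultInjectionDemo` are hypothetical names standing in for `Directory` and the anonymous `FilterDirectory` subclass in the patch. It assumes the only behavior of interest is that `listAll()` delegates until an exception is placed in the `AtomicReference`.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the single Directory method the test overrides.
interface FileLister {
    String[] listAll() throws IOException;
}

// Delegating wrapper: behaves like the wrapped lister until a failure is armed.
final class FailingFileLister implements FileLister {
    private final FileLister delegate;
    private final AtomicReference<IOException> exceptionToThrow;

    FailingFileLister(FileLister delegate, AtomicReference<IOException> exceptionToThrow) {
        this.delegate = delegate;
        this.exceptionToThrow = exceptionToThrow;
    }

    @Override
    public String[] listAll() throws IOException {
        // Read the reference once so the null check and the throw see the same value.
        IOException armed = exceptionToThrow.get();
        if (armed != null) {
            throw armed;
        }
        return delegate.listAll();
    }
}

final class FaultInjectionDemo {
    public static void main(String[] args) throws IOException {
        AtomicReference<IOException> exceptionToThrow = new AtomicReference<>();
        FileLister lister = new FailingFileLister(() -> new String[] {"segments_1"}, exceptionToThrow);

        // Nothing armed yet: the call goes through to the delegate.
        System.out.println(lister.listAll().length);

        // Arm the failure, as the test does just before calling shard::storeStats.
        exceptionToThrow.set(new IOException("test"));
        try {
            lister.listAll();
        } catch (IOException e) {
            System.out.println("failed with: " + e.getMessage());
        }
    }
}
```

Keeping the exception in an `AtomicReference` lets the shard recover normally first and only start failing once the test decides to, which is presumably why the patch arms it after `recoverShardFromStore(shard)`.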

@@ -933,6 +933,7 @@ public StoreStats storeStats() {
        try {
            return store.stats();
        } catch (IOException e) {
            failShard("Failing Shard as IOException was found.",e);
Contributor

Can you change the message to match the one used in the exception we throw?
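
One way to read this request, sketched with JDK classes only: build the message once and pass it to both the failure call and the thrown exception, so the two cannot drift apart. `SketchShard`, `StatsSource`, and the message text are hypothetical; the real method throws an Elasticsearch exception whose message is not shown in this thread, and `UncheckedIOException` is used here purely as a stand-in.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a shard; only the parts needed for the sketch.
final class SketchShard {
    interface StatsSource {
        long sizeInBytes() throws IOException;
    }

    private final StatsSource store;
    private String failureReason; // null until the shard has been failed

    SketchShard(StatsSource store) {
        this.store = store;
    }

    long storeStats() {
        try {
            return store.sizeInBytes();
        } catch (IOException e) {
            // Placeholder text; the point is that one string feeds both calls.
            final String message = "failed to fetch store stats";
            failShard(message, e);
            throw new UncheckedIOException(message, e);
        }
    }

    // Records the reason only; a real shard would also close itself here.
    private void failShard(String reason, Exception cause) {
        this.failureReason = reason;
    }

    String failureReason() {
        return failureReason;
    }
}
```

With this shape, a test can assert on a single string for both the recorded failure reason and the exception message.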

@milan15
Contributor Author

milan15 commented Mar 21, 2018

@bleskes Thanks! 👍 I will start playing with the test you provided.

@bleskes
Contributor

bleskes commented Apr 9, 2018

@milan15 are you still planning to pick this up? If not, no problem - we can take over and make this happen.

@bleskes
Contributor

bleskes commented Apr 24, 2018

@milan15 a friendly reminder :)

@milan15
Contributor Author

milan15 commented Apr 30, 2018

@bleskes Sorry for the slow response! I got very busy with other things. I did spend a couple of weekends on this but was not able to get anywhere. It would be great if someone could take it over.
And again, apologies!

5 participants