Skip to content

Commit

Permalink
Don't fail replica if FlushNotAllowedEngineException is thrown (#20632)
Browse files Browse the repository at this point in the history
This is a issue in all 2.x releases that if we run into a FlushNotAllowedEngineException
on a replica (ie. a flush is already running) we fail the replica. We should just ignore this
excepiton and not fail the shard.

Note: this is against 2.x only. Master changed in #20597
Relates to #20569
  • Loading branch information
s1monw authored Sep 23, 2016
1 parent c487a2b commit a3fd725
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -91,4 +93,15 @@ protected ClusterBlockLevel indexBlockLevel() {
protected boolean shouldExecuteReplication(Settings settings) {
return true;
}

@Override
protected boolean mustFailReplica(Throwable e) {
// if we are running flush ith wait_if_ongoing=false (default) we might get a FlushNotAllowedEngineException from the
// replica that is a signal that there is another flush ongoing and we stepped out. This behavior has changed in 5.x
// where we don't throw an exception anymore. In such a case we ignore the exception an do NOT fail the replica.
if (ExceptionsHelper.unwrapCause(e).getClass() == FlushNotAllowedEngineException.class) {
return false;
}
return super.mustFailReplica(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ protected boolean ignoreReplicaException(Throwable e) {
return false;
}

/**
* Returns <code>true</code> iff the replica must be failed if it threw the given exception.
* This defaults to the inverse of {@link #ignoreReplicaException(Throwable)}
*/
protected boolean mustFailReplica(Throwable e) {
return ignoreReplicaException(e) == false;
}

protected boolean isConflictException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
// on version conflict or document missing, it means
Expand Down Expand Up @@ -360,7 +368,8 @@ private void failReplicaIfNeeded(Throwable t) {
String index = request.shardId().getIndex();
int shardId = request.shardId().id();
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
if (ignoreReplicaException(t) == false) {
if (mustFailReplica(t)) {
assert ignoreReplicaException(t) == false;
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
Expand Down Expand Up @@ -927,7 +936,8 @@ public void handleResponse(TransportResponse.Empty vResponse) {
public void handleException(TransportException exp) {
onReplicaFailure(nodeId, exp);
logger.trace("[{}] transport failure during replica request [{}], action [{}]", exp, node, replicaRequest, transportReplicaAction);
if (ignoreReplicaException(exp) == false) {
if (mustFailReplica(exp)) {
assert ignoreReplicaException(exp) == false;
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
shardStateAction.shardFailed(shard, indexUUID, "failed to perform " + actionName + " on replica on node " + node, exp);
}
Expand Down
87 changes: 87 additions & 0 deletions core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package org.elasticsearch.indices.flush;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
Expand All @@ -53,6 +61,7 @@ public class FlushIT extends ESIntegTestCase {
public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
ClusterStateResponse beforeTestResponse = client().admin().cluster().prepareState().get();
final int numIters = scaledRandomIntBetween(10, 30);
for (int i = 0; i < numIters; i++) {
for (int j = 0; j < 10; j++) {
Expand Down Expand Up @@ -84,6 +93,84 @@ public void onFailure(Throwable e) {
latch.await();
assertThat(errors, emptyIterable());
}
ClusterStateResponse afterTestResponse = client().admin().cluster().prepareState().get();
IndexRoutingTable afterRoutingTable = afterTestResponse.getState().getRoutingTable().index("test");
IndexRoutingTable beforeRoutingTable = beforeTestResponse.getState().getRoutingTable().index("test");
assertEquals(afterRoutingTable, beforeRoutingTable);

}

/**
* We test here that failing with FlushNotAllowedEngineException doesn't fail the shards since it's whitelisted.
* see #20632
* @throws InterruptedException
*/
@Test
public void testDontWaitIfOngoing() throws InterruptedException {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
ensureGreen("test");
ClusterStateResponse beforeTestResponse = client().admin().cluster().prepareState().get();
List<ShardRouting> shardRoutings = beforeTestResponse.getState().getRoutingTable().index("test")
.shardsWithState(ShardRoutingState.STARTED);
ShardRouting theReplica = null;
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.primary() == false) {
theReplica = shardRouting;
break;
}
}
assertNotNull(theReplica);
DiscoveryNode discoveryNode = beforeTestResponse.getState().nodes().get(theReplica.currentNodeId());
final IndicesService instance = internalCluster().getInstance(IndicesService.class, discoveryNode.getName());
final ShardRouting routing = theReplica;
final AtomicBoolean run = new AtomicBoolean(true);
Thread t = new Thread() {
@Override
public void run() {
IndexService indexService = instance.indexService(routing.index());
IndexShard shard = indexService.shard(routing.id());
while(run.get()) {
shard.flush(new FlushRequest().waitIfOngoing(true));
}
}
};
t.start();
final int numIters = scaledRandomIntBetween(10, 30);
for (int i = 0; i < numIters; i++) {
for (int j = 0; j < 10; j++) {
client().prepareIndex("test", "test").setSource("{}").get();
}
final CountDownLatch latch = new CountDownLatch(10);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
for (int j = 0; j < 10; j++) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(false).execute(new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
try {
latch.countDown();
} catch (Throwable ex) {
onFailure(ex);
}
}

@Override
public void onFailure(Throwable e) {
errors.add(e);
latch.countDown();
}
});
}
latch.await();
assertThat(errors, emptyIterable());
}
run.set(false);
t.join();
ClusterStateResponse afterTestResponse = client().admin().cluster().prepareState().get();
IndexRoutingTable afterRoutingTable = afterTestResponse.getState().getRoutingTable().index("test");
IndexRoutingTable beforeRoutingTable = beforeTestResponse.getState().getRoutingTable().index("test");
assertEquals(afterRoutingTable, beforeRoutingTable);

}

public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
Expand Down

0 comments on commit a3fd725

Please sign in to comment.