Skip to content

Commit

Permalink
Handle serialization exceptions during publication
Browse files Browse the repository at this point in the history
Today if an exception is thrown when serializing a cluster state during
publication then the master enters a poisoned state where it cannot publish any
more cluster states, but nor does it stand down as master, yielding repeated
exceptions of the following form:

```
failed to commit cluster state version [12345]
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException: publishing failed
        at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1045) ~[elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService.publish(MasterService.java:252) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:238) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.0.0.jar:7.0.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: cannot start publishing next value before accepting previous one
        at org.elasticsearch.cluster.coordination.CoordinationState.handleClientValue(CoordinationState.java:280) ~[elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1030) ~[elasticsearch-7.0.0.jar:7.0.0]
        ... 11 more
```

This is because it already created the publication request using
`CoordinationState#handleClientValue()` but then it fails before accepting it.
This commit addresses this by performing the serialization before calling
`handleClientValue()`.

Relates elastic#41090, which was the source of such a serialization exception.
  • Loading branch information
DaveCTurner committed May 3, 2019
1 parent 80f8943 commit 6ee0fbb
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ public void onFailure(Exception e) {
});
}


private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
Expand Down Expand Up @@ -997,9 +996,10 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
getLocalNode() + " should be in published " + clusterState;

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final PublicationTransportHandler.PublicationContext publicationContext =
publicationHandler.newPublicationContext(clusterChangedEvent);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
new ListenableFuture<>(), ackListener, publishListener);
currentPublication = Optional.of(publication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -63,6 +65,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -93,6 +96,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -135,6 +139,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -1149,6 +1154,67 @@ public void testSingleNodeDiscoveryWithQuorum() {
cluster.stabilise();
}

private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {

static final String EXCEPTION_MESSAGE = "simulated";

@Override
public String getWriteableName() {
return "broken";
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_EMPTY;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
throw new ElasticsearchException(EXCEPTION_MESSAGE);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}

public void testClusterRecoversAfterExceptionDuringSerialization() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization
cluster.runRandomly();
cluster.stabilise();

final ClusterNode leader1 = cluster.getAnyLeader();

logger.info("--> submitting broken task to [{}]", leader1);

final AtomicBoolean failed = new AtomicBoolean();
leader1.submitUpdateTask("broken-task",
cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(),
(source, e) -> {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
failed.set(true);
});
cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
assertTrue(failed.get());

cluster.stabilise();

final ClusterNode leader2 = cluster.getAnyLeader();
long finalValue = randomLong();

logger.info("--> submitting value [{}] to [{}]", finalValue, leader2);
leader2.submitValue(finalValue);
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);

for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
}
}

private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
Expand Down

0 comments on commit 6ee0fbb

Please sign in to comment.