diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 154f4ab162d71..457cfcb15486e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -478,7 +478,6 @@ public void onFailure(Exception e) { }); } - private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { final Optional optionalJoin = joinRequest.getOptionalJoin(); synchronized (mutex) { @@ -997,9 +996,10 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener(), ackListener, publishListener); currentPublication = Optional.of(publication); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 6f078217e4f45..b4d337a1bf57e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; @@ -54,6 +55,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -63,6 +65,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver; import org.elasticsearch.env.NodeEnvironment; @@ -93,6 +96,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -135,6 +139,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -1149,6 +1154,67 @@ public void testSingleNodeDiscoveryWithQuorum() { cluster.stabilise(); } + private static class BrokenCustom extends AbstractDiffable implements ClusterState.Custom { + + static final String EXCEPTION_MESSAGE = "simulated"; + + @Override + public String getWriteableName() { + return "broken"; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new ElasticsearchException(EXCEPTION_MESSAGE); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } + + public void testClusterRecoversAfterExceptionDuringSerialization() { + final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader1 = cluster.getAnyLeader(); + + logger.info("--> submitting broken task to [{}]", leader1); + + final AtomicBoolean failed = new AtomicBoolean(); + leader1.submitUpdateTask("broken-task", + cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(), + (source, e) -> { + assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE)); + failed.set(true); + }); + cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task"); + assertTrue(failed.get()); + + cluster.stabilise(); + + final ClusterNode leader2 = cluster.getAnyLeader(); + long finalValue = randomLong(); + + logger.info("--> submitting value [{}] to [{}]", finalValue, leader2); + leader2.submitValue(finalValue); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final String nodeId = clusterNode.getId(); + final ClusterState appliedState = clusterNode.getLastAppliedClusterState(); + assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue)); + } + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; }