From 0f27c0b70292b59e18039a48033d5672ae51282e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 3 Dec 2019 14:20:32 -0500 Subject: [PATCH] Extend systemd timeout during startup (#49784) When we are notifying systemd that we are fully started up, it can be that we do not notify systemd before its default timeout of sixty seconds elapses (e.g., if we are upgrading on-disk metadata). In this case, we need to notify systemd to extend this timeout so that we are not abruptly terminated. We do this by repeatedly sending EXTEND_TIMEOUT_USEC to extend the timeout by thirty seconds; we do this every fifteen seconds. This will prevent systemd from abruptly terminating us during a long startup. We cancel the scheduled execution of this notification after we have successfully started up. --- .../elasticsearch/systemd/SystemdPlugin.java | 57 +++++++++++++++++- .../systemd/SystemdPluginTests.java | 58 +++++++++++++++---- 2 files changed, 100 insertions(+), 15 deletions(-) diff --git a/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java b/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java index 56a5cee808f23..c886a12afe7bf 100644 --- a/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java +++ b/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java @@ -22,8 +22,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Build; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; public class SystemdPlugin extends Plugin implements ClusterPlugin { @@ -62,8 +76,44 @@ public SystemdPlugin() { enabled = Boolean.TRUE.toString().equals(esSDNotify); } + Scheduler.Cancellable extender; + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + if (enabled) { + /* + * Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the + * READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then + * we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the + * timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds. + * We will cancel this scheduled task after we successfully notify systemd that we are ready. + */ + extender = threadPool.scheduleWithFixedDelay( + () -> { + final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000"); + if (rc < 0) { + logger.warn("extending startup timeout via sd_notify failed with [{}]", rc); + } + }, + TimeValue.timeValueSeconds(15), + ThreadPool.Names.SAME); + } + return Collections.emptyList(); + } + int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) { - return Libsystemd.sd_notify(unset_environment, state); + final int rc = Libsystemd.sd_notify(unset_environment, state); + logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc); + return rc; } @Override @@ -72,11 +122,13 @@ public void onNodeStarted() { return; } final int rc = sd_notify(0, "READY=1"); - logger.trace("sd_notify returned [{}]", rc); if (rc < 0) { // treat failure to notify systemd of readiness as a startup failure throw new RuntimeException("sd_notify returned error [" + rc + "]"); } + assert extender != null; + final boolean cancelled = extender.cancel(); + assert cancelled; } @Override @@ -85,7 +137,6 @@ public void close() { return; } final int rc = sd_notify(0, "STOPPING=1"); - logger.trace("sd_notify returned [{}]", rc); if (rc < 0) { // do not treat failure to notify systemd of stopping as a failure logger.warn("sd_notify returned error [{}]", rc); diff --git a/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java b/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java index 28e1f0514e6a1..3c0abd218b72b 100644 --- a/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java +++ b/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java @@ -21,20 +21,28 @@ import org.elasticsearch.Build; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.hamcrest.OptionalMatchers; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SystemdPluginTests extends ESTestCase { @@ -42,24 +50,41 @@ public class SystemdPluginTests extends ESTestCase { private final Build.Type randomNonPackageBuildType = randomValueOtherThanMany(t -> t == Build.Type.DEB || t == Build.Type.RPM, () -> randomFrom(Build.Type.values())); + final Scheduler.Cancellable extender = mock(Scheduler.Cancellable.class); + final ThreadPool threadPool = mock(ThreadPool.class); + + { + when(extender.cancel()).thenReturn(true); + when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME))) + .thenReturn(extender); + } + public void testIsEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertTrue(plugin.isEnabled()); + assertNotNull(plugin.extender); } public void testIsNotPackageDistribution() { final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testIsImplicitlyNotEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testIsExplicitlyNotEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testInvalid() { @@ -75,7 +100,10 @@ public void testOnNodeStartedSuccess() { runTestOnNodeStarted( Boolean.TRUE.toString(), randomIntBetween(0, Integer.MAX_VALUE), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> { + assertThat(maybe, OptionalMatchers.isEmpty()); + verify(plugin.extender).cancel(); + }); } public void testOnNodeStartedFailure() { @@ -83,7 +111,7 @@ public void testOnNodeStartedFailure() { runTestOnNodeStarted( Boolean.TRUE.toString(), rc, - maybe -> { + (maybe, plugin) -> { assertThat(maybe, OptionalMatchers.isPresent()); // noinspection OptionalGetWithoutIsPresent assertThat(maybe.get(), instanceOf(RuntimeException.class)); @@ -95,13 +123,13 @@ public void testOnNodeStartedNotEnabled() { runTestOnNodeStarted( Boolean.FALSE.toString(), randomInt(), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } private void runTestOnNodeStarted( final String esSDNotify, final int rc, - final Consumer> assertions) { + final BiConsumer, SystemdPlugin> assertions) { runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1"); } @@ -109,34 +137,34 @@ public void testCloseSuccess() { runTestClose( Boolean.TRUE.toString(), randomIntBetween(1, Integer.MAX_VALUE), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } public void testCloseFailure() { runTestClose( Boolean.TRUE.toString(), randomIntBetween(Integer.MIN_VALUE, -1), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } public void testCloseNotEnabled() { runTestClose( Boolean.FALSE.toString(), randomInt(), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } private void runTestClose( final String esSDNotify, final int rc, - final Consumer> assertions) { + final BiConsumer, SystemdPlugin> assertions) { runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1"); } private void runTest( final String esSDNotify, final int rc, - final Consumer> assertions, + final BiConsumer, SystemdPlugin> assertions, final CheckedConsumer invocation, final String expectedState) { final AtomicBoolean invoked = new AtomicBoolean(); @@ -153,16 +181,22 @@ int sd_notify(final int unset_environment, final String state) { } }; + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); + if (Boolean.TRUE.toString().equals(esSDNotify)) { + assertNotNull(plugin.extender); + } else { + assertNull(plugin.extender); + } boolean success = false; try { invocation.accept(plugin); success = true; } catch (final Exception e) { - assertions.accept(Optional.of(e)); + assertions.accept(Optional.of(e), plugin); } if (success) { - assertions.accept(Optional.empty()); + assertions.accept(Optional.empty(), plugin); } if (Boolean.TRUE.toString().equals(esSDNotify)) { assertTrue(invoked.get());