diff --git a/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java b/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java index 56a5cee808f23..cdc6ba2e31b06 100644 --- a/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java +++ b/modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java @@ -22,8 +22,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Build; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; public class SystemdPlugin extends Plugin implements ClusterPlugin { @@ -62,8 +76,44 @@ public SystemdPlugin() { enabled = Boolean.TRUE.toString().equals(esSDNotify); } + Scheduler.Cancellable extender; + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + if (enabled) { + /* + * Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the + * READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then + * we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the + * timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds. + * We will cancel this scheduled task after we successfully notify systemd that we are ready. + */ + extender = threadPool.scheduleWithFixedDelay( + () -> { + final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000"); + if (rc < 0) { + logger.warn("extending startup timeout via sd_notify failed with [{}]", rc); + } + }, + TimeValue.timeValueSeconds(15), + ThreadPool.Names.SAME); + } + return List.of(); + } + int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) { - return Libsystemd.sd_notify(unset_environment, state); + final int rc = Libsystemd.sd_notify(unset_environment, state); + logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc); + return rc; } @Override @@ -72,11 +122,13 @@ public void onNodeStarted() { return; } final int rc = sd_notify(0, "READY=1"); - logger.trace("sd_notify returned [{}]", rc); if (rc < 0) { // treat failure to notify systemd of readiness as a startup failure throw new RuntimeException("sd_notify returned error [" + rc + "]"); } + assert extender != null; + final boolean cancelled = extender.cancel(); + assert cancelled; } @Override @@ -85,7 +137,6 @@ public void close() { return; } final int rc = sd_notify(0, "STOPPING=1"); - logger.trace("sd_notify returned [{}]", rc); if (rc < 0) { // do not treat failure to notify systemd of stopping as a failure logger.warn("sd_notify returned error [{}]", rc); diff --git a/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java b/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java index 85f1446e4b0ac..13bfe5328f75c 100644 --- a/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java +++ b/modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java @@ -21,20 +21,28 @@ import org.elasticsearch.Build; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.hamcrest.OptionalMatchers; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SystemdPluginTests extends ESTestCase { @@ -42,24 +50,41 @@ public class SystemdPluginTests extends ESTestCase { private Build.Type randomNonPackageBuildType = randomValueOtherThanMany(t -> t == Build.Type.DEB || t == Build.Type.RPM, () -> randomFrom(Build.Type.values())); + final Scheduler.Cancellable extender = mock(Scheduler.Cancellable.class); + final ThreadPool threadPool = mock(ThreadPool.class); + + { + when(extender.cancel()).thenReturn(true); + when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME))) + .thenReturn(extender); + } + public void testIsEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertTrue(plugin.isEnabled()); + assertNotNull(plugin.extender); } public void testIsNotPackageDistribution() { final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testIsImplicitlyNotEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testIsExplicitlyNotEnabled() { final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString()); + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); assertFalse(plugin.isEnabled()); + assertNull(plugin.extender); } public void testInvalid() { @@ -75,7 +100,10 @@ public void testOnNodeStartedSuccess() { runTestOnNodeStarted( Boolean.TRUE.toString(), randomIntBetween(0, Integer.MAX_VALUE), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> { + assertThat(maybe, OptionalMatchers.isEmpty()); + verify(plugin.extender).cancel(); + }); } public void testOnNodeStartedFailure() { @@ -83,7 +111,7 @@ public void testOnNodeStartedFailure() { runTestOnNodeStarted( Boolean.TRUE.toString(), rc, - maybe -> { + (maybe, plugin) -> { assertThat(maybe, OptionalMatchers.isPresent()); // noinspection OptionalGetWithoutIsPresent assertThat(maybe.get(), instanceOf(RuntimeException.class)); @@ -95,13 +123,13 @@ public void testOnNodeStartedNotEnabled() { runTestOnNodeStarted( Boolean.FALSE.toString(), randomInt(), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } private void runTestOnNodeStarted( final String esSDNotify, final int rc, - final Consumer> assertions) { + final BiConsumer, SystemdPlugin> assertions) { runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1"); } @@ -109,34 +137,34 @@ public void testCloseSuccess() { runTestClose( Boolean.TRUE.toString(), randomIntBetween(1, Integer.MAX_VALUE), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } public void testCloseFailure() { runTestClose( Boolean.TRUE.toString(), randomIntBetween(Integer.MIN_VALUE, -1), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } public void testCloseNotEnabled() { runTestClose( Boolean.FALSE.toString(), randomInt(), - maybe -> assertThat(maybe, OptionalMatchers.isEmpty())); + (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())); } private void runTestClose( final String esSDNotify, final int rc, - final Consumer> assertions) { + final BiConsumer, SystemdPlugin> assertions) { runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1"); } private void runTest( final String esSDNotify, final int rc, - final Consumer> assertions, + final BiConsumer, SystemdPlugin> assertions, final CheckedConsumer invocation, final String expectedState) { final AtomicBoolean invoked = new AtomicBoolean(); @@ -153,16 +181,22 @@ int sd_notify(final int unset_environment, final String state) { } }; + plugin.createComponents(null, null, threadPool, null, null, null, null, null, null); + if (Boolean.TRUE.toString().equals(esSDNotify)) { + assertNotNull(plugin.extender); + } else { + assertNull(plugin.extender); + } boolean success = false; try { invocation.accept(plugin); success = true; } catch (final Exception e) { - assertions.accept(Optional.of(e)); + assertions.accept(Optional.of(e), plugin); } if (success) { - assertions.accept(Optional.empty()); + assertions.accept(Optional.empty(), plugin); } if (Boolean.TRUE.toString().equals(esSDNotify)) { assertTrue(invoked.get());