Skip to content

Commit

Permalink
Extend systemd timeout during startup (elastic#49784)
Browse files Browse the repository at this point in the history
When we are notifying systemd that we are fully started up, it can be
that we do not notify systemd before its default timeout of sixty
seconds elapses (e.g., if we are upgrading on-disk metadata). In this
case, we need to notify systemd to extend this timeout so that we are
not abruptly terminated. We do this by repeatedly sending
EXTEND_TIMEOUT_USEC to extend the timeout by thirty seconds; we do this
every fifteen seconds. This will prevent systemd from abruptly
terminating us during a long startup. We cancel the scheduled execution
of this notification after we have successfully started up.
  • Loading branch information
jasontedor authored and SivagurunathanV committed Jan 21, 2020
1 parent 3afe182 commit a0bc152
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,22 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Build;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.List;

public class SystemdPlugin extends Plugin implements ClusterPlugin {

Expand Down Expand Up @@ -62,8 +76,44 @@ public SystemdPlugin() {
enabled = Boolean.TRUE.toString().equals(esSDNotify);
}

Scheduler.Cancellable extender;

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
if (enabled) {
/*
* Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the
* READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata) then
* we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the
* timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds.
* We will cancel this scheduled task after we successfully notify systemd that we are ready.
*/
extender = threadPool.scheduleWithFixedDelay(
() -> {
final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000");
if (rc < 0) {
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
}
},
TimeValue.timeValueSeconds(15),
ThreadPool.Names.SAME);
}
return List.of();
}

int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
return Libsystemd.sd_notify(unset_environment, state);
final int rc = Libsystemd.sd_notify(unset_environment, state);
logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc);
return rc;
}

@Override
Expand All @@ -72,11 +122,13 @@ public void onNodeStarted() {
return;
}
final int rc = sd_notify(0, "READY=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// treat failure to notify systemd of readiness as a startup failure
throw new RuntimeException("sd_notify returned error [" + rc + "]");
}
assert extender != null;
final boolean cancelled = extender.cancel();
assert cancelled;
}

@Override
Expand All @@ -85,7 +137,6 @@ public void close() {
return;
}
final int rc = sd_notify(0, "STOPPING=1");
logger.trace("sd_notify returned [{}]", rc);
if (rc < 0) {
// do not treat failure to notify systemd of stopping as a failure
logger.warn("sd_notify returned error [{}]", rc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,70 @@

import org.elasticsearch.Build;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.hamcrest.OptionalMatchers;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SystemdPluginTests extends ESTestCase {

private Build.Type randomPackageBuildType = randomFrom(Build.Type.DEB, Build.Type.RPM);
private Build.Type randomNonPackageBuildType =
randomValueOtherThanMany(t -> t == Build.Type.DEB || t == Build.Type.RPM, () -> randomFrom(Build.Type.values()));

final Scheduler.Cancellable extender = mock(Scheduler.Cancellable.class);
final ThreadPool threadPool = mock(ThreadPool.class);

{
when(extender.cancel()).thenReturn(true);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME)))
.thenReturn(extender);
}

public void testIsEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertTrue(plugin.isEnabled());
assertNotNull(plugin.extender);
}

public void testIsNotPackageDistribution() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}

public void testIsImplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null);
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}

public void testIsExplicitlyNotEnabled() {
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString());
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
assertFalse(plugin.isEnabled());
assertNull(plugin.extender);
}

public void testInvalid() {
Expand All @@ -75,15 +100,18 @@ public void testOnNodeStartedSuccess() {
runTestOnNodeStarted(
Boolean.TRUE.toString(),
randomIntBetween(0, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
verify(plugin.extender).cancel();
});
}

public void testOnNodeStartedFailure() {
final int rc = randomIntBetween(Integer.MIN_VALUE, -1);
runTestOnNodeStarted(
Boolean.TRUE.toString(),
rc,
maybe -> {
(maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isPresent());
// noinspection OptionalGetWithoutIsPresent
assertThat(maybe.get(), instanceOf(RuntimeException.class));
Expand All @@ -95,48 +123,48 @@ public void testOnNodeStartedNotEnabled() {
runTestOnNodeStarted(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}

private void runTestOnNodeStarted(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
}

public void testCloseSuccess() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(1, Integer.MAX_VALUE),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}

public void testCloseFailure() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(Integer.MIN_VALUE, -1),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}

public void testCloseNotEnabled() {
runTestClose(
Boolean.FALSE.toString(),
randomInt(),
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}

private void runTestClose(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions) {
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
}

private void runTest(
final String esSDNotify,
final int rc,
final Consumer<Optional<Exception>> assertions,
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions,
final CheckedConsumer<SystemdPlugin, IOException> invocation,
final String expectedState) {
final AtomicBoolean invoked = new AtomicBoolean();
Expand All @@ -153,16 +181,22 @@ int sd_notify(final int unset_environment, final String state) {
}

};
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertNotNull(plugin.extender);
} else {
assertNull(plugin.extender);
}

boolean success = false;
try {
invocation.accept(plugin);
success = true;
} catch (final Exception e) {
assertions.accept(Optional.of(e));
assertions.accept(Optional.of(e), plugin);
}
if (success) {
assertions.accept(Optional.empty());
assertions.accept(Optional.empty(), plugin);
}
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertTrue(invoked.get());
Expand Down

0 comments on commit a0bc152

Please sign in to comment.