Skip to content

Commit

Permalink
Interpret ?timeout=-1 as infinite ack timeout (elastic#107675)
Browse files Browse the repository at this point in the history
APIs which perform cluster state updates typically accept the
`?master_timeout=` and `?timeout=` parameters to respectively set the
pending task queue timeout and the acking timeout for the cluster state
update. Both of these parameters accept the value `-1`, but
`?master_timeout=-1` means to wait indefinitely whereas `?timeout=-1`
means the same thing as `?timeout=0`, namely that acking times out
immediately on commit.

There are some situations where it makes sense to wait for as long as
possible for nodes to ack a cluster state update. In practice this wait
is bounded by other mechanisms (e.g. the lag detector will remove the
node from the cluster after a couple of minutes of failing to apply
cluster state updates) but these are not really the concern of clients.

Therefore with this commit we change the meaning of `?timeout=-1` to
mean that the acking timeout is infinite.
  • Loading branch information
DaveCTurner authored Apr 30, 2024
1 parent b0f58ab commit fc287bd
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 8 deletions.
17 changes: 17 additions & 0 deletions docs/changelog/107675.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pr: 107675
summary: Interpret `?timeout=-1` as infinite ack timeout
area: Cluster Coordination
type: breaking
issues: []
breaking:
title: Interpret `?timeout=-1` as infinite ack timeout
area: REST API
details: |
Today {es} accepts the parameter `?timeout=-1` in many APIs, but interprets
this to mean the same as `?timeout=0`. From 8.15 onwards `?timeout=-1` will
mean to wait indefinitely, aligning the behaviour of this parameter with
other similar parameters such as `?master_timeout`.
impact: |
Use `?timeout=0` instead of `?timeout=-1` to force relevant operations to
time out immediately.
notable: false
9 changes: 5 additions & 4 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1223,12 +1223,13 @@ the timeout expires, the request fails and returns an error. Defaults to `30s`.
Can also be set to `-1` to indicate that the request should never time out.
end::master-timeout[]

tag::timeout[]
`timeout`::
(Optional, <<time-units, time units>>)
Period to wait for a response. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
end::timeout[]
Period to wait for a response from all relevant nodes in the cluster after
updating the cluster metadata. If no response is received before the timeout
expires, the cluster metadata update still applies but the response will
indicate that it was not completely acknowledged. Defaults to `30s`.
Can also be set to `-1` to indicate that the request should never time out.
end::timeoutparms[]

tag::transform-id[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,9 @@ public void removeGlobalRetention(
List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
) {
final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout();
// NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately.
// TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly
taskQueue.submitTask(
"remove-data-stream-global-retention",
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout),
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()),
request.masterNodeTimeout()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,28 @@

package org.elasticsearch.action.admin.indices.create;

import io.netty.handler.codec.http.HttpMethod;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
Expand All @@ -31,17 +39,24 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -53,6 +68,11 @@
@ClusterScope(scope = Scope.TEST)
public class CreateIndexIT extends ESIntegTestCase {

/**
 * Opts this test class out of the mock HTTP transport so that the tests can send
 * real HTTP requests to the cluster through the REST client.
 *
 * @return always {@code false}
 */
@Override
protected boolean addMockHttpTransport() {
    // Returning false keeps HTTP requests exposed to the tests.
    return false;
}

public void testCreationDateGivenFails() {
try {
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_CREATION_DATE, 4L)).get();
Expand Down Expand Up @@ -370,4 +390,38 @@ public void testIndexNameInResponse() {
assertEquals("Should have index name in response", "foo", response.index());
}

/**
 * Checks that a create-index REST request sent with {@code ?timeout=-1} is still pending while the
 * cluster applier thread is blocked, and that it completes with an acknowledged {@code 200} response
 * once the applier thread is released.
 *
 * @throws IOException if the REST response body cannot be parsed
 */
public void testInfiniteAckTimeout() throws IOException {
    // Two-party barrier shared between this test thread and the task occupying the applier thread.
    final CyclicBarrier applierBarrier = new CyclicBarrier(2);
    final ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
    clusterService.getClusterApplierService().runOnApplierThread("block for test", Priority.NORMAL, ignoredState -> {
        // First rendezvous signals that the applier thread is occupied; the second one releases it.
        safeAwait(applierBarrier);
        safeAwait(applierBarrier);
    }, ActionListener.noop());

    // Do not proceed until the blocking task is actually running on the applier thread.
    safeAwait(applierBarrier);

    final var createIndexRequest = ESRestTestCase.newXContentRequest(HttpMethod.PUT, "testindex", (builder, params) -> {
        builder.startObject("settings");
        builder.field(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1);
        builder.field(SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1);
        return builder.endObject();
    });
    // The parameter under test: a negative ack timeout.
    createIndexRequest.addParameter("timeout", "-1");

    final PlainActionFuture<Response> pendingResponse = new PlainActionFuture<>();
    getRestClient().performRequestAsync(createIndexRequest, ActionTestUtils.wrapAsRestResponseListener(pendingResponse));

    // Sometimes linger a little before checking that the request is still outstanding.
    if (randomBoolean()) {
        safeSleep(scaledRandomIntBetween(1, 100));
    }

    // The applier thread is still blocked, so the request must not have completed yet.
    assertFalse(pendingResponse.isDone());

    // Release the applier thread and expect the request to complete successfully.
    safeAwait(applierBarrier);

    final Response response = FutureUtils.get(pendingResponse, 10, TimeUnit.SECONDS);
    final int statusCode = response.getStatusLine().getStatusCode();
    assertEquals(200, statusCode);

    final Map<String, Object> responseBody = entityAsMap(response);
    final Object acknowledged = extractValue("acknowledged", responseBody);
    assertTrue((boolean) acknowledged);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,14 @@ public void onCommit(TimeValue commitTime) {
assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
ackTimeout = TimeValue.ZERO;
}

if (ackTimeout.millis() < 0) {
if (countDown.countDown()) {
finish();
}
return;
}

final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
if (timeLeft.nanos() == 0L) {
onTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,57 @@ public void onAckTimeout() {
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that -1 means an infinite ack timeout
{
// Two count-downs are expected: one from clusterStateProcessed() and one from newResponse().
final CountDownLatch latch = new CountDownLatch(2);

// Publisher stub: completes the publication, reports the commit with a random commit time of up to one day,
// and schedules an ack from each of the three nodes at a random point up to one day after the queue's current time.
publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
publishListener.onResponse(null);
ackListener.onCommit(TimeValue.timeValueMillis(randomLongBetween(0, TimeValue.timeValueDays(1).millis())));
for (final var node : new DiscoveryNode[] { node1, node2, node3 }) {
deterministicTaskQueue.scheduleAt(
deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, TimeValue.timeValueDays(1).millis()),
() -> ackListener.onNodeAck(node, null)
);
}
});

// NOTE(review): the first argument to ackedRequest() is presumably the ack timeout (here -1) -- confirm against the helper.
masterService.submitUnbatchedStateUpdateTask(
"test2",
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) {
@Override
public ClusterState execute(ClusterState currentState) {
// Build a new cluster state instance from the current one without further modification.
return ClusterState.builder(currentState).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
// The update must be reported as acknowledged, however late the node acks were scheduled.
assertTrue(acknowledged);
latch.countDown();
return AcknowledgedResponse.TRUE;
}

@Override
public void onFailure(Exception e) {
// The task is not expected to fail in this scenario.
fail();
}

@Override
public void onAckTimeout() {
// Reaching this callback fails the test: no ack timeout is expected for this request.
fail();
}
}
);

deterministicTaskQueue.runAllTasks(); // NB not in time order, there's no timeout to avoid
// Wait for both the processed-state callback and the acknowledged response.
safeAwait(latch);
}
}
}

Expand Down

0 comments on commit fc287bd

Please sign in to comment.