Skip to content

Commit

Permalink
Move monitoring collection timeouts to coordinator (#67084)
Browse files Browse the repository at this point in the history
With #66993 there is now support for coordinator-side timeouts on a
`BroadcastRequest`, which includes requests for node stats and
recoveries. This commit adjusts Monitoring to use these coordinator-side
timeouts where applicable, which will prevent partial stats responses
from accumulating on the master while one or more nodes are not
responding quickly enough. It also enhances the message logged on a
timeout to include the IDs of the nodes which did not respond in time.

Closes #60188.
  • Loading branch information
DaveCTurner authored Jan 11, 2021
1 parent 1cbccb1 commit 1d2462e
Show file tree
Hide file tree
Showing 15 changed files with 359 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

package org.elasticsearch.action.support.broadcast;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;

public abstract class BroadcastOperationRequestBuilder<
Request extends BroadcastRequest<Request>,
Expand All @@ -45,4 +46,10 @@ public final RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return (RequestBuilder) this;
}

/**
 * Applies the given timeout to the request being built.
 *
 * @param timeout the timeout to set on the underlying request
 * @return this builder, cast to {@code RequestBuilder}, so that calls can be chained
 */
@SuppressWarnings("unchecked")
public RequestBuilder setTimeout(TimeValue timeout) {
    request.timeout(timeout);
    // The cast is unchecked, which is what the suppression above is for.
    final RequestBuilder self = (RequestBuilder) this;
    return self;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final RequestBuilder setNodesIds(String... nodesIds) {
}

@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
public RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoMatch, other.allowNoMatch);
return Objects.equals(jobId, other.jobId)
&& Objects.equals(allowNoMatch, other.allowNoMatch)
&& Objects.equals(getTimeout(), other.getTimeout());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Collection<MonitoringDoc> collect(final long timestamp, final long interv
return doCollect(convertNode(timestamp, clusterService.localNode()), interval, clusterState);
}
} catch (ElasticsearchTimeoutException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name()));
logger.error("collector [{}] timed out when collecting data: {}", name(), e.getMessage());
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] failed to collect data", name()), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.monitoring.collector;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.HashSet;
import java.util.concurrent.TimeoutException;

/**
* Utilities for identifying timeouts in responses to collection requests, since we prefer to fail the whole collection attempt if any of
* the involved nodes times out.
*/
public final class TimeoutUtils {
private TimeoutUtils() {
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static <T extends BaseNodeResponse> void ensureNoTimeouts(TimeValue collectionTimeout, BaseNodesResponse<T> response) {
HashSet<String> timedOutNodeIds = null;
for (FailedNodeException failedNodeException : response.failures()) {
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static void ensureNoTimeouts(TimeValue collectionTimeout, BaseTasksResponse response) {
HashSet<String> timedOutNodeIds = null;
for (ElasticsearchException nodeFailure : response.getNodeFailures()) {
if (nodeFailure instanceof FailedNodeException) {
FailedNodeException failedNodeException = (FailedNodeException) nodeFailure;
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static void ensureNoTimeouts(TimeValue collectionTimeout, BroadcastResponse response) {
HashSet<String> timedOutNodeIds = null;
for (DefaultShardOperationFailedException shardFailure : response.getShardFailures()) {
final Throwable shardFailureCause = shardFailure.getCause();
if (shardFailureCause instanceof FailedNodeException) {
FailedNodeException failedNodeException = (FailedNodeException) shardFailureCause;
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

private static boolean isTimeoutFailure(FailedNodeException failedNodeException) {
final Throwable cause = failedNodeException.getCause();
return cause instanceof ElasticsearchTimeoutException
|| cause instanceof TimeoutException
|| cause instanceof ReceiveTimeoutTransportException;
}

private static void ensureNoTimeouts(TimeValue collectionTimeout, HashSet<String> timedOutNodeIds) {
if (timedOutNodeIds != null) {
throw new ElasticsearchTimeoutException((timedOutNodeIds.size() == 1 ? "node " : "nodes ") + timedOutNodeIds +
" did not respond within [" + collectionTimeout + "]");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.elasticsearch.xpack.core.XPackSettings.SECURITY_ENABLED;
import static org.elasticsearch.xpack.core.XPackSettings.TRANSPORT_SSL_ENABLED;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for cluster stats.
Expand Down Expand Up @@ -82,13 +83,12 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final Supplier<ClusterStatsResponse> clusterStatsSupplier =
() -> client.admin().cluster().prepareClusterStats().get(getCollectionTimeout());
final ClusterState clusterState) {
final Supplier<List<XPackFeatureSet.Usage>> usageSupplier =
() -> new XPackUsageRequestBuilder(client).get().getUsages();

final ClusterStatsResponse clusterStats = clusterStatsSupplier.get();
final ClusterStatsResponse clusterStats = client.admin().cluster().prepareClusterStats().setTimeout(getCollectionTimeout()).get();
ensureNoTimeouts(getCollectionTimeout(), clusterStats);

final String clusterName = clusterService.getClusterName().value();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for the Recovery API.
Expand Down Expand Up @@ -64,13 +65,16 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
List<MonitoringDoc> results = new ArrayList<>(1);
RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries()
.setIndices(getCollectionIndices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setActiveOnly(getActiveRecoveriesOnly())
.get(getCollectionTimeout());
.setTimeout(getCollectionTimeout())
.get();

ensureNoTimeouts(getCollectionTimeout(), recoveryResponse);

if (recoveryResponse.hasRecoveries()) {
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for indices and singular index statistics.
* <p>
Expand Down Expand Up @@ -54,7 +56,7 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
final List<MonitoringDoc> results = new ArrayList<>();
final IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats()
.setIndices(getCollectionIndices())
Expand All @@ -71,7 +73,10 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
.setQueryCache(true)
.setRequestCache(true)
.setBulk(true)
.get(getCollectionTimeout());
.setTimeout(getCollectionTimeout())
.get();

ensureNoTimeouts(getCollectionTimeout(), indicesStatsResponse);

final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for Machine Learning Job Stats.
Expand Down Expand Up @@ -71,9 +72,10 @@ protected List<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final ClusterState clusterState) throws Exception {
// fetch details about all jobs
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(MONITORING_ORIGIN)) {
final GetJobsStatsAction.Response jobs =
client.execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(Metadata.ALL))
.actionGet(getCollectionTimeout());
final GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(Metadata.ALL).setTimeout(getCollectionTimeout());
final GetJobsStatsAction.Response jobs = client.execute(GetJobsStatsAction.INSTANCE, request).actionGet();

ensureNoTimeouts(getCollectionTimeout(), jobs);

final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.Objects;

import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for nodes statistics.
* <p>
Expand Down Expand Up @@ -65,7 +67,7 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
NodesStatsRequest request = new NodesStatsRequest("_local");
request.indices(FLAGS);
request.addMetrics(
Expand All @@ -74,8 +76,10 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.THREAD_POOL.metricName(),
NodesStatsRequest.Metric.FS.metricName());
request.timeout(getCollectionTimeout());

final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet(getCollectionTimeout());
final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
ensureNoTimeouts(getCollectionTimeout(), response);

// if there's a failure, then we failed to work with the
// _local node (guaranteed a single exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.monitoring.collector.cluster;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder;
Expand Down Expand Up @@ -37,6 +39,7 @@
import org.junit.Assert;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.UUID;

Expand Down Expand Up @@ -189,7 +192,8 @@ public void testDoCollect() throws Exception {
when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);

final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);

final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
Expand Down Expand Up @@ -280,7 +284,7 @@ public void testDoCollectNoLicense() throws Exception {
{
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
.thenReturn(new Index[0]);
.thenReturn(Index.EMPTY_ARRAY);
}

final Client client = mock(Client.class);
Expand All @@ -296,7 +300,8 @@ public void testDoCollectNoLicense() throws Exception {
when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);

final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);

final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
Expand Down Expand Up @@ -325,4 +330,58 @@ public void testDoCollectNoLicense() throws Exception {
final ClusterStatsMonitoringDoc doc = (ClusterStatsMonitoringDoc) results.iterator().next();
assertThat(doc.getLicense(), nullValue());
}

/**
 * Checks that {@code doCollect} fails with an {@link ElasticsearchTimeoutException} when the cluster stats response carries a node
 * failure whose cause is a timeout.
 */
public void testDoCollectThrowsTimeoutException() throws Exception {
    // Cluster identity and the configured collection timeout.
    final String name = randomAlphaOfLength(10);
    whenClusterStateWithName(name);
    final String uuid = UUID.randomUUID().toString();
    whenClusterStateWithUUID(uuid);
    final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
    withCollectionTimeout(ClusterStatsCollector.CLUSTER_STATS_TIMEOUT, timeout);

    final IndexNameExpressionResolver indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
    when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
        .thenReturn(Index.EMPTY_ARRAY);

    final Client client = mock(Client.class);

    // A cluster stats response that reports one node failure caused by a timeout.
    final ClusterStatsResponse statsResponse = mock(ClusterStatsResponse.class);
    final ClusterHealthStatus healthStatus = randomFrom(ClusterHealthStatus.values());
    when(statsResponse.getStatus()).thenReturn(healthStatus);
    when(statsResponse.getNodesStats()).thenReturn(mock(ClusterStatsNodes.class));
    final FailedNodeException timedOutNode = new FailedNodeException("node", "msg", new ElasticsearchTimeoutException("timed out"));
    when(statsResponse.failures()).thenReturn(List.of(timedOutNode));

    final ClusterStatsIndices statsIndices = mock(ClusterStatsIndices.class);
    when(statsIndices.getIndexCount()).thenReturn(0);
    when(statsResponse.getIndicesStats()).thenReturn(statsIndices);

    // The request builder accepts the configured timeout and then hands back the response above.
    final ClusterStatsRequestBuilder requestBuilder = mock(ClusterStatsRequestBuilder.class);
    when(requestBuilder.setTimeout(eq(timeout))).thenReturn(requestBuilder);
    when(requestBuilder.get()).thenReturn(statsResponse);

    // Wire the builder into the client: client.admin().cluster().prepareClusterStats().
    final ClusterAdminClient clusterAdmin = mock(ClusterAdminClient.class);
    when(clusterAdmin.prepareClusterStats()).thenReturn(requestBuilder);
    final AdminClient admin = mock(AdminClient.class);
    when(admin.cluster()).thenReturn(clusterAdmin);
    when(client.admin()).thenReturn(admin);

    final long interval = randomNonNegativeLong();
    final MonitoringDoc.Node node = MonitoringTestUtils.randomMonitoringNode(random());

    final ClusterStatsCollector collector = new ClusterStatsCollector(
        Settings.builder().build(),
        clusterService,
        licenseState,
        client,
        licenseService,
        indexNameExpressionResolver
    );

    expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
}

}
Loading

0 comments on commit 1d2462e

Please sign in to comment.