From e5f4588318a56b8df59748e021fefdd6ccc36c5b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 7 Oct 2021 14:25:53 +0200 Subject: [PATCH] Add cluster applier stats (#77552) Keep track of the time spent in each registered cluster state applier / listener and other cluster state applier activities. Also adjusted the discovery stats to include these time spent stats broken down by individual registered appliers / listeners and other cluster state applier activities. Also replaced the usage of `StopWatch` in `ClusterStateApplierService` with `ClusterApplierRecordingService.Recorder`. Relates to #77466 --- .../test/nodes.stats/30_discovery.yml | 22 +++ .../cluster/coordination/Coordinator.java | 4 +- .../cluster/service/ClusterApplier.java | 3 + .../ClusterApplierRecordingService.java | 185 ++++++++++++++++++ .../service/ClusterApplierService.java | 42 ++-- .../org/elasticsearch/common/util/Maps.java | 16 ++ .../discovery/DiscoveryStats.java | 22 ++- .../cluster/node/stats/NodeStatsTests.java | 15 +- .../coordination/NoOpClusterApplier.java | 7 + ...sterApplierRecordingServiceStatsTests.java | 35 ++++ .../ClusterApplierRecordingServiceTests.java | 105 ++++++++++ 11 files changed, 435 insertions(+), 21 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierRecordingService.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceStatsTests.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceTests.java diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml index 7ea19acbabb48..1ddfc28c77f29 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml @@ -136,3 +136,25 @@ - is_true: nodes.$master.discovery.cluster_state_update.failure.completion_time - is_true: nodes.$master.discovery.cluster_state_update.failure.master_apply_time - is_true: nodes.$master.discovery.cluster_state_update.failure.notification_time + +--- +"Master cluster applier stats": + - skip: + features: [arbitrary_key] + version: "- 7.99.99" + reason: "not yet backported to 7.x branch" + + - do: + nodes.info: + node_id: _master + - set: + nodes._arbitrary_key_: master + + - do: + nodes.stats: + metric: [ discovery ] + + - is_true: nodes.$master.discovery.cluster_applier_stats.recordings + - is_true: nodes.$master.discovery.cluster_applier_stats.recordings.0.name + - gte: { nodes.$master.discovery.cluster_applier_stats.recordings.0.cumulative_execution_count: 1 } + - gte: { nodes.$master.discovery.cluster_applier_stats.recordings.0.cumulative_execution_time_millis: 1 } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 2a1501ea8eb7a..a58049da27e8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -794,8 +794,8 @@ public DiscoveryStats stats() { return new DiscoveryStats( new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), - getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null - ); + getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null, + clusterApplier.getStats()); } public void startInitialJoin() { diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java index 743d0b3f7153e..e6abc8bca01ed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java @@ -30,4 +30,7 @@ public interface ClusterApplier { * themselves, typically using a more specific logger and at a less dramatic log level. */ void onNewClusterState(String source, Supplier clusterStateSupplier, ActionListener listener); + + ClusterApplierRecordingService.Stats getStats(); + } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierRecordingService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierRecordingService.java new file mode 100644 index 0000000000000..499f32992e3c0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierRecordingService.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.cluster.service; + +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.LongSupplier; + +public final class ClusterApplierRecordingService { + + private final Map recordedActions = new HashMap<>(); + + synchronized Stats getStats() { + return new Stats( + recordedActions.entrySet().stream() + .sorted(Comparator.>comparingLong(o -> o.getValue().sum()).reversed()) + .collect(Maps.toUnmodifiableOrderedMap(Map.Entry::getKey, v -> new Recording(v.getValue().count(), v.getValue().sum()))) + ); + } + + synchronized void updateStats(Recorder recorder) { + Set seenActions = new HashSet<>(); + for (Tuple entry : recorder.recordings) { + String action = entry.v1(); + long timeSpentMS = entry.v2(); + + MeanMetric metric = recordedActions.computeIfAbsent(action, key -> new MeanMetric()); + metric.inc(timeSpentMS); + seenActions.add(action); + } + recordedActions.entrySet().removeIf(entry -> seenActions.contains(entry.getKey()) == false); + } + + static final class Recorder { + + private String currentAction; + private long startTimeMS; + private boolean recording; + private final List> recordings = new LinkedList<>(); + private final LongSupplier currentTimeSupplier; + + Recorder(LongSupplier currentTimeSupplier) { + this.currentTimeSupplier = currentTimeSupplier; + } + + Releasable record(String action) { + if (recording) { + throw new IllegalStateException("already recording"); + } + + this.recording = true; + this.currentAction = action; + this.startTimeMS = currentTimeSupplier.getAsLong(); + return this::stop; + } + + void stop() { + recording = false; + long timeSpentMS = currentTimeSupplier.getAsLong() - this.startTimeMS; + recordings.add(new Tuple<>(currentAction, timeSpentMS)); + } + + List> getRecordings() { + return recordings; + } + } + + public static class Stats implements Writeable, ToXContentFragment { + + private final Map recordings; + + public Stats(Map recordings) { + this.recordings = recordings; + } + + public Map getRecordings() { + return recordings; + } + + public Stats(StreamInput in) throws IOException { + this(in.readOrderedMap(StreamInput::readString, Recording::new)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("cluster_applier_stats"); + builder.startArray("recordings"); + for (Map.Entry entry : recordings.entrySet()) { + builder.startObject(); + builder.field("name", entry.getKey()); + String name = "cumulative_execution"; + builder.field(name + "_count", entry.getValue().count); + builder.humanReadableField(name + "_time_millis", name + "_time", TimeValue.timeValueMillis(entry.getValue().sum)); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(recordings, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Stats stats = (Stats) o; + return Objects.equals(recordings, stats.recordings); + } + + @Override + public int hashCode() { + return Objects.hash(recordings); + } + + public static class Recording implements Writeable { + + private final long count; + private final long sum; + + public Recording(long count, long sum) { + this.count = count; + this.sum = sum; + } + + public Recording(StreamInput in) throws IOException { + this(in.readVLong(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(count); + out.writeVLong(sum); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Recording recording = (Recording) o; + return count == recording.count && sum == recording.sum; + } + + @Override + public int hashCode() { + return Objects.hash(count, sum); + } + + @Override + public String toString() { + return "Recording{" + + "count=" + count + + ", sum=" + sum + + '}'; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 57700eb7a4f08..f1330d49a6bad 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -21,8 +21,8 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -38,7 +38,6 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -84,6 +83,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final String nodeName; + private final ClusterApplierRecordingService recordingService; + private NodeConnectionsService nodeConnectionsService; public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { @@ -91,6 +92,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings this.threadPool = threadPool; this.state = new AtomicReference<>(); this.nodeName = nodeName; + this.recordingService = new ClusterApplierRecordingService(); this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, @@ -389,10 +391,10 @@ private void runTask(String source, Function updateF final ClusterState previousClusterState = state.get(); final long startTimeMillis = threadPool.relativeTimeInMillis(); - final StopWatch stopWatch = new StopWatch(); + final Recorder stopWatch = new Recorder(threadPool::rawRelativeTimeInMillis); final ClusterState newClusterState; try { - try (Releasable ignored = stopWatch.timing("running task [" + source + ']')) { + try (Releasable ignored = stopWatch.record("running task [" + source + ']')) { newClusterState = updateFunction.apply(previousClusterState); } } catch (Exception e) { @@ -448,7 +450,7 @@ private TimeValue getTimeSince(long startTimeMillis) { return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis)); } - private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, StopWatch stopWatch) { + private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, Recorder stopWatch) { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); @@ -465,7 +467,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu } logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version()); - try (Releasable ignored = stopWatch.timing("connecting to new nodes")) { + try (Releasable ignored = stopWatch.record("connecting to new nodes")) { connectToNodesAndWait(newClusterState); } @@ -473,7 +475,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) { logger.debug("applying settings from cluster state with version {}", newClusterState.version()); final Settings incomingSettings = clusterChangedEvent.state().metadata().settings(); - try (Releasable ignored = stopWatch.timing("applying settings")) { + try (Releasable ignored = stopWatch.record("applying settings")) { clusterSettings.applySettings(incomingSettings); } } @@ -505,33 +507,35 @@ protected final void connectToNodesAsync(ClusterState newClusterState, Runnable nodeConnectionsService.connectToNodes(newClusterState.nodes(), onCompletion); } - private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { + private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) { callClusterStateAppliers(clusterChangedEvent, stopWatch, highPriorityStateAppliers); callClusterStateAppliers(clusterChangedEvent, stopWatch, normalPriorityStateAppliers); callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers); } - private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch, + private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch, Collection clusterStateAppliers) { for (ClusterStateApplier applier : clusterStateAppliers) { logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); - try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) { + final String name = applier.toString(); + try (Releasable ignored = stopWatch.record(name)) { applier.applyClusterState(clusterChangedEvent); } } } - private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { + private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) { callClusterStateListener(clusterChangedEvent, stopWatch, clusterStateListeners); callClusterStateListener(clusterChangedEvent, stopWatch, timeoutClusterStateListeners.keySet()); } - private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch, + private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch, Collection listeners) { for (ClusterStateListener listener : listeners) { try { logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version()); - try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) { + final String name = listener.toString(); + try (Releasable ignored = stopWatch.record(name)) { listener.clusterChanged(clusterChangedEvent); } } catch (Exception ex) { @@ -579,12 +583,13 @@ public void onResponse(Void unused) { } } - private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) { + private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, Recorder recorder) { if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) { logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", source, executionTime, - slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo()) - .map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", "))); + slowTaskLoggingThreshold, recorder.getRecordings().stream() + .map(ti -> '[' + ti.v1() + "] took [" + ti.v2() + "ms]").collect(Collectors.joining(", "))); } + recordingService.updateStats(recorder); } private class NotifyTimeout implements Runnable { @@ -623,4 +628,9 @@ public void run() { protected boolean applicationMayFail() { return false; } + + @Override + public ClusterApplierRecordingService.Stats getStats() { + return recordingService.getStats(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/util/Maps.java b/server/src/main/java/org/elasticsearch/common/util/Maps.java index 96b4334a9fc68..2757c224afa17 100644 --- a/server/src/main/java/org/elasticsearch/common/util/Maps.java +++ b/server/src/main/java/org/elasticsearch/common/util/Maps.java @@ -11,6 +11,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -18,6 +19,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -208,4 +210,18 @@ private static Map flatten(List list, boolean ordered, S }, () -> new TreeMap()), Collections::unmodifiableNavigableMap); } + /** + * Returns a {@link Collector} that accumulates the input elements into a linked hash map and finishes the resulting set into an + * unmodifiable map. The resulting read-only view through the unmodifiable map is a linked hash map. + * + * @param the type of the input elements + * @return an unmodifiable {@link Map} where the underlying map has a consistent order + */ + public static Collector> toUnmodifiableOrderedMap(Function keyMapper, + Function valueMapper) { + return Collectors.collectingAndThen(Collectors.toMap(keyMapper, valueMapper, (v1, v2) -> { + throw new IllegalStateException("Duplicate key (attempted merging values " + v1 + " and " + v2 + ")"); + }, (Supplier>) LinkedHashMap::new), Collections::unmodifiableMap); + } + } diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java index 0c991499cb527..d604d6075fa1c 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java @@ -9,6 +9,7 @@ package org.elasticsearch.discovery; import org.elasticsearch.Version; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService; import org.elasticsearch.cluster.service.ClusterStateUpdateStats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,15 +26,17 @@ public class DiscoveryStats implements Writeable, ToXContentFragment { private final PendingClusterStateStats queueStats; private final PublishClusterStateStats publishStats; private final ClusterStateUpdateStats clusterStateUpdateStats; + private final ClusterApplierRecordingService.Stats applierRecordingStats; public DiscoveryStats( PendingClusterStateStats queueStats, PublishClusterStateStats publishStats, - ClusterStateUpdateStats clusterStateUpdateStats - ) { + ClusterStateUpdateStats clusterStateUpdateStats, + ClusterApplierRecordingService.Stats applierRecordingStats) { this.queueStats = queueStats; this.publishStats = publishStats; this.clusterStateUpdateStats = clusterStateUpdateStats; + this.applierRecordingStats = applierRecordingStats; } public DiscoveryStats(StreamInput in) throws IOException { @@ -44,6 +47,11 @@ public DiscoveryStats(StreamInput in) throws IOException { } else { clusterStateUpdateStats = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + applierRecordingStats = in.readOptionalWriteable(ClusterApplierRecordingService.Stats::new); + } else { + applierRecordingStats = null; + } } @Override @@ -53,6 +61,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_16_0)) { out.writeOptionalWriteable(clusterStateUpdateStats); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(applierRecordingStats); + } } @Override @@ -67,6 +78,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (clusterStateUpdateStats != null) { clusterStateUpdateStats.toXContent(builder, params); } + if (applierRecordingStats != null) { + applierRecordingStats.toXContent(builder, params); + } builder.endObject(); return builder; } @@ -86,4 +100,8 @@ public PendingClusterStateStats getQueueStats() { public PublishClusterStateStats getPublishStats() { return publishStats; } + + public ClusterApplierRecordingService.Stats getApplierRecordingStats() { + return applierRecordingStats; + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index b6d90e9e15ed3..6ffa99fea2f7e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -11,9 +11,12 @@ import org.elasticsearch.cluster.coordination.PendingClusterStateStats; import org.elasticsearch.cluster.coordination.PublishClusterStateStats; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording; import org.elasticsearch.cluster.service.ClusterStateUpdateStats; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; @@ -573,6 +576,15 @@ public static NodeStats createNodeStats() { } scriptStats = new ScriptStats(stats); } + ClusterApplierRecordingService.Stats timeTrackerStats; + if (randomBoolean()) { + timeTrackerStats = new ClusterApplierRecordingService.Stats( + randomMap(2, 32, () -> new Tuple<>(randomAlphaOfLength(4), new Recording(randomNonNegativeLong(), randomNonNegativeLong()))) + ); + } else { + timeTrackerStats = null; + } + DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats( randomBoolean() @@ -605,7 +617,8 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) - : null) + : null, + timeTrackerStats) : null; IngestStats ingestStats = null; if (frequently()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java index e76bb5ba94d55..8b8e0834d1f9b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java @@ -10,7 +10,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService; +import java.util.Map; import java.util.function.Supplier; public class NoOpClusterApplier implements ClusterApplier { @@ -23,4 +25,9 @@ public void setInitialState(ClusterState initialState) { public void onNewClusterState(String source, Supplier clusterStateSupplier, ActionListener listener) { listener.onResponse(null); } + + @Override + public ClusterApplierRecordingService.Stats getStats() { + return new ClusterApplierRecordingService.Stats(Map.of()); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceStatsTests.java new file mode 100644 index 0000000000000..581beb8f873dc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceStatsTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.service; + +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.HashMap; +import java.util.Map; + +public class ClusterApplierRecordingServiceStatsTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Stats::new; + } + + @Override + protected Stats createTestInstance() { + int numRecordings = randomInt(256); + Map recordings = new HashMap<>(numRecordings); + for (int i = 0; i < numRecordings; i++) { + recordings.put(randomAlphaOfLength(16), new Recording(randomNonNegativeLong(), randomNonNegativeLong())); + } + return new Stats(recordings); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceTests.java new file mode 100644 index 0000000000000..fdcac4d3e54cc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierRecordingServiceTests.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.service; + +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder; +import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; + +import static org.hamcrest.Matchers.contains; + +public class ClusterApplierRecordingServiceTests extends ESTestCase { + + public void testRecorder() { + long[] currentTime = new long[1]; + var recorder = new Recorder(() -> currentTime[0]); + { + Releasable releasable = recorder.record("action1"); + currentTime[0] = 5; + releasable.close(); + } + { + Releasable releasable = recorder.record("action2"); + currentTime[0] = 42; + releasable.close(); + } + { + Releasable releasable = recorder.record("action3"); + currentTime[0] = 45; + releasable.close(); + } + + var recordings = recorder.getRecordings(); + assertThat(recordings, contains(Tuple.tuple("action1", 5L), Tuple.tuple("action2", 37L), Tuple.tuple("action3", 3L))); + } + + public void testRecorderAlreadyRecording() { + var recorder = new Recorder(() -> 1L); + Releasable releasable = recorder.record("action1"); + expectThrows(IllegalStateException.class, () -> recorder.record("action2")); + } + + public void testRecordingServiceStats() { + var service = new ClusterApplierRecordingService(); + + { + long[] currentTime = new long[1]; + var recorder = new Recorder(() -> currentTime[0]); + try (var r = recorder.record("action1")) { + currentTime[0] = 5; + } + try (var r = recorder.record("action2")) { + currentTime[0] = 42; + } + try (var r = recorder.record("action3")) { + currentTime[0] = 45; + } + service.updateStats(recorder); + var stats = service.getStats(); + assertThat(stats.getRecordings().entrySet(), contains(Map.entry("action2", new Recording(1, 37)), + Map.entry("action1", new Recording(1, 5)), Map.entry("action3", new Recording(1, 3)))); + } + { + long[] currentTime = new long[1]; + var recorder = new Recorder(() -> currentTime[0]); + try (var r = recorder.record("action1")) { + currentTime[0] = 3; + } + try (var r = recorder.record("action2")) { + currentTime[0] = 35; + } + try (var r = recorder.record("action3")) { + currentTime[0] = 41; + } + service.updateStats(recorder); + var stats = service.getStats(); + assertThat(stats.getRecordings().entrySet(), contains(Map.entry("action2", new Recording(2, 69)), + Map.entry("action3", new Recording(2, 9)), Map.entry("action1", new Recording(2, 8)))); + } + { + long[] currentTime = new long[1]; + var recorder = new Recorder(() -> currentTime[0]); + try (var r = recorder.record("action1")) { + currentTime[0] = 2; + } + try (var r = recorder.record("action3")) { + currentTime[0] = 6; + } + service.updateStats(recorder); + var stats = service.getStats(); + assertThat(stats.getRecordings().entrySet(), contains(Map.entry("action3", new Recording(3, 13)), + Map.entry("action1", new Recording(3, 10)))); + } + } + +}