Add cluster applier stats (#77552)
Keep track of the time spent in each registered cluster state applier
/ listener and in other cluster state applier activities.

Also adjusted the discovery stats to include these timing stats,
broken down by individual registered appliers / listeners and other
cluster state applier activities.

Also replaced the usage of `StopWatch` in `ClusterStateApplierService` with
`ClusterApplierRecordingService.Recorder`.

Relates to #77466
martijnvg authored Oct 7, 2021
1 parent 4d24273 commit e5f4588
Showing 11 changed files with 435 additions and 21 deletions.
@@ -136,3 +136,25 @@
  - is_true: nodes.$master.discovery.cluster_state_update.failure.completion_time
  - is_true: nodes.$master.discovery.cluster_state_update.failure.master_apply_time
  - is_true: nodes.$master.discovery.cluster_state_update.failure.notification_time

---
"Master cluster applier stats":
  - skip:
      features: [arbitrary_key]
      version: "- 7.99.99"
      reason: "not yet backported to 7.x branch"

  - do:
      nodes.info:
        node_id: _master
  - set:
      nodes._arbitrary_key_: master

  - do:
      nodes.stats:
        metric: [ discovery ]

  - is_true: nodes.$master.discovery.cluster_applier_stats.recordings
  - is_true: nodes.$master.discovery.cluster_applier_stats.recordings.0.name
  - gte: { nodes.$master.discovery.cluster_applier_stats.recordings.0.cumulative_execution_count: 1 }
  - gte: { nodes.$master.discovery.cluster_applier_stats.recordings.0.cumulative_execution_time_millis: 1 }
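
For orientation, the `cluster_applier_stats` fragment these assertions target is shaped roughly as follows; the field names come from the new `ClusterApplierRecordingService.Stats#toXContent` further down, while the recording names and numbers here are purely illustrative:

"cluster_applier_stats" : {
  "recordings" : [
    {
      "name" : "org.elasticsearch.indices.cluster.IndicesClusterStateService@12ab34cd",
      "cumulative_execution_count" : 42,
      "cumulative_execution_time_millis" : 128
    },
    {
      "name" : "connecting to new nodes",
      "cumulative_execution_count" : 7,
      "cumulative_execution_time_millis" : 3
    }
  ]
}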
@@ -794,8 +794,8 @@ public DiscoveryStats stats() {
return new DiscoveryStats(
new PendingClusterStateStats(0, 0, 0),
publicationHandler.stats(),
-getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null
-);
getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null,
clusterApplier.getStats());
}

public void startInitialJoin() {
@@ -30,4 +30,7 @@ public interface ClusterApplier {
* themselves, typically using a more specific logger and at a less dramatic log level.
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ActionListener<Void> listener);

ClusterApplierRecordingService.Stats getStats();

}
@@ -0,0 +1,185 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */
package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;

public final class ClusterApplierRecordingService {

    private final Map<String, MeanMetric> recordedActions = new HashMap<>();

    synchronized Stats getStats() {
        return new Stats(
            recordedActions.entrySet().stream()
                .sorted(Comparator.<Map.Entry<String, MeanMetric>>comparingLong(o -> o.getValue().sum()).reversed())
                .collect(Maps.toUnmodifiableOrderedMap(Map.Entry::getKey, v -> new Recording(v.getValue().count(), v.getValue().sum())))
        );
    }

    synchronized void updateStats(Recorder recorder) {
        Set<String> seenActions = new HashSet<>();
        for (Tuple<String, Long> entry : recorder.recordings) {
            String action = entry.v1();
            long timeSpentMS = entry.v2();

            MeanMetric metric = recordedActions.computeIfAbsent(action, key -> new MeanMetric());
            metric.inc(timeSpentMS);
            seenActions.add(action);
        }
        recordedActions.entrySet().removeIf(entry -> seenActions.contains(entry.getKey()) == false);
    }

    static final class Recorder {

        private String currentAction;
        private long startTimeMS;
        private boolean recording;
        private final List<Tuple<String, Long>> recordings = new LinkedList<>();
        private final LongSupplier currentTimeSupplier;

        Recorder(LongSupplier currentTimeSupplier) {
            this.currentTimeSupplier = currentTimeSupplier;
        }

        Releasable record(String action) {
            if (recording) {
                throw new IllegalStateException("already recording");
            }

            this.recording = true;
            this.currentAction = action;
            this.startTimeMS = currentTimeSupplier.getAsLong();
            return this::stop;
        }

        void stop() {
            recording = false;
            long timeSpentMS = currentTimeSupplier.getAsLong() - this.startTimeMS;
            recordings.add(new Tuple<>(currentAction, timeSpentMS));
        }

        List<Tuple<String, Long>> getRecordings() {
            return recordings;
        }
    }

    public static class Stats implements Writeable, ToXContentFragment {

        private final Map<String, Recording> recordings;

        public Stats(Map<String, Recording> recordings) {
            this.recordings = recordings;
        }

        public Map<String, Recording> getRecordings() {
            return recordings;
        }

        public Stats(StreamInput in) throws IOException {
            this(in.readOrderedMap(StreamInput::readString, Recording::new));
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject("cluster_applier_stats");
            builder.startArray("recordings");
            for (Map.Entry<String, Recording> entry : recordings.entrySet()) {
                builder.startObject();
                builder.field("name", entry.getKey());
                String name = "cumulative_execution";
                builder.field(name + "_count", entry.getValue().count);
                builder.humanReadableField(name + "_time_millis", name + "_time", TimeValue.timeValueMillis(entry.getValue().sum));
                builder.endObject();
            }
            builder.endArray();
            builder.endObject();
            return builder;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeMap(recordings, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Stats stats = (Stats) o;
            return Objects.equals(recordings, stats.recordings);
        }

        @Override
        public int hashCode() {
            return Objects.hash(recordings);
        }

        public static class Recording implements Writeable {

            private final long count;
            private final long sum;

            public Recording(long count, long sum) {
                this.count = count;
                this.sum = sum;
            }

            public Recording(StreamInput in) throws IOException {
                this(in.readVLong(), in.readVLong());
            }

            @Override
            public void writeTo(StreamOutput out) throws IOException {
                out.writeVLong(count);
                out.writeVLong(sum);
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (o == null || getClass() != o.getClass()) return false;
                Recording recording = (Recording) o;
                return count == recording.count && sum == recording.sum;
            }

            @Override
            public int hashCode() {
                return Objects.hash(count, sum);
            }

            @Override
            public String toString() {
                return "Recording{" +
                    "count=" + count +
                    ", sum=" + sum +
                    '}';
            }
        }
    }
}
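
For orientation, a minimal usage sketch of the classes above (not part of the commit). The `RecorderUsageSketch` class, the `System::currentTimeMillis` clock, and the action name are stand-ins for `ClusterApplierService`, `ThreadPool#rawRelativeTimeInMillis`, and the applier / listener names used there:

package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.core.Releasable;

public class RecorderUsageSketch {
    public static void main(String[] args) {
        ClusterApplierRecordingService recordingService = new ClusterApplierRecordingService();

        // One Recorder is created per applied cluster state; the real caller passes
        // threadPool::rawRelativeTimeInMillis instead of System::currentTimeMillis.
        Recorder recorder = new Recorder(System::currentTimeMillis);

        // Each applier / listener invocation is wrapped in record(...); closing the returned
        // Releasable stops the timer and appends a (name, millis) entry to the recorder.
        try (Releasable ignored = recorder.record("hypothetical applier")) {
            // applier.applyClusterState(event) would run here
        }

        // After the state has been applied, the per-action times are folded into the
        // service's MeanMetric instances, keyed by action name.
        recordingService.updateStats(recorder);

        // getStats() exposes the cumulative counts and times, sorted by total time spent,
        // which Coordinator#stats() includes in the node's DiscoveryStats.
        ClusterApplierRecordingService.Stats stats = recordingService.getStats();
        System.out.println(stats.getRecordings());
    }
}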
@@ -21,8 +21,8 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@@ -38,7 +38,6 @@
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

-import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@@ -84,13 +83,16 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final String nodeName;

private final ClusterApplierRecordingService recordingService;

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.nodeName = nodeName;
this.recordingService = new ClusterApplierRecordingService();

this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -389,10 +391,10 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
final ClusterState previousClusterState = state.get();

final long startTimeMillis = threadPool.relativeTimeInMillis();
-final StopWatch stopWatch = new StopWatch();
final Recorder stopWatch = new Recorder(threadPool::rawRelativeTimeInMillis);
final ClusterState newClusterState;
try {
try (Releasable ignored = stopWatch.timing("running task [" + source + ']')) {
try (Releasable ignored = stopWatch.record("running task [" + source + ']')) {
newClusterState = updateFunction.apply(previousClusterState);
}
} catch (Exception e) {
@@ -448,7 +450,7 @@ private TimeValue getTimeSince(long startTimeMillis) {
return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
}

-private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, StopWatch stopWatch) {
private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, Recorder stopWatch) {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
@@ -465,15 +467,15 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
}

logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
try (Releasable ignored = stopWatch.timing("connecting to new nodes")) {
try (Releasable ignored = stopWatch.record("connecting to new nodes")) {
connectToNodesAndWait(newClusterState);
}

// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.timing("applying settings")) {
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}
@@ -505,33 +507,35 @@ protected final void connectToNodesAsync(ClusterState newClusterState, Runnable
nodeConnectionsService.connectToNodes(newClusterState.nodes(), onCompletion);
}

-private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
callClusterStateAppliers(clusterChangedEvent, stopWatch, highPriorityStateAppliers);
callClusterStateAppliers(clusterChangedEvent, stopWatch, normalPriorityStateAppliers);
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

-private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch,
Collection<ClusterStateApplier> clusterStateAppliers) {
for (ClusterStateApplier applier : clusterStateAppliers) {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
final String name = applier.toString();
try (Releasable ignored = stopWatch.record(name)) {
applier.applyClusterState(clusterChangedEvent);
}
}
}

-private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
callClusterStateListener(clusterChangedEvent, stopWatch, clusterStateListeners);
callClusterStateListener(clusterChangedEvent, stopWatch, timeoutClusterStateListeners.keySet());
}

-private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch,
Collection<? extends ClusterStateListener> listeners) {
for (ClusterStateListener listener : listeners) {
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
final String name = listener.toString();
try (Releasable ignored = stopWatch.record(name)) {
listener.clusterChanged(clusterChangedEvent);
}
} catch (Exception ex) {
@@ -579,12 +583,13 @@ public void onResponse(Void unused) {
}
}

-private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) {
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, Recorder recorder) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", source, executionTime,
-slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo())
-.map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", ")));
slowTaskLoggingThreshold, recorder.getRecordings().stream()
.map(ti -> '[' + ti.v1() + "] took [" + ti.v2() + "ms]").collect(Collectors.joining(", ")));
}
recordingService.updateStats(recorder);
}

private class NotifyTimeout implements Runnable {
@@ -623,4 +628,9 @@ public void run() {
protected boolean applicationMayFail() {
return false;
}

@Override
public ClusterApplierRecordingService.Stats getStats() {
return recordingService.getStats();
}
}