Skip to content

Commit

Permalink
Add cluster state stats
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Oct 17, 2023
1 parent 6c02261 commit 198e2fa
Show file tree
Hide file tree
Showing 17 changed files with 358 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
- Add cluster state stats

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -184,6 +186,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -206,7 +209,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -295,6 +299,7 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -865,7 +870,10 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
ClusterStateStats clusterStateStats = remoteClusterStateService != null
? clusterManagerService.getStateStats(this.remoteClusterStateService.getRemoteClusterStateStats())
: clusterManagerService.getStateStats();
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.gateway.remote.RemoteClusterStateStats;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Cluster state related stats.
*
* @opensearch.internal
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong stateUpdated = new AtomicLong(0);
private AtomicLong stateUpdateTimeTotalInMS = new AtomicLong(0);
private AtomicLong stateUpdateFailed = new AtomicLong(0);
private RemoteClusterStateStats remoteStateStats = null;

public ClusterStateStats() {}

public long getStateUpdated() {
return stateUpdated.get();
}

public long getStateUpdateTimeTotalInMS() {
return stateUpdateTimeTotalInMS.get();
}

public long getStateUpdateFailed() {
return stateUpdateFailed.get();
}

public void stateUpdated() {
stateUpdated.incrementAndGet();
}

public void stateUpdateFailed() {
stateUpdateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
stateUpdateTimeTotalInMS.addAndGet(stateUpdateTime);
}

public void setRemoteStateStats(RemoteClusterStateStats remoteStateStats) {
this.remoteStateStats = remoteStateStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(stateUpdated.get());
out.writeVLong(stateUpdateTimeTotalInMS.get());
out.writeVLong(stateUpdateFailed.get());
if (remoteStateStats != null) {
remoteStateStats.writeTo(out);
} else {
out.writeBoolean(false);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.stateUpdated = new AtomicLong(in.readVLong());
this.stateUpdateTimeTotalInMS = new AtomicLong(in.readVLong());
this.stateUpdateFailed = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
this.remoteStateStats = new RemoteClusterStateStats(in);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE);
builder.startObject(Fields.OVERALL_STATS);
builder.field(Fields.UPDATE_COUNT, getStateUpdated());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTimeTotalInMS());
builder.field(Fields.FAILED_COUNT, getStateUpdateFailed());
builder.endObject();
if (remoteStateStats != null) {
remoteStateStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String CLUSTER_STATE = "cluster_state";
static final String OVERALL_STATS = "overall_stats";
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.core.common.text.Text;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateStats;
import org.opensearch.node.Node;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,7 +113,9 @@ public class MasterService extends AbstractLifecycleComponent {

static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask";

/** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} */
/**
* @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME}
*/
@Deprecated
static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

Expand All @@ -130,6 +133,7 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile Batcher taskBatcher;
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;
private final ClusterStateStats stateStats;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
Expand All @@ -147,6 +151,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this::getMinNodeVersion,
throttlingStats
);
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
}

Expand Down Expand Up @@ -339,7 +344,7 @@ private TimeValue getTimeSince(long startTimeNanos) {
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos));
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) {
final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
@Override
protected boolean blockingAllowed() {
Expand All @@ -352,8 +357,12 @@ protected boolean blockingAllowed() {
try {
FutureUtils.get(fut);
onPublicationSuccess(clusterChangedEvent, taskOutputs);
final long durationMillis = getTimeSince(startTimeNanos).millis();
stateStats.stateUpdateTook(durationMillis);
stateStats.stateUpdated();
} catch (Exception e) {
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
stateStats.stateUpdateFailed();
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e);
}
}

Expand Down Expand Up @@ -464,7 +473,6 @@ public Builder incrementVersion(ClusterState clusterState) {
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* task
*
*/
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener> void submitStateUpdateTask(
String source,
Expand All @@ -490,7 +498,6 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & Cluster
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
String source,
Expand Down Expand Up @@ -947,7 +954,7 @@ void onNoLongerClusterManager() {
/**
* Functionality for register task key to cluster manager node.
*
* @param taskKey - task key of task
* @param taskKey - task key of task
* @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not
* @return throttling task key which needs to be passed while submitting task to cluster manager
*/
Expand All @@ -966,7 +973,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(Stri
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTasks(
final String source,
Expand Down Expand Up @@ -996,4 +1002,13 @@ public <T> void submitStateUpdateTasks(
}
}

public ClusterStateStats getStateStats(RemoteClusterStateStats remoteStateStats) {
stateStats.setRemoteStateStats(remoteStateStats);
return stateStats;
}

public ClusterStateStats getStateStats() {
return stateStats;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
remoteClusterStateService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@

package org.opensearch.discovery;

import org.opensearch.Version;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -51,21 +53,31 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {

private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;
private final ClusterStateStats clusterStateStats;

public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats, ClusterStateStats clusterStateStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
this.clusterStateStats = clusterStateStats;
}

public DiscoveryStats(StreamInput in) throws IOException {
queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);
publishStats = in.readOptionalWriteable(PublishClusterStateStats::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
clusterStateStats = in.readOptionalWriteable(ClusterStateStats::new);
} else {
clusterStateStats = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(queueStats);
out.writeOptionalWriteable(publishStats);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(clusterStateStats);
}
}

@Override
Expand All @@ -77,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (publishStats != null) {
publishStats.toXContent(builder, params);
}
if (clusterStateStats != null) {
clusterStateStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
Expand All @@ -92,4 +107,8 @@ public PendingClusterStateStats getQueueStats() {
public PublishClusterStateStats getPublishStats() {
return publishStats;
}

public ClusterStateStats getClusterStateStats() {
return clusterStateStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,9 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) ==
// After the above PR is pushed, we can remove this silent failure and throw the exception instead.
logger.error("Remote repository is not yet registered");
lastAcceptedState = clusterState;
remoteClusterStateService.writeMetadataFailed();
} catch (Exception e) {
remoteClusterStateService.writeMetadataFailed();
handleExceptionOnWrite(e);
}
}
Expand Down
Loading

0 comments on commit 198e2fa

Please sign in to comment.