Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster state stats #10670

Merged
merged 12 commits into from
Oct 24, 2023
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
amkhar marked this conversation as resolved.
Show resolved Hide resolved
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down Expand Up @@ -121,4 +132,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -184,6 +186,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -206,7 +209,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
amkhar marked this conversation as resolved.
Show resolved Hide resolved
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -295,6 +299,7 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -865,7 +870,10 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
ClusterStateStats clusterStateStats = remoteClusterStateService != null
? clusterManagerService.getStateStats(this.remoteClusterStateService.getRemoteClusterStateStats())
: clusterManagerService.getStateStats();
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.gateway.remote.RemoteClusterStateStats;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Cluster state related stats.
*
* @opensearch.internal
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong stateUpdated = new AtomicLong(0);
amkhar marked this conversation as resolved.
Show resolved Hide resolved
private AtomicLong stateUpdateTimeTotalInMS = new AtomicLong(0);
amkhar marked this conversation as resolved.
Show resolved Hide resolved
private AtomicLong stateUpdateFailed = new AtomicLong(0);
private RemoteClusterStateStats remoteStateStats = null;

public ClusterStateStats() {}

public long getStateUpdated() {
return stateUpdated.get();
}

public long getStateUpdateTimeTotalInMS() {
return stateUpdateTimeTotalInMS.get();
}

public long getStateUpdateFailed() {
return stateUpdateFailed.get();
}

public void stateUpdated() {
stateUpdated.incrementAndGet();
}

public void stateUpdateFailed() {
stateUpdateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
stateUpdateTimeTotalInMS.addAndGet(stateUpdateTime);
}

public void setRemoteStateStats(RemoteClusterStateStats remoteStateStats) {
this.remoteStateStats = remoteStateStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(stateUpdated.get());
out.writeVLong(stateUpdateTimeTotalInMS.get());
out.writeVLong(stateUpdateFailed.get());
if (remoteStateStats != null) {
remoteStateStats.writeTo(out);
} else {
out.writeBoolean(false);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.stateUpdated = new AtomicLong(in.readVLong());
this.stateUpdateTimeTotalInMS = new AtomicLong(in.readVLong());
this.stateUpdateFailed = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
this.remoteStateStats = new RemoteClusterStateStats(in);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE);
builder.startObject(Fields.OVERALL_STATS);
builder.field(Fields.UPDATE_COUNT, getStateUpdated());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTimeTotalInMS());
builder.field(Fields.FAILED_COUNT, getStateUpdateFailed());
builder.endObject();
if (remoteStateStats != null) {
remoteStateStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String CLUSTER_STATE = "cluster_state";
amkhar marked this conversation as resolved.
Show resolved Hide resolved
static final String OVERALL_STATS = "overall_stats";
amkhar marked this conversation as resolved.
Show resolved Hide resolved
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.core.common.text.Text;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateStats;
import org.opensearch.node.Node;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,7 +113,9 @@ public class MasterService extends AbstractLifecycleComponent {

static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask";

/** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} */
/**
* @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME}
*/
@Deprecated
static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

Expand All @@ -130,6 +133,7 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile Batcher taskBatcher;
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;
private final ClusterStateStats stateStats;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
Expand All @@ -147,6 +151,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this::getMinNodeVersion,
throttlingStats
);
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
}

Expand Down Expand Up @@ -339,7 +344,7 @@ private TimeValue getTimeSince(long startTimeNanos) {
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos));
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) {
final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
@Override
protected boolean blockingAllowed() {
Expand All @@ -352,8 +357,12 @@ protected boolean blockingAllowed() {
try {
FutureUtils.get(fut);
onPublicationSuccess(clusterChangedEvent, taskOutputs);
final long durationMillis = getTimeSince(startTimeNanos).millis();
stateStats.stateUpdateTook(durationMillis);
stateStats.stateUpdated();
} catch (Exception e) {
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
stateStats.stateUpdateFailed();
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e);
}
}

Expand Down Expand Up @@ -464,7 +473,6 @@ public Builder incrementVersion(ClusterState clusterState) {
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* task
*
*/
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener> void submitStateUpdateTask(
String source,
Expand All @@ -490,7 +498,6 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & Cluster
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
String source,
Expand Down Expand Up @@ -947,7 +954,7 @@ void onNoLongerClusterManager() {
/**
* Functionality for register task key to cluster manager node.
*
* @param taskKey - task key of task
* @param taskKey - task key of task
* @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not
* @return throttling task key which needs to be passed while submitting task to cluster manager
*/
Expand All @@ -966,7 +973,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(Stri
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTasks(
final String source,
Expand Down Expand Up @@ -996,4 +1002,13 @@ public <T> void submitStateUpdateTasks(
}
}

public ClusterStateStats getStateStats(RemoteClusterStateStats remoteStateStats) {
amkhar marked this conversation as resolved.
Show resolved Hide resolved
stateStats.setRemoteStateStats(remoteStateStats);
return stateStats;
}

public ClusterStateStats getStateStats() {
amkhar marked this conversation as resolved.
Show resolved Hide resolved
return stateStats;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
amkhar marked this conversation as resolved.
Show resolved Hide resolved
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
remoteClusterStateService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@

package org.opensearch.discovery;

import org.opensearch.Version;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -51,21 +53,31 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {

private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;
private final ClusterStateStats clusterStateStats;

public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats, ClusterStateStats clusterStateStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
this.clusterStateStats = clusterStateStats;
}

public DiscoveryStats(StreamInput in) throws IOException {
queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);
publishStats = in.readOptionalWriteable(PublishClusterStateStats::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
clusterStateStats = in.readOptionalWriteable(ClusterStateStats::new);
} else {
clusterStateStats = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(queueStats);
out.writeOptionalWriteable(publishStats);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(clusterStateStats);
}
}

@Override
Expand All @@ -77,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (publishStats != null) {
publishStats.toXContent(builder, params);
}
if (clusterStateStats != null) {
clusterStateStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
Expand All @@ -92,4 +107,8 @@ public PendingClusterStateStats getQueueStats() {
public PublishClusterStateStats getPublishStats() {
return publishStats;
}

public ClusterStateStats getClusterStateStats() {
return clusterStateStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,9 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) ==
// After the above PR is pushed, we can remove this silent failure and throw the exception instead.
logger.error("Remote repository is not yet registered");
lastAcceptedState = clusterState;
remoteClusterStateService.writeMetadataFailed();
amkhar marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
remoteClusterStateService.writeMetadataFailed();
handleExceptionOnWrite(e);
}
}
Expand Down
Loading
Loading