diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 9f3f9093b..2fb1421e3 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -118,10 +118,14 @@ import org.opensearch.plugins.EnginePlugin import org.opensearch.plugins.PersistentTaskPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.RepositoryPlugin +import org.opensearch.replication.action.stats.FollowerStatsAction import org.opensearch.replication.action.stats.LeaderStatsAction +import org.opensearch.replication.action.stats.TransportFollowerStatsAction import org.opensearch.replication.action.stats.TransportLeaderStatsAction +import org.opensearch.replication.rest.FollowerStatsHandler import org.opensearch.replication.rest.LeaderStatsHandler import org.opensearch.replication.seqno.RemoteClusterStats +import org.opensearch.replication.task.shard.FollowerClusterStats import org.opensearch.repositories.RepositoriesService import org.opensearch.repositories.Repository import org.opensearch.rest.RestController @@ -142,6 +146,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, private lateinit var threadPool: ThreadPool private lateinit var replicationMetadataManager: ReplicationMetadataManager private lateinit var replicationSettings: ReplicationSettings + private var followerClusterStats = FollowerClusterStats() companion object { const val REPLICATION_EXECUTOR_NAME_LEADER = "replication_leader" @@ -184,7 +189,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client, ReplicationMetadataStore(client, clusterService, xContentRegistry)) this.replicationSettings = ReplicationSettings(clusterService) - return 
listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings) + return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats) } override fun getGuiceServiceClasses(): Collection> { @@ -213,7 +218,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, ActionHandler(UpdateReplicationStateAction.INSTANCE, TransportUpdateReplicationStateDetails::class.java), ActionHandler(ShardsInfoAction.INSTANCE, TranportShardsInfoAction::class.java), ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java), - ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java) + ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java), + ActionHandler(FollowerStatsAction.INSTANCE, TransportFollowerStatsAction::class.java) ) } @@ -229,7 +235,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, UpdateIndexHandler(), StopIndexReplicationHandler(), ReplicationStatusHandler(), - LeaderStatsHandler()) + LeaderStatsHandler(), + FollowerStatsHandler()) } override fun getExecutorBuilders(settings: Settings): List> { @@ -263,7 +270,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, expressionResolver: IndexNameExpressionResolver) : List> { return listOf( - ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings), + ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats), IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, settingsModule), 
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings))
     }
diff --git a/src/main/kotlin/org/opensearch/replication/action/stats/FollowerNodeStatsResponse.kt b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerNodeStatsResponse.kt
new file mode 100644
index 000000000..b6a8ef73d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerNodeStatsResponse.kt
@@ -0,0 +1,50 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.action.stats
+
+import org.opensearch.action.support.nodes.BaseNodeResponse
+import org.opensearch.cluster.node.DiscoveryNode
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.index.shard.ShardId
+import org.opensearch.replication.task.shard.FollowerShardMetric
+import org.opensearch.replication.task.shard.FollowerShardMetric.FollowerShardStats
+import java.io.IOException
+
+/**
+ * Node-level response of the follower stats action: a snapshot of the replication counters
+ * of each follower shard reported by one node, keyed by shard id.
+ */
+class FollowerNodeStatsResponse : BaseNodeResponse {
+    var stats: Map<ShardId, FollowerShardStats>
+
+    /** Deserializes the base node response followed by the shard-id to stats map. */
+    constructor(inp: StreamInput) : super(inp) {
+        stats = inp.readMap(::ShardId, ::FollowerShardStats)
+    }
+
+    /** Copies the current value of every metric in [remoteClusterStats] via createStats(). */
+    constructor(node: DiscoveryNode, remoteClusterStats: Map<ShardId, FollowerShardMetric>) : super(node) {
+        stats = remoteClusterStats.mapValues { entry -> entry.value.createStats() }
+    }
+
+    /** Writes the base node response followed by the shard-id to stats map. */
+    @Throws(IOException::class)
+    override fun writeTo(out: StreamOutput) {
+        super.writeTo(out)
+        out.writeMap(
+            stats,
+            { stream, shardId -> shardId.writeTo(stream) },
+            { stream, shardStats -> shardStats.writeTo(stream) }
+        )
+    }
+}
diff --git a/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsAction.kt b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsAction.kt
new file mode 100644
index 000000000..8ecafb982
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsAction.kt
@@ -0,0 +1,33 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.action.stats
+
+import org.opensearch.action.ActionType
+import org.opensearch.common.io.stream.Writeable
+
+/**
+ * Action type of the follower stats API, registered under the transport action name [NAME].
+ */
+class FollowerStatsAction : ActionType<FollowerStatsResponse>(NAME, reader) {
+    companion object {
+        const val NAME = "indices:admin/plugins/replication/follower/stats"
+
+        // Must stay above INSTANCE: companion properties are initialized in declaration order and
+        // the FollowerStatsAction constructor reads `reader`. Declared after INSTANCE, the
+        // superclass constructor would be handed a null reader.
+        val reader = Writeable.Reader { inp -> FollowerStatsResponse(inp) }
+        val INSTANCE = FollowerStatsAction()
+    }
+
+    override fun getResponseReader(): Writeable.Reader<FollowerStatsResponse> = reader
+}
+
diff --git a/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsRequest.kt b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsRequest.kt
new file mode 100644
index 000000000..e79e4d2e2
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsRequest.kt
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.action.stats
+
+import org.opensearch.action.support.nodes.BaseNodesRequest
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import java.io.IOException
+
+/**
+ * A request to get node (cluster) level replication stats.
+ */
+class FollowerStatsRequest : BaseNodesRequest<FollowerStatsRequest> {
+
+    /**
+     * Deserializes a request from the given stream.
+     * @param in A stream input object.
+     * @throws IOException if the stream cannot be deserialized.
+     */
+    constructor(`in`: StreamInput) : super(`in`)
+
+    /**
+     * Get information from nodes based on the nodes ids specified. If none are passed, information
+     * for all nodes will be returned.
+     */
+    constructor(vararg nodesIds: String) : super(*nodesIds)
+
+    @Throws(IOException::class)
+    override fun writeTo(out: StreamOutput) {
+        super.writeTo(out)
+    }
+
+}
diff --git a/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsResponse.kt b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsResponse.kt
new file mode 100644
index 000000000..2f1de07ff
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/action/stats/FollowerStatsResponse.kt
@@ -0,0 +1,176 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.action.stats
+
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.FailedNodeException
+import org.opensearch.action.support.nodes.BaseNodesResponse
+import org.opensearch.cluster.ClusterName
+import org.opensearch.common.Strings
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
+import org.opensearch.common.xcontent.ToXContent.Params
+import org.opensearch.common.xcontent.ToXContentFragment
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.index.shard.ShardId
+import org.opensearch.replication.metadata.ReplicationOverallState
+import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
+import org.opensearch.replication.metadata.state.ReplicationStateMetadata
+import org.opensearch.replication.task.shard.FollowerIndexStats
+import org.opensearch.replication.task.shard.FollowerShardMetric.FollowerShardStats
+import java.io.IOException
+
+/**
+ * Cluster-wide follower replication stats: the shard stats reported by every node, the same
+ * counters rolled up per index and for the whole cluster, and index counts by replication state.
+ */
+class FollowerStatsResponse : BaseNodesResponse<FollowerNodeStatsResponse>, ToXContentFragment {
+    var shardStats :MutableMap<ShardId, FollowerShardStats> = mutableMapOf()
+    var indexStats :MutableMap<String, FollowerIndexStats> = mutableMapOf()
+
+    var pausedIndices :Int = 0
+    var failedIndices :Int = 0
+    var bootstrappingIndices :Int = 0
+    var syncingIndices :Int = 0
+    var shardTaskCount :Int = 0
+    var indexTaskCount :Int = 0
+
+    var opsWritten: Long = 0
+    var opsWriteFailures: Long = 0
+    var opsRead: Long = 0
+    var opsReadFailures: Long = 0
+    var localCheckpoint: Long = 0
+    var remoteCheckpoint: Long = 0
+    var totalWriteTime: Long = 0
+
+    companion object {
+        private val log = LogManager.getLogger(FollowerStatsResponse::class.java)
+    }
+
+    /**
+     * Reads what [writeTo] wrote: the base nodes response, the per-shard stats and the index state
+     * counts. Index and cluster totals are not on the wire; they are rebuilt from the shard stats.
+     */
+    constructor(inp: StreamInput) : super(inp) {
+        shardStats = inp.readMap(::ShardId, ::FollowerShardStats)
+        pausedIndices = inp.readInt()
+        failedIndices = inp.readInt()
+        bootstrappingIndices = inp.readInt()
+        syncingIndices = inp.readInt()
+        shardTaskCount = inp.readInt()
+        indexTaskCount = inp.readInt()
+        for ((shardId, stat) in shardStats) {
+            aggregate(shardId, stat)
+        }
+    }
+
+    constructor(clusterName: ClusterName?, followerNodeResponse: List<FollowerNodeStatsResponse>?, failures: List<FailedNodeException>?
+                , metadata : ReplicationStateMetadata) : super(clusterName, followerNodeResponse, failures) {
+
+        val syncing = mutableSetOf<String>()
+        for (response in followerNodeResponse.orEmpty()) {
+            shardStats.putAll(response.stats)
+            for ((shardId, stat) in response.stats) {
+                syncing.add(shardId.indexName)
+                aggregate(shardId, stat)
+            }
+        }
+
+        var totalRunning = 0 // includes bootstrapping and syncing
+        for (entry in metadata.replicationDetails) {
+            when (entry.value[REPLICATION_LAST_KNOWN_OVERALL_STATE]) {
+                ReplicationOverallState.RUNNING.name -> totalRunning++
+                ReplicationOverallState.FAILED.name -> failedIndices++
+                ReplicationOverallState.PAUSED.name -> pausedIndices++
+            }
+        }
+
+        syncingIndices = syncing.size
+        // Shard stats and state metadata are sampled separately, so clamp instead of
+        // reporting a negative count when the two momentarily disagree.
+        bootstrappingIndices = (totalRunning - syncingIndices).coerceAtLeast(0)
+
+        shardTaskCount = shardStats.size
+        indexTaskCount = totalRunning
+    }
+
+    /** Adds one shard's counters to the cluster-wide totals. */
+    fun add(stat: FollowerShardStats) {
+        opsWritten += stat.opsWritten
+        opsWriteFailures += stat.opsWriteFailures
+        opsRead += stat.opsRead
+        opsReadFailures += stat.opsReadFailures
+        localCheckpoint += stat.localCheckpoint
+        remoteCheckpoint += stat.remoteCheckpoint
+        totalWriteTime += stat.totalWriteTime
+    }
+
+    /** Folds one shard's stats into its index bucket and into the cluster-wide totals. */
+    private fun aggregate(shardId: ShardId, stat: FollowerShardStats) {
+        indexStats.getOrPut(shardId.indexName) { FollowerIndexStats() }.add(stat)
+        add(stat)
+    }
+
+    @Throws(IOException::class)
+    override fun readNodesFrom(`in`: StreamInput): List<FollowerNodeStatsResponse> {
+        return `in`.readList { nodeIn -> FollowerNodeStatsResponse(nodeIn) }
+    }
+
+    @Throws(IOException::class)
+    override fun writeNodesTo(out: StreamOutput, followerNodeResponses: List<FollowerNodeStatsResponse>?) {
+        out.writeList(followerNodeResponses)
+    }
+
+    /** Writes the base nodes response, then the shard stats and state counts the stream constructor reads. */
+    @Throws(IOException::class)
+    override fun writeTo(out: StreamOutput) {
+        super.writeTo(out)
+        out.writeMap(shardStats, { o, k -> k.writeTo(o) }, { o, v -> v.writeTo(o) })
+        out.writeInt(pausedIndices)
+        out.writeInt(failedIndices)
+        out.writeInt(bootstrappingIndices)
+        out.writeInt(syncingIndices)
+        out.writeInt(shardTaskCount)
+        out.writeInt(indexTaskCount)
+    }
+
+    @Throws(IOException::class)
+    override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder {
+        builder.startObject()
+        builder.field("num_syncing_indices", syncingIndices)
+        builder.field("num_bootstrapping_indices", bootstrappingIndices)
+        builder.field("num_paused_indices", pausedIndices)
+        builder.field("num_failed_indices", failedIndices)
+        builder.field("num_shard_tasks", shardTaskCount)
+        builder.field("num_index_tasks", indexTaskCount)
+        builder.field("operations_written", opsWritten)
+        builder.field("operations_read", opsRead)
+        builder.field("failed_read_requests", opsReadFailures)
+        builder.field("failed_write_requests", opsWriteFailures)
+        builder.field("total_local_checkpoint", localCheckpoint)
+        builder.field("total_remote_checkpoint", remoteCheckpoint)
+        builder.field("total_write_time_millis", totalWriteTime)
+        builder.field("index_stats").map(indexStats)
+        builder.endObject()
+        return builder
+    }
+
+    override fun toString(): String {
+        val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
+        toXContent(builder, EMPTY_PARAMS)
+        return Strings.toString(builder)
+    }
+}
+
diff --git a/src/main/kotlin/org/opensearch/replication/action/stats/TransportFollowerStatsAction.kt b/src/main/kotlin/org/opensearch/replication/action/stats/TransportFollowerStatsAction.kt
new file mode 100644
index 000000000..e33d9a0c0
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/action/stats/TransportFollowerStatsAction.kt
@@ -0,0 +1,61 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.action.stats
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.GlobalScope
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.FailedNodeException
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.action.support.nodes.TransportNodesAction
+import org.opensearch.client.node.NodeClient
+import org.opensearch.cluster.service.ClusterService
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.replication.metadata.state.ReplicationStateMetadata
+import org.opensearch.replication.seqno.RemoteClusterStats
+import org.opensearch.replication.task.shard.FollowerClusterStats
+import org.opensearch.threadpool.ThreadPool
+import org.opensearch.transport.TransportService
+
+/**
+ * Nodes-level transport action behind the follower stats API: each node answers with the shard
+ * metrics held in its [FollowerClusterStats]; [newResponse] combines them with the replication state.
+ */
+class TransportFollowerStatsAction @Inject constructor(transportService: TransportService,
+                                                       clusterService: ClusterService,
+                                                       threadPool: ThreadPool,
+                                                       actionFilters: ActionFilters,
+                                                       private val remoteStats: RemoteClusterStats,
+                                                       private val client: NodeClient,
+                                                       private val followerStats: FollowerClusterStats) :
+        TransportNodesAction<FollowerStatsRequest, FollowerStatsResponse, NodeStatsRequest, FollowerNodeStatsResponse>(FollowerStatsAction.NAME,
+             threadPool, clusterService, transportService, actionFilters, ::FollowerStatsRequest, ::NodeStatsRequest, ThreadPool.Names.MANAGEMENT,
+             FollowerNodeStatsResponse::class.java), CoroutineScope by GlobalScope {
+
+    companion object {
+        private val log = LogManager.getLogger(TransportFollowerStatsAction::class.java)
+    }
+
+    override fun newNodeRequest(request: FollowerStatsRequest): NodeStatsRequest = NodeStatsRequest()
+
+    override fun newNodeResponse(input: StreamInput): FollowerNodeStatsResponse = FollowerNodeStatsResponse(input)
+
+    override fun newResponse(request: FollowerStatsRequest?, responses: MutableList<FollowerNodeStatsResponse>?, failures: MutableList<FailedNodeException>?): FollowerStatsResponse {
+        // Replication state is read from the cluster-state custom metadata; EMPTY when none is stored.
+        val replicationState = clusterService.state().metadata().custom(ReplicationStateMetadata.NAME) ?: ReplicationStateMetadata.EMPTY
+        return FollowerStatsResponse(clusterService.clusterName, responses, failures, replicationState)
+    }
+
+    override fun nodeOperation(nodeStatRequest: NodeStatsRequest?): FollowerNodeStatsResponse =
+        FollowerNodeStatsResponse(this.clusterService.localNode(), followerStats.stats)
+}
diff --git a/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt b/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt
index 63cd97e95..629b23360 100644
--- a/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt
@@ -11,12 +11,10 @@
 
 package org.opensearch.replication.action.status
 
-import org.opensearch.replication.metadata.ReplicationMetadataManager
 import org.apache.logging.log4j.LogManager
 import org.opensearch.action.support.ActionFilters
 import org.opensearch.action.support.DefaultShardOperationFailedException
 import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction
-import org.opensearch.client.Client
 import org.opensearch.cluster.ClusterState
 import org.opensearch.cluster.block.ClusterBlockException
 import org.opensearch.cluster.metadata.IndexNameExpressionResolver
@@ -25,7 +23,6 @@ import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.inject.Inject
 import org.opensearch.common.io.stream.StreamInput
 import org.opensearch.common.io.stream.Writeable
-import org.opensearch.index.IndexNotFoundException
 import org.opensearch.index.IndexService
 import org.opensearch.indices.IndicesService
 import org.opensearch.threadpool.ThreadPool
@@ -35,13 +32,11 @@ import java.util.*
 
 class TranportShardsInfoAction @Inject constructor(clusterService: ClusterService,
                                                    transportService: TransportService,
-                                                   replicationMetadataManager: ReplicationMetadataManager,
                                                    threadPool: ThreadPool,
                                                    actionFilters: ActionFilters,
-                                                   client:
Client,
                                                    indexNameExpressionResolver: IndexNameExpressionResolver?,
                                                    private val indicesService: IndicesService
-        )
+)
     : TransportBroadcastByNodeAction<
         ShardInfoRequest,
         ReplicationStatusResponse,
@@ -85,7 +80,7 @@ class TranportShardsInfoAction @Inject constructor(clusterService: ClusterServi
         val indexShard = indexService.getShard(shardRouting.shardId().id())
 
         var indexState = indexShard.recoveryState().index
-        if (indexShard.recoveryState().recoverySource.type.equals(org.opensearch.cluster.routing.RecoverySource.Type.SNAPSHOT) and
+        if (indexShard.recoveryState().recoverySource.type.equals(RecoverySource.Type.SNAPSHOT) and
                 (indexState.recoveredBytesPercent() <100)) {
             return ShardInfoResponse(shardRouting.shardId(),"BOOTSTRAPPING",
                     RestoreDetails(indexState.totalBytes(), indexState.recoveredBytes(),
diff --git a/src/main/kotlin/org/opensearch/replication/rest/FollowerStatsHandler.kt b/src/main/kotlin/org/opensearch/replication/rest/FollowerStatsHandler.kt
new file mode 100644
index 000000000..120264661
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/rest/FollowerStatsHandler.kt
@@ -0,0 +1,64 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.rest
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.client.node.NodeClient
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.replication.action.stats.FollowerStatsAction
+import org.opensearch.replication.action.stats.FollowerStatsRequest
+import org.opensearch.replication.action.stats.FollowerStatsResponse
+import org.opensearch.rest.BaseRestHandler
+import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
+import org.opensearch.rest.BytesRestResponse
+import org.opensearch.rest.RestChannel
+import org.opensearch.rest.RestHandler
+import org.opensearch.rest.RestRequest
+import org.opensearch.rest.RestResponse
+import org.opensearch.rest.RestStatus
+import org.opensearch.rest.action.RestResponseListener
+import java.io.IOException
+
+/**
+ * REST handler that serves `GET /_plugins/_replication/follower_stats` by executing
+ * [FollowerStatsAction] and rendering its response with a JSON content builder.
+ */
+class FollowerStatsHandler : BaseRestHandler() {
+    companion object {
+        private val log = LogManager.getLogger(FollowerStatsHandler::class.java)
+    }
+
+    override fun routes(): List<RestHandler.Route> {
+        return listOf(RestHandler.Route(RestRequest.Method.GET, "/_plugins/_replication/follower_stats"))
+    }
+
+    override fun getName(): String {
+        return "plugins_follower_replication_stats"
+    }
+
+    @Throws(IOException::class)
+    override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+        val statsRequest = FollowerStatsRequest()
+        return RestChannelConsumer { channel: RestChannel? ->
+            client.admin().cluster()
+                    .execute(FollowerStatsAction.INSTANCE, statsRequest, object : RestResponseListener<FollowerStatsResponse>(channel) {
+                        @Throws(Exception::class)
+                        override fun buildResponse(nodesStatsResponse: FollowerStatsResponse): RestResponse? {
+                            val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
+                            return BytesRestResponse(RestStatus.OK, nodesStatsResponse.toXContent(builder, ToXContent.EMPTY_PARAMS))
+                        }
+                    })
+        }
+    }
+}
diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/FollowerClusterStats.kt b/src/main/kotlin/org/opensearch/replication/task/shard/FollowerClusterStats.kt
new file mode 100644
index 000000000..9fbeb94c1
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/replication/task/shard/FollowerClusterStats.kt
@@ -0,0 +1,125 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.replication.task.shard
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.common.inject.Singleton
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.index.shard.ShardId
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Live replication counters of one follower shard. The counters are updated in place and
+ * [createStats] copies their current values into a serializable [FollowerShardStats].
+ */
+class FollowerShardMetric {
+    // Checkpoints are plain longs assigned by the shard replication task while createStats() may
+    // run on another thread (the stats action), so they are volatile to make the writes visible.
+    @Volatile var localCheckpoint: Long = 0L
+    @Volatile var remoteCheckpoint: Long = 0L
+    var opsWritten :AtomicLong = AtomicLong()
+    var opsWriteFailures :AtomicLong = AtomicLong()
+    var opsRead :AtomicLong = AtomicLong()
+    var opsReadFailures :AtomicLong = AtomicLong()
+    val totalWriteTime :AtomicLong = AtomicLong()
+
+    constructor()
+
+    companion object {
+        private val log = LogManager.getLogger(FollowerShardMetric::class.java)
+    }
+
+    /**
+     * Creates a serializable representation for these metrics.
+     */
+    fun createStats() : FollowerShardStats {
+        return FollowerShardStats(opsWritten.get(), opsWriteFailures.get(), opsRead.get(), opsReadFailures.get(), localCheckpoint, remoteCheckpoint, totalWriteTime.get())
+    }
+
+    class FollowerShardStats(var opsWritten: Long = 0, var opsWriteFailures: Long = 0, var opsRead: Long = 0, var opsReadFailures: Long = 0,
+                             var localCheckpoint: Long = 0, var remoteCheckpoint: Long = 0, var totalWriteTime: Long = 0) : ToXContentObject {
+
+        override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
+            builder.startObject()
+            builder.field("operations_written", opsWritten)
+            builder.field("operations_read", opsRead)
+            builder.field("failed_read_requests", opsReadFailures)
+            builder.field("failed_write_requests", opsWriteFailures)
+            builder.field("local_checkpoint", localCheckpoint)
+            builder.field("remote_checkpoint", remoteCheckpoint)
+            builder.field("total_write_time_millis", totalWriteTime)
+            return builder.endObject()
+        }
+
+        /** Reads all seven counters, failure counts included, in the order [writeTo] emits them. */
+        constructor(inp: StreamInput) : this() {
+            opsWritten = inp.readLong()
+            opsWriteFailures = inp.readLong()
+            opsRead = inp.readLong()
+            opsReadFailures = inp.readLong()
+            localCheckpoint = inp.readLong()
+            remoteCheckpoint = inp.readLong()
+            totalWriteTime = inp.readLong()
+        }
+
+        /** Writes all seven counters; the order must match the stream constructor. */
+        fun writeTo(out: StreamOutput) {
+            out.writeLong(opsWritten)
+            out.writeLong(opsWriteFailures)
+            out.writeLong(opsRead)
+            out.writeLong(opsReadFailures)
+            out.writeLong(localCheckpoint)
+            out.writeLong(remoteCheckpoint)
+            out.writeLong(totalWriteTime)
+        }
+    }
+}
+
+// Aggregated across all shards for an index
+class FollowerIndexStats(var opsWritten: Long = 0, var opsWriteFailures: Long = 0, var opsRead: Long = 0, var opsReadFailures: Long = 0,
+                         var localCheckpoint: Long = 0, var remoteCheckpoint: Long = 0, var totalWriteTime: Long = 0) : ToXContentObject {
+
+    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
+        builder.startObject()
+        builder.field("operations_written", opsWritten)
+        builder.field("operations_read", opsRead)
+        builder.field("failed_read_requests", opsReadFailures)
+        builder.field("failed_write_requests", opsWriteFailures)
+        builder.field("local_checkpoint", localCheckpoint)
+        builder.field("remote_checkpoint", remoteCheckpoint)
+        builder.field("total_write_time_millis", totalWriteTime)
+        return builder.endObject()
+    }
+
+    fun add(stat: FollowerShardMetric.FollowerShardStats) {
+        opsWritten += stat.opsWritten
+        opsWriteFailures += stat.opsWriteFailures
+        opsRead += stat.opsRead
+        opsReadFailures += stat.opsReadFailures
+        localCheckpoint += stat.localCheckpoint
+        remoteCheckpoint += stat.remoteCheckpoint
+        totalWriteTime += stat.totalWriteTime
+    }
+}
+
+/** Registry of the live [FollowerShardMetric]s, keyed by follower shard id. */
+@Singleton
+class FollowerClusterStats {
+    // Concurrent map: shard replication tasks add and remove entries while the stats action
+    // iterates over the map from a different thread pool.
+    var stats :MutableMap<ShardId, FollowerShardMetric> = ConcurrentHashMap()
+}
diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationExecutor.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationExecutor.kt
index 9693074d5..9e509a791 100644
--- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationExecutor.kt
+++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationExecutor.kt
@@ -32,7 +32,8 @@ import org.opensearch.threadpool.ThreadPool
 class ShardReplicationExecutor(executor: String, private val clusterService : ClusterService,
                                private val threadPool: ThreadPool, private val client: Client,
                                private val replicationMetadataManager: ReplicationMetadataManager,
-                               private val replicationSettings: ReplicationSettings) :
+                               private val replicationSettings: ReplicationSettings,
+                               private val stats: FollowerClusterStats) :
     PersistentTasksExecutor(TASK_NAME, executor) {
 
     companion object {
@@ -74,7 +75,7 @@ class ShardReplicationExecutor(executor: String, private val clusterService : Cl
                                   headers: Map): AllocatedPersistentTask {
         return ShardReplicationTask(id, type, action, getDescription(taskInProgress), parentTaskId,
                                     taskInProgress.params!!, executor, clusterService, threadPool,
-                                    client, replicationMetadataManager, replicationSettings)
+                                    client,
replicationMetadataManager, replicationSettings, stats) } override fun getDescription(taskInProgress: PersistentTask): String { diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index 53e00d279..d109772ae 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -59,7 +59,7 @@ import java.time.Duration class ShardReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, params: ShardReplicationParams, executor: String, clusterService: ClusterService, threadPool: ThreadPool, client: Client, replicationMetadataManager: ReplicationMetadataManager, - replicationSettings: ReplicationSettings) + replicationSettings: ReplicationSettings, private val followerClusterStats: FollowerClusterStats) : CrossClusterReplicationTask(id, type, action, description, parentTask, emptyMap(), executor, clusterService, threadPool, client, replicationMetadataManager, replicationSettings) { @@ -158,6 +158,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: * it continues to be called even after the task is completed. 
*/ clusterService.removeListener(clusterStateListenerForTaskInterruption) + this.followerClusterStats.stats.remove(followerShardId) if (paused) { logDebug("Pausing and not removing lease for index $followerIndexName and shard $followerShardId task") return @@ -198,11 +199,12 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: logInfo("Adding retentionlease at follower sequence number: ${indexShard.lastSyncedGlobalCheckpoint}") retentionLeaseHelper.addRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint , followerShardId) addListenerToInterruptTask() + this.followerClusterStats.stats[followerShardId] = FollowerShardMetric() // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. val rateLimiter = Semaphore(replicationSettings.readersPerShard) val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName, - TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint) + TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) @@ -222,14 +224,20 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: logDebug("pushed to sequencer $fromSeqNo-$toSeqNo") changeTracker.updateBatchFetched(true, fromSeqNo, toSeqNo, changesResponse.changes.lastOrNull()?.seqNo() ?: fromSeqNo - 1, changesResponse.lastSyncedGlobalCheckpoint) + + followerClusterStats.stats[followerShardId]!!.remoteCheckpoint = changesResponse.lastSyncedGlobalCheckpoint + followerClusterStats.stats[followerShardId]!!.opsRead.addAndGet(changesResponse.changes.size.toLong()) + } catch (e: OpenSearchTimeoutException) { logInfo("Timed out waiting for new changes. Current seqNo: $fromSeqNo. 
$e") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) } catch (e: NodeNotConnectedException) { + followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Node not connected. Retrying request using a different node. ${e.stackTraceToString()}") delay(backOffForNodeDiscovery) changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) } catch (e: Exception) { + followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) @@ -248,6 +256,8 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: //renew retention lease with global checkpoint so that any shard that picks up shard replication task has data until then. try { retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint, followerShardId) + followerClusterStats.stats[followerShardId]!!.localCheckpoint = indexShard.lastSyncedGlobalCheckpoint + } catch (ex: Exception) { when (ex) { is RetentionLeaseInvalidRetainingSeqNoException, is RetentionLeaseNotFoundException -> { diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 7b6fdceb7..412346ef3 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -29,6 +29,7 @@ import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.tasks.TaskId import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit /** * A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an @@ -45,7 +46,8 @@ import 
java.util.concurrent.ConcurrentHashMap class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: ReplicationMetadata, private val followerShardId: ShardId, private val leaderAlias: String, private val leaderIndexName: String, - private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long) { + private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long, + private val followerClusterStats: FollowerClusterStats) { private val unAppliedChanges = ConcurrentHashMap() private val log = Loggers.getLogger(javaClass, followerShardId)!! @@ -62,13 +64,19 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: leaderAlias, leaderIndexName) replayRequest.parentTask = parentTaskId launch { + val relativeStartNanos = System.nanoTime() val replayResponse = client.suspendExecute(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest) if (replayResponse.shardInfo.failed > 0) { replayResponse.shardInfo.failures.forEachIndexed { i, failure -> log.error("Failed replaying changes.
Failure:$i:$failure") } - throw org.opensearch.replication.ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) + followerClusterStats.stats[followerShardId]?.opsWriteFailures?.addAndGet(replayResponse.shardInfo.failed.toLong()) // safe call: nothing in this class registers an entry for the shard, so the lookup can return null + throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) } + + val tookInNanos = System.nanoTime() - relativeStartNanos + followerClusterStats.stats[followerShardId]?.totalWriteTime?.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) + followerClusterStats.stats[followerShardId]?.opsWritten?.addAndGet(replayRequest.changes.size.toLong()) } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } diff --git a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt index d1a373bac..e7ad9c069 100644 --- a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt @@ -30,6 +30,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.test.OpenSearchTestCase.assertBusy import org.opensearch.test.rest.OpenSearchRestTestCase import org.junit.Assert +import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -49,6 +50,7 @@ const val REST_REPLICATION_STATUS = "$REST_REPLICATION_PREFIX{index}/_status" const val REST_AUTO_FOLLOW_PATTERN = "${REST_REPLICATION_PREFIX}_autofollow" const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty" const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats" +const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats" const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user" const val STATUS_REASON_USER_INITIATED = "User initiated" const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard
task killed or cancelled." @@ -212,14 +214,14 @@ fun RestHighLevelClient.stopReplication(index: String, shouldWait: Boolean = tru } -fun RestHighLevelClient.pauseReplication(index: String, requestOptions: RequestOptions = RequestOptions.DEFAULT) { +fun RestHighLevelClient.pauseReplication(index: String, shouldWait: Boolean = true, requestOptions: RequestOptions = RequestOptions.DEFAULT) { val lowLevelPauseRequest = Request("POST", REST_REPLICATION_PAUSE.replace("{index}", index,true)) lowLevelPauseRequest.setJsonEntity("{}") lowLevelPauseRequest.setOptions(requestOptions) val lowLevelPauseResponse = lowLevelClient.performRequest(lowLevelPauseRequest) val response = getAckResponse(lowLevelPauseResponse) assertThat(response.isAcknowledged).withFailMessage("Replication could not be paused").isTrue() - waitForReplicationStop(index) + if (shouldWait) waitForReplicationStop(index) } fun RestHighLevelClient.resumeReplication(index: String, requestOptions: RequestOptions = RequestOptions.DEFAULT) { @@ -262,6 +264,14 @@ fun RestHighLevelClient.leaderStats() : Map { return statusResponse } +fun RestHighLevelClient.followerStats() : Map { + val request = Request("GET", REST_FOLLOWER_STATS) + request.setJsonEntity("{}") + val lowLevelStatusResponse = lowLevelClient.performRequest(request) + val statusResponse: Map = OpenSearchRestTestCase.entityAsMap(lowLevelStatusResponse) + return statusResponse +} + fun RestHighLevelClient.waitForNoInitializingShards() { val request = ClusterHealthRequest().waitForNoInitializingShards(true) .timeout(TimeValue.timeValueSeconds(70)) @@ -284,7 +294,10 @@ fun RestHighLevelClient.waitForReplicationStop(index: String, waitFor : TimeValu IndexReplicationExecutor.TASK_NAME + "*") val response = tasks().list(request,RequestOptions.DEFAULT) - assertThat(response.tasks) + //Index Task : "description" : "replication:source:[leader_index][0] -> [follower_index_3][0]", + //Shard Task : "description" :
"replication:source:[leader_index/92E2lgyoTOW1n5o3sUhHag] -> follower_index_3", + var indexTask = response.tasks.filter { t -> t.description.contains(index) } + assertThat(indexTask) .withFailMessage("replication tasks not stopped.") .isEmpty() }, waitFor.seconds, TimeUnit.SECONDS) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index f7aced74e..d6737cae8 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -66,6 +66,7 @@ import org.opensearch.index.mapper.MapperService import org.opensearch.repositories.fs.FsRepository import org.opensearch.test.OpenSearchTestCase.assertBusy import org.junit.Assert +import org.opensearch.replication.followerStats import org.opensearch.replication.leaderStats import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log import java.lang.Thread.sleep @@ -999,6 +1000,63 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + fun `test follower stats`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + val followerIndex2 = "follower_index_2" + val followerIndex3 = "follower_index_3" + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create( + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT + ) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + try { + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndex2), + TimeValue.timeValueSeconds(10), + true + ) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName,
followerIndex3), + TimeValue.timeValueSeconds(10), + true + ) + val docCount = 50 + for (i in 1..docCount) { + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) + } + + followerClient.pauseReplication(followerIndex2) + followerClient.stopReplication(followerIndex3) + + val stats = followerClient.followerStats() + assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") + assertThat(stats.getValue("num_index_tasks").toString()).isEqualTo("1") + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") + assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") + assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") + assertThat(stats.containsKey("index_stats")).isTrue() // a bare assertThat(...) checks nothing; isTrue() makes this a real assertion + assertThat(stats.size).isEqualTo(13) + } finally { + followerClient.stopReplication(followerIndexName) + followerClient.stopReplication(followerIndex2) + } + } + fun `test that replication cannot be started on invalid indexName`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ae7c9bc3f..9449d4b88 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -80,9 +80,10 @@ class TranslogSequencerTests : OpenSearchTestCase() { @ExperimentalCoroutinesApi fun `test sequencer out of order`() = runBlockingTest { + val stats = FollowerClusterStats()
val startSeqNo = randomNonNegativeLong() val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo) + client, startSeqNo, stats) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo