From 1ee22b5989f07107114c85e5572fa90c9eadec31 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Tue, 7 Sep 2021 16:03:50 +0530 Subject: [PATCH] Follower stats API (#126) (#134) Signed-off-by: Gaurav Bafna --- .../replication/ReplicationPlugin.kt | 15 ++- .../action/stats/FollowerNodeStatsResponse.kt | 39 ++++++ .../action/stats/FollowerStatsAction.kt | 27 ++++ .../action/stats/FollowerStatsRequest.kt | 42 ++++++ .../action/stats/FollowerStatsResponse.kt | 123 ++++++++++++++++++ .../action/stats/LeaderNodeStatsResponse.kt | 6 +- .../action/stats/LeaderStatsResponse.kt | 38 ++---- .../stats/TransportFollowerStatsAction.kt | 64 +++++++++ .../stats/TransportLeaderStatsAction.kt | 5 +- .../action/status/TranportShardsInfoAction.kt | 6 +- .../replication/rest/FollowerStatsHandler.kt | 48 +++++++ .../replication/seqno/RemoteClusterStats.kt | 38 ++++-- .../task/shard/FollowerClusterStats.kt | 119 +++++++++++++++++ .../task/shard/ShardReplicationExecutor.kt | 5 +- .../task/shard/ShardReplicationTask.kt | 24 +++- .../task/shard/TranslogSequencer.kt | 13 +- .../replication/ReplicationHelpers.kt | 18 ++- .../integ/rest/StartReplicationIT.kt | 58 +++++++++ .../task/shard/TranslogSequencerTests.kt | 3 +- 19 files changed, 621 insertions(+), 70 deletions(-) create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerNodeStatsResponse.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsAction.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsRequest.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsResponse.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportFollowerStatsAction.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/rest/FollowerStatsHandler.kt create mode 100644 
src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/FollowerClusterStats.kt diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt index 545a4a8d..c196d42a 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt @@ -15,6 +15,10 @@ package com.amazon.elasticsearch.replication +import com.amazon.elasticsearch.replication.action.stats.FollowerStatsHandler +import com.amazon.elasticsearch.replication.action.stats.TransportFollowerStatsAction +import com.amazon.elasticsearch.replication.task.shard.FollowerClusterStats +import org.elasticsearch.replication.action.stats.FollowerStatsAction import com.amazon.elasticsearch.replication.action.autofollow.AutoFollowMasterNodeAction import com.amazon.elasticsearch.replication.action.autofollow.TransportAutoFollowMasterNodeAction import com.amazon.elasticsearch.replication.action.autofollow.TransportUpdateAutoFollowPatternAction @@ -144,6 +148,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, private lateinit var threadPool: ThreadPool private lateinit var replicationMetadataManager: ReplicationMetadataManager private lateinit var replicationSettings: ReplicationSettings + private var followerClusterStats = FollowerClusterStats() companion object { const val REPLICATION_EXECUTOR_NAME_LEADER = "replication_leader" @@ -186,7 +191,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client, ReplicationMetadataStore(client, clusterService, xContentRegistry)) this.replicationSettings = ReplicationSettings(clusterService) - return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings) + return 
listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats) } override fun getGuiceServiceClasses(): Collection> { @@ -215,7 +220,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, ActionHandler(UpdateReplicationStateAction.INSTANCE, TransportUpdateReplicationStateDetails::class.java), ActionHandler(ShardsInfoAction.INSTANCE, TranportShardsInfoAction::class.java), ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java), - ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java) + ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java), + ActionHandler(FollowerStatsAction.INSTANCE, TransportFollowerStatsAction::class.java) ) } @@ -231,7 +237,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, UpdateIndexHandler(), StopIndexReplicationHandler(), ReplicationStatusHandler(), - LeaderStatsHandler()) + LeaderStatsHandler(), + FollowerStatsHandler()) } override fun getExecutorBuilders(settings: Settings): List> { @@ -265,7 +272,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, expressionResolver: IndexNameExpressionResolver) : List> { return listOf( - ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings), + ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats), IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, settingsModule), AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings)) } diff --git 
a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerNodeStatsResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerNodeStatsResponse.kt new file mode 100644 index 00000000..a35b40df --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerNodeStatsResponse.kt @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The elasticsearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright elasticsearch Contributors. See + * GitHub history for details. + */ + +package org.elasticsearch.replication.action.stats + +import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric +import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric.FollowerStats +import org.elasticsearch.action.support.nodes.BaseNodeResponse +import org.elasticsearch.cluster.node.DiscoveryNode +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.index.shard.ShardId +import java.io.IOException + +class FollowerNodeStatsResponse : BaseNodeResponse { + var stats :Map + + constructor(inp: StreamInput) : super(inp) { + stats = inp.readMap(::ShardId, ::FollowerStats) + } + + constructor(node : DiscoveryNode, remoteClusterStats: Map) : super(node) { + stats = remoteClusterStats.mapValues { (_ , v) -> v.createStats() } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeMap(stats, { o, k -> k.writeTo(o)}, { o, v -> v.writeTo(o)}) + } +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsAction.kt new file mode 100644 index 00000000..6fc9f638 --- /dev/null +++ 
b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsAction.kt @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.elasticsearch.replication.action.stats + +import com.amazon.elasticsearch.replication.action.stats.FollowerStatsResponse +import org.elasticsearch.action.ActionType +import org.elasticsearch.common.io.stream.Writeable + +class FollowerStatsAction : ActionType(NAME, reader) { + companion object { + const val NAME = "indices:admin/plugins/replication/follower/stats" + val INSTANCE = FollowerStatsAction() + val reader = Writeable.Reader { inp -> FollowerStatsResponse(inp) } + } + + override fun getResponseReader(): Writeable.Reader = reader +} + diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsRequest.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsRequest.kt new file mode 100644 index 00000000..e59069cb --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsRequest.kt @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The elasticsearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright elasticsearch Contributors. See + * GitHub history for details. + */ + +package org.elasticsearch.replication.action.stats + +import org.elasticsearch.action.support.nodes.BaseNodesRequest +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import java.io.IOException + +/** + * A request to get node (cluster) level replication stats. 
+ */ +class FollowerStatsRequest : BaseNodesRequest { + + /** + * + * @param in A stream input object. + * @throws IOException if the stream cannot be deserialized. + */ + constructor(`in`: StreamInput) : super(`in`) + + /** + * Get information from nodes based on the nodes ids specified. If none are passed, information + * for all nodes will be returned. + */ + constructor(vararg nodesIds: String) : super(*nodesIds) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + } + +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsResponse.kt new file mode 100644 index 00000000..7e0d5963 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/FollowerStatsResponse.kt @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package com.amazon.elasticsearch.replication.action.stats + + +import com.amazon.elasticsearch.replication.metadata.ReplicationOverallState +import com.amazon.elasticsearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE +import com.amazon.elasticsearch.replication.metadata.state.ReplicationStateMetadata +import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric +import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric.FollowerStats +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.FailedNodeException +import org.elasticsearch.action.support.nodes.BaseNodesResponse +import org.elasticsearch.cluster.ClusterName +import org.elasticsearch.common.Strings +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS +import org.elasticsearch.common.xcontent.ToXContent.Params +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentFactory +import org.elasticsearch.index.shard.ShardId +import org.elasticsearch.replication.action.stats.FollowerNodeStatsResponse +import java.io.IOException + +class FollowerStatsResponse : BaseNodesResponse, ToXContentObject { + var shardStats :MutableMap = mutableMapOf() + var indexStats :MutableMap = mutableMapOf() + var stats : FollowerShardMetric.FollowerStatsFragment = FollowerShardMetric.FollowerStatsFragment() + + var pausedIndices :Int = 0 + var failedIndices :Int = 0 + var bootstrappingIndices :Int = 0 + var syncingIndices :Int = 0 + var shardTaskCount :Int = 0 + var indexTaskCount :Int = 0 + + companion object { + private val log = LogManager.getLogger(FollowerStatsResponse::class.java) + } + + constructor(inp: StreamInput) : super(inp) { + shardStats = inp.readMap(::ShardId, ::FollowerStats) + } + + constructor(clusterName: 
ClusterName?, followerNodeResponse: List?, failures: List? + , metadata : ReplicationStateMetadata) : super(clusterName, followerNodeResponse, failures) { + + var syncing :MutableSet = mutableSetOf() + if (followerNodeResponse != null) { + for (response in followerNodeResponse) { + shardStats.putAll(response.stats) + + for (i in response.stats) { + syncing.add(i.key.indexName) + + if (i.key.indexName !in indexStats) { + indexStats[i.key.indexName] = FollowerShardMetric.FollowerStats() + } + indexStats[i.key.indexName]!!.add(i.value) + + stats.add(i.value) + } + } + } + + var totalRunning = 0 //includes boostrap and syncing + for (entry in metadata.replicationDetails) { + when (entry.value[REPLICATION_LAST_KNOWN_OVERALL_STATE]) { + ReplicationOverallState.RUNNING.name -> totalRunning++ + ReplicationOverallState.FAILED.name -> failedIndices++ + ReplicationOverallState.PAUSED.name -> pausedIndices++ + } + } + + syncingIndices = syncing.size + bootstrappingIndices = totalRunning - syncingIndices + + shardTaskCount = shardStats.size + indexTaskCount = totalRunning + } + + @Throws(IOException::class) + override fun readNodesFrom(inp: StreamInput): List { + return inp.readList { FollowerNodeStatsResponse(inp) } + } + + @Throws(IOException::class) + override fun writeNodesTo(out: StreamOutput, leaderNodeRespons: List?) 
{ + out.writeList(leaderNodeRespons) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder { + builder.startObject() + builder.field("num_syncing_indices", syncingIndices) + builder.field("num_bootstrapping_indices", bootstrappingIndices) + builder.field("num_paused_indices", pausedIndices) + builder.field("num_shard_tasks", shardTaskCount) + builder.field("num_index_tasks", indexTaskCount) + stats.toXContent(builder, params) + builder.field("index_stats").map(indexStats) + builder.endObject() + return builder + } + + override fun toString(): String { + val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint() + toXContent(builder, EMPTY_PARAMS) + return Strings.toString(builder) + } +} + diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderNodeStatsResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderNodeStatsResponse.kt index 4edfc592..b0f36dea 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderNodeStatsResponse.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderNodeStatsResponse.kt @@ -12,7 +12,7 @@ package com.amazon.elasticsearch.replication.action.stats import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric -import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats +import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteStats import org.elasticsearch.action.support.nodes.BaseNodeResponse import org.elasticsearch.cluster.node.DiscoveryNode import org.elasticsearch.common.io.stream.StreamInput @@ -21,10 +21,10 @@ import org.elasticsearch.index.shard.ShardId import java.io.IOException class LeaderNodeStatsResponse : BaseNodeResponse { - var remoteStats :Map + var remoteStats :Map constructor(inp: StreamInput) : super(inp) { - remoteStats = inp.readMap(::ShardId, ::RemoteShardStats) + 
remoteStats = inp.readMap(::ShardId, ::RemoteStats) } constructor(node : DiscoveryNode, remoteClusterStats: Map) : super(node) { diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderStatsResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderStatsResponse.kt index 323c4f2d..2590e483 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderStatsResponse.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/LeaderStatsResponse.kt @@ -13,7 +13,7 @@ package com.amazon.elasticsearch.replication.action.stats import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric -import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats +import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteStats import org.apache.logging.log4j.LogManager import org.elasticsearch.action.FailedNodeException import org.elasticsearch.action.support.nodes.BaseNodesResponse @@ -28,41 +28,27 @@ import org.elasticsearch.common.xcontent.XContentBuilder import org.elasticsearch.common.xcontent.XContentFactory import java.io.IOException + class LeaderStatsResponse : BaseNodesResponse, ToXContentObject { - var remoteStats :MutableMap = mutableMapOf() - var ops :Long = 0 - var tlogSize :Long = 0 - var opsLucene :Long = 0 - var opsTlog :Long = 0 - var latencyLucene :Long = 0 - var latencyTlog :Long = 0 - var bytesRead :Long = 0 + var remoteStats :MutableMap = mutableMapOf() + var stats = RemoteShardMetric.RemoteStatsFrag() companion object { private val log = LogManager.getLogger(LeaderStatsResponse::class.java) } constructor(inp: StreamInput) : super(inp) { - remoteStats = inp.readMap(StreamInput::readString, ::RemoteShardStats) - } - - fun add(stat :RemoteShardStats) { - ops += stat.ops - tlogSize += stat.tlogSize - opsLucene += stat.opsLucene - opsTlog += stat.opsTlog - latencyLucene += stat.latencyLucene - latencyTlog += stat.latencyTlog - 
bytesRead += stat.bytesRead + remoteStats = inp.readMap(StreamInput::readString, ::RemoteStats) } + constructor(clusterName: ClusterName?, leaderNodeRespons: List?, failures: List?) : super(clusterName, leaderNodeRespons, failures) { if (leaderNodeRespons != null) { for (response in leaderNodeRespons) { for (entry in response.remoteStats) { - remoteStats[entry.key.indexName] = remoteStats.getOrDefault(entry.key.indexName, RemoteShardStats()) + remoteStats[entry.key.indexName] = remoteStats.getOrDefault(entry.key.indexName, RemoteStats()) remoteStats[entry.key.indexName]!!.add(entry.value) - add(entry.value) + stats.add(entry.value) } } } @@ -82,13 +68,7 @@ class LeaderStatsResponse : BaseNodesResponse, ToXCont override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder { builder.startObject() builder.field("num_replicated_indices", remoteStats.size) - builder.field("operations_read", ops) - builder.field("translog_size_bytes", tlogSize) - builder.field("operations_read_lucene", opsLucene) - builder.field("operations_read_translog", opsTlog) - builder.field("total_read_time_lucene_millis", latencyLucene) - builder.field("total_read_time_translog_millis", latencyTlog) - builder.field("bytes_read", bytesRead) + stats.toXContent(builder, params) builder.field("index_details").map(remoteStats) builder.endObject() return builder diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportFollowerStatsAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportFollowerStatsAction.kt new file mode 100644 index 00000000..09dbbf37 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportFollowerStatsAction.kt @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The elasticsearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + * Modifications Copyright elasticsearch Contributors. See + * GitHub history for details. + */ + +package com.amazon.elasticsearch.replication.action.stats + +import com.amazon.elasticsearch.replication.metadata.state.ReplicationStateMetadata +import com.amazon.elasticsearch.replication.seqno.RemoteClusterStats +import com.amazon.elasticsearch.replication.task.shard.FollowerClusterStats +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.FailedNodeException +import org.elasticsearch.action.support.ActionFilters +import org.elasticsearch.action.support.nodes.TransportNodesAction +import org.elasticsearch.client.node.NodeClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.replication.action.stats.FollowerNodeStatsResponse +import org.elasticsearch.replication.action.stats.FollowerStatsAction +import org.elasticsearch.replication.action.stats.FollowerStatsRequest +import org.elasticsearch.threadpool.ThreadPool +import org.elasticsearch.transport.TransportService + +class TransportFollowerStatsAction @Inject constructor(transportService: TransportService, + clusterService: ClusterService, + threadPool: ThreadPool, + actionFilters: ActionFilters, + private val remoteStats: RemoteClusterStats, + private val client: NodeClient, + private val followerStats: FollowerClusterStats) : + TransportNodesAction(FollowerStatsAction.NAME, + threadPool, clusterService, transportService, actionFilters, ::FollowerStatsRequest, ::NodeStatsRequest, ThreadPool.Names.MANAGEMENT, + FollowerNodeStatsResponse::class.java), CoroutineScope by GlobalScope { + + companion object { + private val log = LogManager.getLogger(TransportFollowerStatsAction::class.java) + } + + override fun newNodeRequest(request: FollowerStatsRequest): NodeStatsRequest { + 
return NodeStatsRequest() + } + + override fun newNodeResponse(input: StreamInput): FollowerNodeStatsResponse { + return FollowerNodeStatsResponse(input) + } + + override fun newResponse(request: FollowerStatsRequest?, responses: MutableList?, failures: MutableList?): FollowerStatsResponse { + val metadata = clusterService.state().metadata().custom(ReplicationStateMetadata.NAME) ?: ReplicationStateMetadata.EMPTY + return FollowerStatsResponse(clusterService.clusterName, responses, failures, metadata) + } + + override fun nodeOperation(nodeStatRequest: NodeStatsRequest?): FollowerNodeStatsResponse { + return FollowerNodeStatsResponse(this.clusterService.localNode(), followerStats.stats) + } +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportLeaderStatsAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportLeaderStatsAction.kt index 07c05ded..5f8ec096 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportLeaderStatsAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stats/TransportLeaderStatsAction.kt @@ -67,15 +67,14 @@ class TransportLeaderStatsAction @Inject constructor(transportService: Transport val indexService = indicesService.indexService(shardId.index) ?: return false val indexShard = indexService.getShard(shardId.id) ?: return false - var leaseExist = false val retentionLeases = indexShard.getRetentionLeases().leases() for (retentionLease in retentionLeases) { if (retentionLease.id().startsWith(RETENTION_LEASE_PREFIX)) { - leaseExist = true + return true } } - return leaseExist + return false } override fun nodeOperation(nodeStatRequest: NodeStatsRequest?): LeaderNodeStatsResponse { diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TranportShardsInfoAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TranportShardsInfoAction.kt index 166e7483..2d969501 100644 --- 
a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TranportShardsInfoAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TranportShardsInfoAction.kt @@ -24,13 +24,11 @@ import java.util.* class TranportShardsInfoAction @Inject constructor(clusterService: ClusterService, transportService: TransportService, - replicationMetadataManager: ReplicationMetadataManager, threadPool: ThreadPool, actionFilters: ActionFilters, - client: Client, indexNameExpressionResolver: IndexNameExpressionResolver?, private val indicesService: IndicesService - ) +) : TransportBroadcastByNodeAction< ShardInfoRequest, ReplicationStatusResponse, @@ -74,7 +72,7 @@ class TranportShardsInfoAction @Inject constructor(clusterService: ClusterServi val indexShard = indexService.getShard(shardRouting.shardId().id()) var indexState = indexShard.recoveryState().index - if (indexShard.recoveryState().recoverySource.type.equals(org.elasticsearch.cluster.routing.RecoverySource.Type.SNAPSHOT) and + if (indexShard.recoveryState().recoverySource.type.equals(RecoverySource.Type.SNAPSHOT) and (indexState.recoveredBytesPercent() <100)) { return ShardInfoResponse(shardRouting.shardId(),"BOOTSTRAPPING", RestoreDetails(indexState.totalBytes(), indexState.recoveredBytes(), diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/rest/FollowerStatsHandler.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/FollowerStatsHandler.kt new file mode 100644 index 00000000..096721e5 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/FollowerStatsHandler.kt @@ -0,0 +1,48 @@ +package com.amazon.elasticsearch.replication.action.stats + +import org.apache.logging.log4j.LogManager +import org.elasticsearch.client.node.NodeClient +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentFactory +import 
org.elasticsearch.replication.action.stats.FollowerStatsAction +import org.elasticsearch.replication.action.stats.FollowerStatsRequest +import org.elasticsearch.rest.BaseRestHandler +import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer +import org.elasticsearch.rest.BytesRestResponse +import org.elasticsearch.rest.RestChannel +import org.elasticsearch.rest.RestHandler +import org.elasticsearch.rest.RestRequest +import org.elasticsearch.rest.RestResponse +import org.elasticsearch.rest.RestStatus +import org.elasticsearch.rest.action.RestResponseListener +import java.io.IOException + +class FollowerStatsHandler : BaseRestHandler() { + companion object { + private val log = LogManager.getLogger(FollowerStatsHandler::class.java) + } + + override fun routes(): List { + return listOf(RestHandler.Route(RestRequest.Method.GET, "/_plugins/_replication/follower_stats")) + } + + override fun getName(): String { + return "plugins_follower_replication_stats" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val statsRequest = FollowerStatsRequest() + return RestChannelConsumer { channel: RestChannel? -> + client.admin().cluster() + .execute(FollowerStatsAction.INSTANCE, statsRequest, object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(nodesStatsResponse: FollowerStatsResponse): RestResponse? 
{ + val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint() + return BytesRestResponse(RestStatus.OK, nodesStatsResponse.toXContent(builder, ToXContent.EMPTY_PARAMS)) + } + }) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterStats.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterStats.kt index da87c158..415536fd 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterStats.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterStats.kt @@ -16,7 +16,7 @@ import org.elasticsearch.common.inject.Singleton import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput import org.elasticsearch.common.xcontent.ToXContent -import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.ToXContentFragment import org.elasticsearch.common.xcontent.XContentBuilder import org.elasticsearch.index.shard.ShardId import java.util.concurrent.atomic.AtomicLong @@ -36,13 +36,13 @@ class RemoteShardMetric { /** * Creates a serializable representation for these metrics. 
*/ - fun createStats() : RemoteShardStats { - return RemoteShardStats(ops.get(), tlogSize.get(), opsLucene.get(), opsTlog.get(), + fun createStats() : RemoteStats { + return RemoteStats(ops.get(), tlogSize.get(), opsLucene.get(), opsTlog.get(), latencyLucene.get(), latencyTlog.get(), bytesRead.get(), lastFetchTime.get()) } - class RemoteShardStats(ops :Long=0, tlogSize :Long=0, opsLucene :Long=0, opsTlog :Long=0, latencyLucene :Long=0, latencyTlog :Long=0, - bytesRead :Long=0, lastFetchTime :Long=0) : ToXContentObject { + open class RemoteStats(ops :Long=0, tlogSize :Long=0, opsLucene :Long=0, opsTlog :Long=0, latencyLucene :Long=0, latencyTlog :Long=0, + bytesRead :Long=0, lastFetchTime :Long=0) : ToXContentFragment { var ops = ops var tlogSize = tlogSize @@ -55,16 +55,21 @@ class RemoteShardMetric { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { builder.startObject() - builder.field("operations_read", ops) - builder.field("translog_size_bytes", tlogSize) - builder.field("operations_read_lucene", opsLucene) - builder.field("operations_read_translog", opsTlog) - builder.field("total_read_time_lucene_millis", latencyLucene) - builder.field("total_read_time_translog_millis", latencyTlog) - builder.field("bytes_read", bytesRead) + toXContentFragment(builder, params) return builder.endObject() } + fun toXContentFragment(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.field("operations_read", ops) + builder.field("translog_size_bytes", tlogSize) + builder.field("operations_read_lucene", opsLucene) + builder.field("operations_read_translog", opsTlog) + builder.field("total_read_time_lucene_millis", latencyLucene) + builder.field("total_read_time_translog_millis", latencyTlog) + builder.field("bytes_read", bytesRead) + return builder + } + constructor(inp: StreamInput) : this() { ops = inp.readLong() tlogSize = inp.readLong() @@ -87,7 +92,7 @@ class RemoteShardMetric { 
out.writeLong(lastFetchTime) } - fun add(stat :RemoteShardStats): RemoteShardStats { + fun add(stat :RemoteStats): RemoteStats { var newStat = this newStat.ops += stat.ops newStat.tlogSize += stat.tlogSize @@ -101,6 +106,13 @@ class RemoteShardMetric { return newStat } } + + class RemoteStatsFrag() : RemoteShardMetric.RemoteStats(), ToXContentFragment { + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + return toXContentFragment(builder, params) + } + } + } @Singleton diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/FollowerClusterStats.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/FollowerClusterStats.kt new file mode 100644 index 00000000..a9cfbe11 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/FollowerClusterStats.kt @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The elasticsearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright elasticsearch Contributors. See + * GitHub history for details. 
+ */ + +package com.amazon.elasticsearch.replication.task.shard + +import org.apache.logging.log4j.LogManager +import org.elasticsearch.common.inject.Singleton +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentFragment +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.index.shard.ShardId +import java.util.concurrent.atomic.AtomicLong + +class FollowerShardMetric { + var followerCheckpoint: Long = 0L + var leaderCheckpoint: Long = 0L + var opsWritten :AtomicLong = AtomicLong() + var opsWriteFailures :AtomicLong = AtomicLong() + var opsWriteThrottles :AtomicLong = AtomicLong() + var opsRead :AtomicLong = AtomicLong() + var opsReadFailures :AtomicLong = AtomicLong() + var opsReadThrottles :AtomicLong = AtomicLong() + val totalWriteTime :AtomicLong = AtomicLong() + + constructor() + + companion object { + private val log = LogManager.getLogger(FollowerShardMetric::class.java) + } + + /** + * Creates a serializable representation for these metrics. 
+ */ + fun createStats() : FollowerStats { + return FollowerStats(opsWritten.get(), opsWriteFailures.get(), opsWriteThrottles.get(), opsRead.get(), opsReadFailures.get(), opsReadThrottles.get(), + followerCheckpoint, leaderCheckpoint, totalWriteTime.get()) + } + + // this can represent stats for an index as well as for a shard + open class FollowerStats(var opsWritten: Long=0, var opsWriteFailures: Long=0, var opsWriteThrottles: Long=0, var opsRead: Long=0, var opsReadFailures: Long=0, var opsReadThrottles : Long=0, + var followerCheckpoint: Long=0, var leaderCheckpoint: Long=0, var totalWriteTime: Long=0) : ToXContentFragment { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + toXContentFragment(builder, params) + return builder.endObject() + } + + fun toXContentFragment(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.field("operations_written", opsWritten) + builder.field("operations_read", opsRead) + builder.field("failed_read_requests", opsReadFailures) + builder.field("throttled_read_requests", opsReadThrottles) + builder.field("failed_write_requests", opsWriteFailures) + builder.field("throttled_write_requests", opsWriteThrottles) + builder.field("follower_checkpoint", followerCheckpoint) + builder.field("leader_checkpoint", leaderCheckpoint) + builder.field("total_write_time_millis", totalWriteTime) + return builder + } + + constructor(inp: StreamInput) : this() { + opsWritten = inp.readLong() + opsWriteFailures = inp.readLong() + opsRead = inp.readLong() + opsReadFailures = inp.readLong() + opsReadThrottles = inp.readLong() + followerCheckpoint = inp.readLong() + leaderCheckpoint = inp.readLong() + totalWriteTime = inp.readLong() + } + + fun writeTo(out: StreamOutput) { + out.writeLong(opsWritten) + out.writeLong(opsWriteFailures) + out.writeLong(opsRead) + out.writeLong(opsReadFailures) + out.writeLong(opsReadThrottles) + 
out.writeLong(followerCheckpoint) + out.writeLong(leaderCheckpoint) + out.writeLong(totalWriteTime) + } + + fun add(stat: FollowerStats) { + opsWritten += stat.opsWritten + opsWriteFailures += stat.opsWriteFailures + opsWriteThrottles += stat.opsWriteThrottles + opsRead += stat.opsRead + opsReadFailures += stat.opsReadFailures + opsReadThrottles += stat.opsReadThrottles + followerCheckpoint += stat.followerCheckpoint + leaderCheckpoint += stat.leaderCheckpoint + totalWriteTime += stat.totalWriteTime + } + } + + // used only for cluster aggregation + class FollowerStatsFragment(): FollowerStats(), ToXContentFragment { + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + return toXContentFragment(builder, params) + } + } +} + +@Singleton +class FollowerClusterStats { + var stats :MutableMap<ShardId, FollowerShardMetric> = mutableMapOf() +} \ No newline at end of file diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationExecutor.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationExecutor.kt index 7462fa4b..ede69495 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationExecutor.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationExecutor.kt @@ -36,7 +36,8 @@ import org.elasticsearch.threadpool.ThreadPool class ShardReplicationExecutor(executor: String, private val clusterService : ClusterService, private val threadPool: ThreadPool, private val client: Client, private val replicationMetadataManager: ReplicationMetadataManager, - private val replicationSettings: ReplicationSettings) : + private val replicationSettings: ReplicationSettings, + private val stats: FollowerClusterStats) : PersistentTasksExecutor(TASK_NAME, executor) { companion object { @@ -78,7 +79,7 @@ class ShardReplicationExecutor(executor: String, private val clusterService : Cl headers: Map): AllocatedPersistentTask { return
ShardReplicationTask(id, type, action, getDescription(taskInProgress), parentTaskId, taskInProgress.params!!, executor, clusterService, threadPool, - client, replicationMetadataManager, replicationSettings) + client, replicationMetadataManager, replicationSettings, stats) } override fun getDescription(taskInProgress: PersistentTask): String { diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt index 29597721..477e6915 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt @@ -63,7 +63,7 @@ import java.time.Duration class ShardReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, params: ShardReplicationParams, executor: String, clusterService: ClusterService, threadPool: ThreadPool, client: Client, replicationMetadataManager: ReplicationMetadataManager, - replicationSettings: ReplicationSettings) + replicationSettings: ReplicationSettings, private val followerClusterStats: FollowerClusterStats) : CrossClusterReplicationTask(id, type, action, description, parentTask, emptyMap(), executor, clusterService, threadPool, client, replicationMetadataManager, replicationSettings) { @@ -170,6 +170,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: * it continues to be called even after the task is completed. 
*/ clusterService.removeListener(clusterStateListenerForTaskInterruption) + this.followerClusterStats.stats.remove(followerShardId) if (paused) { logDebug("Pausing and not removing lease for index $followerIndexName and shard $followerShardId task") return @@ -210,11 +211,12 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: logInfo("Adding retentionlease at follower sequence number: ${indexShard.lastSyncedGlobalCheckpoint}") retentionLeaseHelper.addRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint , followerShardId) addListenerToInterruptTask() + this.followerClusterStats.stats[followerShardId] = FollowerShardMetric() // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. val rateLimiter = Semaphore(replicationSettings.readersPerShard) val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName, - TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint) + TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) @@ -242,20 +244,25 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: logInfo("Timed out waiting for new changes. Current seqNo: $fromSeqNo. $e") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) } catch (e: NodeNotConnectedException) { + followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Node not connected. Retrying request using a different node. 
${e.stackTraceToString()}") delay(backOffForRetry) backOffForRetry = (backOffForRetry * factor).toLong().coerceAtMost(maxTimeOut) changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) } catch (e: Exception) { + followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) // Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable val range4xx = 400.rangeTo(499) if (e is ElasticsearchException && - range4xx.contains(e.status().status) && - e.status().status != RestStatus.TOO_MANY_REQUESTS.status) { - throw e + range4xx.contains(e.status().status) ) { + if (e.status().status == RestStatus.TOO_MANY_REQUESTS.status) { + followerClusterStats.stats[followerShardId]!!.opsReadThrottles.addAndGet(1) + } else { + throw e + } } delay(backOffForRetry) backOffForRetry = (backOffForRetry * factor).toLong().coerceAtMost(maxTimeOut) @@ -267,6 +274,8 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: //renew retention lease with global checkpoint so that any shard that picks up shard replication task has data until then. 
try { retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint, followerShardId) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint + } catch (ex: Exception) { when (ex) { is RetentionLeaseInvalidRetainingSeqNoException, is RetentionLeaseNotFoundException -> { @@ -283,8 +292,11 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: private suspend fun getChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse { val remoteClient = client.getRemoteClusterClient(leaderAlias) val request = GetChangesRequest(leaderShardId, fromSeqNo, toSeqNo) - return remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata, + var changesResp = remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata, action = GetChangesAction.INSTANCE, req = request, log = log) + followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = changesResp.lastSyncedGlobalCheckpoint + followerClusterStats.stats[followerShardId]!!.opsRead.addAndGet(changesResp.changes.size.toLong()) + return changesResp } private fun logDebug(msg: String) { log.debug("${Thread.currentThread().name}: $msg") diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencer.kt index 6e3ad2c7..36df798d 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencer.kt @@ -20,6 +20,7 @@ import com.amazon.elasticsearch.replication.action.changes.GetChangesResponse import com.amazon.elasticsearch.replication.action.replay.ReplayChangesAction import com.amazon.elasticsearch.replication.action.replay.ReplayChangesRequest import com.amazon.elasticsearch.replication.metadata.store.ReplicationMetadata +import 
com.amazon.elasticsearch.replication.util.suspendExecute import com.amazon.elasticsearch.replication.util.suspendExecuteWithRetries import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope @@ -33,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId import org.elasticsearch.index.translog.Translog import org.elasticsearch.tasks.TaskId import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit /** * A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an @@ -49,7 +51,8 @@ import java.util.concurrent.ConcurrentHashMap class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: ReplicationMetadata, private val followerShardId: ShardId, private val leaderAlias: String, private val leaderIndexName: String, - private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long) { + private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long, + private val followerClusterStats: FollowerClusterStats) { private val unAppliedChanges = ConcurrentHashMap() private val log = Loggers.getLogger(javaClass, followerShardId)!! @@ -66,13 +69,19 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: leaderAlias, leaderIndexName) replayRequest.parentTask = parentTaskId launch { - val replayResponse = client.suspendExecuteWithRetries(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest, log = log) + var relativeStartNanos = System.nanoTime() + val replayResponse = client.suspendExecute(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest) if (replayResponse.shardInfo.failed > 0) { replayResponse.shardInfo.failures.forEachIndexed { i, failure -> log.error("Failed replaying changes. 
Failure:$i:$failure") } + followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong()) throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) } + + val tookInNanos = System.nanoTime() - relativeStartNanos + followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) + followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index f086511a..782ea223 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -53,6 +53,7 @@ const val REST_REPLICATION_STATUS = "$REST_REPLICATION_PREFIX{index}/_status" const val REST_AUTO_FOLLOW_PATTERN = "${REST_REPLICATION_PREFIX}_autofollow" const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty" const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats" +const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats" const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user" const val STATUS_REASON_USER_INITIATED = "User initiated" const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled." 
@@ -216,14 +217,14 @@ fun RestHighLevelClient.stopReplication(index: String, shouldWait: Boolean = tru } -fun RestHighLevelClient.pauseReplication(index: String, requestOptions: RequestOptions = RequestOptions.DEFAULT) { +fun RestHighLevelClient.pauseReplication(index: String, shouldWait: Boolean = true, requestOptions: RequestOptions = RequestOptions.DEFAULT) { val lowLevelPauseRequest = Request("POST", REST_REPLICATION_PAUSE.replace("{index}", index,true)) lowLevelPauseRequest.setJsonEntity("{}") lowLevelPauseRequest.setOptions(requestOptions) val lowLevelPauseResponse = lowLevelClient.performRequest(lowLevelPauseRequest) val response = getAckResponse(lowLevelPauseResponse) assertThat(response.isAcknowledged).withFailMessage("Replication could not be paused").isTrue() - waitForReplicationStop(index) + if (shouldWait) waitForReplicationStop(index) } fun RestHighLevelClient.resumeReplication(index: String, requestOptions: RequestOptions = RequestOptions.DEFAULT) { @@ -266,6 +267,14 @@ fun RestHighLevelClient.leaderStats() : Map { return statusResponse } +fun RestHighLevelClient.followerStats() : Map { + var request = Request("GET", REST_FOLLOWER_STATS) + request.setJsonEntity("{}") + val lowLevelStatusResponse = lowLevelClient.performRequest(request) + val statusResponse: Map = ESRestTestCase.entityAsMap(lowLevelStatusResponse) + return statusResponse +} + fun RestHighLevelClient.waitForNoInitializingShards() { val request = ClusterHealthRequest().waitForNoInitializingShards(true) .timeout(TimeValue.timeValueSeconds(70)) @@ -288,7 +297,10 @@ fun RestHighLevelClient.waitForReplicationStop(index: String, waitFor : TimeValu IndexReplicationExecutor.TASK_NAME + "*") val response = tasks().list(request,RequestOptions.DEFAULT) - assertThat(response.tasks) + //Shard Task : "description" : "replication:source:[leader_index][0] -> [follower_index_3][0]", + //Index Task : "description" : "replication:source:[leader_index/92E2lgyoTOW1n5o3sUhHag] -> follower_index_3", + var
indexTask = response.tasks.filter { t -> t.description.contains(index) } + assertThat(indexTask) .withFailMessage("replication tasks not stopped.") .isEmpty() }, waitFor.seconds, TimeUnit.SECONDS) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt index 6122216c..806f0a75 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt @@ -27,6 +27,7 @@ import com.amazon.elasticsearch.replication.replicationStatus import com.amazon.elasticsearch.replication.resumeReplication import com.amazon.elasticsearch.replication.`validate paused status response due to leader index deleted` import com.amazon.elasticsearch.replication.`validate status syncing response` +import com.amazon.elasticsearch.replication.followerStats import com.amazon.elasticsearch.replication.leaderStats import com.amazon.elasticsearch.replication.startReplication import com.amazon.elasticsearch.replication.stopReplication @@ -990,6 +991,63 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + fun `test follower stats`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + val followerIndex2 = "follower_index_2" + val followerIndex3 = "follower_index_3" + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create( + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT + ) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + try { + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndex2), + 
TimeValue.timeValueSeconds(10), + true + ) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndex3), + TimeValue.timeValueSeconds(10), + true + ) + val docCount = 50 + for (i in 1..docCount) { + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) + } + + followerClient.pauseReplication(followerIndex2) + followerClient.stopReplication(followerIndex3) + + val stats = followerClient.followerStats() + assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") + assertThat(stats.getValue("num_index_tasks").toString()).isEqualTo("1") + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") + assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") + assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") + assertThat(stats.containsKey("index_stats")) + assertThat(stats.size).isEqualTo(15) + } finally { + followerClient.stopReplication(followerIndexName) + followerClient.stopReplication(followerIndex2) + } + } + fun `test that replication cannot be started on invalid indexName`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencerTests.kt index e7e249c8..0a187f2a 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencerTests.kt @@ -84,9 +84,10 @@ class 
TranslogSequencerTests : ESTestCase() { @ExperimentalCoroutinesApi fun `test sequencer out of order`() = runBlockingTest { + val stats = FollowerClusterStats() val startSeqNo = randomNonNegativeLong() val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo) + client, startSeqNo, stats) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo