Follower stats API (#126) (#134)
Signed-off-by: Gaurav Bafna <[email protected]>
gbbafna authored Sep 7, 2021
1 parent c6cf654 commit 1ee22b5
Showing 19 changed files with 621 additions and 70 deletions.
@@ -15,6 +15,10 @@

package com.amazon.elasticsearch.replication

import com.amazon.elasticsearch.replication.action.stats.FollowerStatsHandler
import com.amazon.elasticsearch.replication.action.stats.TransportFollowerStatsAction
import com.amazon.elasticsearch.replication.task.shard.FollowerClusterStats
import org.elasticsearch.replication.action.stats.FollowerStatsAction
import com.amazon.elasticsearch.replication.action.autofollow.AutoFollowMasterNodeAction
import com.amazon.elasticsearch.replication.action.autofollow.TransportAutoFollowMasterNodeAction
import com.amazon.elasticsearch.replication.action.autofollow.TransportUpdateAutoFollowPatternAction
@@ -144,6 +148,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
private lateinit var threadPool: ThreadPool
private lateinit var replicationMetadataManager: ReplicationMetadataManager
private lateinit var replicationSettings: ReplicationSettings
private var followerClusterStats = FollowerClusterStats()

companion object {
const val REPLICATION_EXECUTOR_NAME_LEADER = "replication_leader"
@@ -186,7 +191,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats)
}

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
@@ -215,7 +220,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(UpdateReplicationStateAction.INSTANCE, TransportUpdateReplicationStateDetails::class.java),
ActionHandler(ShardsInfoAction.INSTANCE, TranportShardsInfoAction::class.java),
ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java),
ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java)
ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java),
ActionHandler(FollowerStatsAction.INSTANCE, TransportFollowerStatsAction::class.java)
)
}

@@ -231,7 +237,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
UpdateIndexHandler(),
StopIndexReplicationHandler(),
ReplicationStatusHandler(),
LeaderStatsHandler())
LeaderStatsHandler(),
FollowerStatsHandler())
}

override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
Expand Down Expand Up @@ -265,7 +272,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
expressionResolver: IndexNameExpressionResolver)
: List<PersistentTasksExecutor<*>> {
return listOf(
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings),
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, settingsModule),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings))
}
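
Note on the wiring above: the plugin now creates one FollowerClusterStats instance, returns it from createComponents() so it can be injected into the new TransportFollowerStatsAction, and hands the same instance to ShardReplicationExecutor so shard replication tasks can record per-shard metrics. The sketch below is purely illustrative — the real FollowerClusterStats and FollowerShardMetric classes are defined elsewhere in the plugin and are not shown in this diff — but it captures the shape such a shared holder needs: a concurrent map keyed by ShardId that shard tasks update and the stats transport action reads.

import org.elasticsearch.index.shard.ShardId
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins only; not the actual plugin classes.
class FollowerShardMetricSketch {
    @Volatile var opsWritten: Long = 0   // a counter a shard replication task would bump
}

class FollowerClusterStatsSketch {
    // shard tasks register and update their metric; the stats transport action reads the map
    val stats = ConcurrentHashMap<ShardId, FollowerShardMetricSketch>()
}
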
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The elasticsearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright elasticsearch Contributors. See
* GitHub history for details.
*/

package org.elasticsearch.replication.action.stats

import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric
import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric.FollowerStats
import org.elasticsearch.action.support.nodes.BaseNodeResponse
import org.elasticsearch.cluster.node.DiscoveryNode
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class FollowerNodeStatsResponse : BaseNodeResponse {
var stats :Map<ShardId, FollowerStats>

constructor(inp: StreamInput) : super(inp) {
stats = inp.readMap(::ShardId, ::FollowerStats)
}

constructor(node : DiscoveryNode, remoteClusterStats: Map<ShardId, FollowerShardMetric>) : super(node) {
stats = remoteClusterStats.mapValues { (_ , v) -> v.createStats() }
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(stats, { o, k -> k.writeTo(o)}, { o, v -> v.writeTo(o)})
}
}
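
FollowerNodeStatsResponse follows the standard Writeable pattern used throughout the transport layer: a StreamInput constructor mirrors writeTo, so a response serialized on one node can be reconstructed on another. A minimal round-trip sketch (not part of the commit), assuming a DiscoveryNode is available — for example via clusterService.localNode() — and using an empty metrics map since FollowerShardMetric is not shown in this diff:

import org.elasticsearch.cluster.node.DiscoveryNode
import org.elasticsearch.common.io.stream.BytesStreamOutput
import org.elasticsearch.replication.action.stats.FollowerNodeStatsResponse

// Hypothetical round-trip check; `localNode` is assumed to be an existing DiscoveryNode.
fun roundTrip(localNode: DiscoveryNode): FollowerNodeStatsResponse {
    val original = FollowerNodeStatsResponse(localNode, emptyMap())
    val out = BytesStreamOutput()
    original.writeTo(out)                                        // node info plus the per-shard stats map
    return FollowerNodeStatsResponse(out.bytes().streamInput())  // read it back
}
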
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.elasticsearch.replication.action.stats

import com.amazon.elasticsearch.replication.action.stats.FollowerStatsResponse
import org.elasticsearch.action.ActionType
import org.elasticsearch.common.io.stream.Writeable

class FollowerStatsAction : ActionType<FollowerStatsResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/plugins/replication/follower/stats"
val INSTANCE = FollowerStatsAction()
val reader = Writeable.Reader { inp -> FollowerStatsResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<FollowerStatsResponse> = reader
}
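
The action can be invoked through the client like any other ActionType; this is how the FollowerStatsHandler REST layer and tests would typically reach it. A hedged usage sketch (not part of the commit), assuming a NodeClient is in scope:

import org.elasticsearch.action.ActionListener
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.replication.action.stats.FollowerStatsAction
import org.elasticsearch.replication.action.stats.FollowerStatsRequest
import com.amazon.elasticsearch.replication.action.stats.FollowerStatsResponse

// Hypothetical helper: fetch follower stats from all nodes of the follower cluster.
fun fetchFollowerStats(client: NodeClient) {
    client.execute(
        FollowerStatsAction.INSTANCE,
        FollowerStatsRequest(),                  // no node ids -> query every node
        object : ActionListener<FollowerStatsResponse> {
            override fun onResponse(response: FollowerStatsResponse) {
                println(response)                // toString() pretty-prints the aggregated stats
            }
            override fun onFailure(e: Exception) {
                e.printStackTrace()
            }
        }
    )
}
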

@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The elasticsearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright elasticsearch Contributors. See
* GitHub history for details.
*/

package org.elasticsearch.replication.action.stats

import org.elasticsearch.action.support.nodes.BaseNodesRequest
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import java.io.IOException

/**
* A request to get node (cluster) level replication stats.
*/
class FollowerStatsRequest : BaseNodesRequest<FollowerStatsRequest?> {

/**
*
* @param in A stream input object.
* @throws IOException if the stream cannot be deserialized.
*/
constructor(`in`: StreamInput) : super(`in`)

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
constructor(vararg nodesIds: String) : super(*nodesIds)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
}

}
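
Construction is the only follower-specific part of the request: the vararg of node ids is forwarded to BaseNodesRequest, and an empty vararg means all nodes. A small sketch (not part of the commit; the node ids are placeholders):

import org.elasticsearch.replication.action.stats.FollowerStatsRequest

val allNodes = FollowerStatsRequest()                     // no ids -> stats from every node
val twoNodes = FollowerStatsRequest("node-1", "node-2")   // restrict to these node ids
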
@@ -0,0 +1,123 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats


import com.amazon.elasticsearch.replication.metadata.ReplicationOverallState
import com.amazon.elasticsearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import com.amazon.elasticsearch.replication.metadata.state.ReplicationStateMetadata
import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric
import com.amazon.elasticsearch.replication.task.shard.FollowerShardMetric.FollowerStats
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.FailedNodeException
import org.elasticsearch.action.support.nodes.BaseNodesResponse
import org.elasticsearch.cluster.ClusterName
import org.elasticsearch.common.Strings
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.index.shard.ShardId
import org.elasticsearch.replication.action.stats.FollowerNodeStatsResponse
import java.io.IOException

class FollowerStatsResponse : BaseNodesResponse<FollowerNodeStatsResponse?>, ToXContentObject {
var shardStats :MutableMap<ShardId, FollowerShardMetric.FollowerStats> = mutableMapOf()
var indexStats :MutableMap<String, FollowerShardMetric.FollowerStats> = mutableMapOf()
var stats : FollowerShardMetric.FollowerStatsFragment = FollowerShardMetric.FollowerStatsFragment()

var pausedIndices :Int = 0
var failedIndices :Int = 0
var bootstrappingIndices :Int = 0
var syncingIndices :Int = 0
var shardTaskCount :Int = 0
var indexTaskCount :Int = 0

companion object {
private val log = LogManager.getLogger(FollowerStatsResponse::class.java)
}

constructor(inp: StreamInput) : super(inp) {
shardStats = inp.readMap(::ShardId, ::FollowerStats)
}

constructor(clusterName: ClusterName?, followerNodeResponse: List<FollowerNodeStatsResponse>?, failures: List<FailedNodeException?>?
, metadata : ReplicationStateMetadata) : super(clusterName, followerNodeResponse, failures) {

var syncing :MutableSet<String> = mutableSetOf()
if (followerNodeResponse != null) {
for (response in followerNodeResponse) {
shardStats.putAll(response.stats)

for (i in response.stats) {
syncing.add(i.key.indexName)

if (i.key.indexName !in indexStats) {
indexStats[i.key.indexName] = FollowerShardMetric.FollowerStats()
}
indexStats[i.key.indexName]!!.add(i.value)

stats.add(i.value)
}
}
}

var totalRunning = 0 // includes bootstrapping and syncing
for (entry in metadata.replicationDetails) {
when (entry.value[REPLICATION_LAST_KNOWN_OVERALL_STATE]) {
ReplicationOverallState.RUNNING.name -> totalRunning++
ReplicationOverallState.FAILED.name -> failedIndices++
ReplicationOverallState.PAUSED.name -> pausedIndices++
}
}

syncingIndices = syncing.size
bootstrappingIndices = totalRunning - syncingIndices

shardTaskCount = shardStats.size
indexTaskCount = totalRunning
}

@Throws(IOException::class)
override fun readNodesFrom(inp: StreamInput): List<FollowerNodeStatsResponse> {
return inp.readList { FollowerNodeStatsResponse(inp) }
}

@Throws(IOException::class)
override fun writeNodesTo(out: StreamOutput, followerNodeResponses: List<FollowerNodeStatsResponse?>?) {
out.writeList(followerNodeResponses)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder {
builder.startObject()
builder.field("num_syncing_indices", syncingIndices)
builder.field("num_bootstrapping_indices", bootstrappingIndices)
builder.field("num_paused_indices", pausedIndices)
builder.field("num_shard_tasks", shardTaskCount)
builder.field("num_index_tasks", indexTaskCount)
stats.toXContent(builder, params)
builder.field("index_stats").map(indexStats)
builder.endObject()
return builder
}

override fun toString(): String {
val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
toXContent(builder, EMPTY_PARAMS)
return Strings.toString(builder)
}
}
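
The constructor above does two roll-ups: it merges the per-node shard maps into shardStats, folds each shard's stats into a per-index entry in indexStats and into the cluster-wide stats fragment, and then classifies indices from the replication state metadata (running minus syncing gives the bootstrapping count). A simplified, self-contained sketch of the same per-index roll-up idea — a plain Long stands in for FollowerShardMetric.FollowerStats, whose fields are not shown in this diff:

// Illustrative only; not the plugin's actual aggregation code.
data class ShardKey(val indexName: String, val shardId: Int)

fun rollUpByIndex(perShard: Map<ShardKey, Long>): Map<String, Long> =
    perShard.entries
        .groupBy({ it.key.indexName }, { it.value })
        .mapValues { (_, counters) -> counters.sum() }

fun main() {
    val perShard = mapOf(
        ShardKey("logs", 0) to 10L,
        ShardKey("logs", 1) to 5L,
        ShardKey("metrics", 0) to 7L,
    )
    println(rollUpByIndex(perShard))   // {logs=15, metrics=7}
}
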

@@ -12,7 +12,7 @@
package com.amazon.elasticsearch.replication.action.stats

import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteStats
import org.elasticsearch.action.support.nodes.BaseNodeResponse
import org.elasticsearch.cluster.node.DiscoveryNode
import org.elasticsearch.common.io.stream.StreamInput
@@ -21,10 +21,10 @@ import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class LeaderNodeStatsResponse : BaseNodeResponse {
var remoteStats :Map<ShardId, RemoteShardMetric.RemoteShardStats>
var remoteStats :Map<ShardId, RemoteStats>

constructor(inp: StreamInput) : super(inp) {
remoteStats = inp.readMap(::ShardId, ::RemoteShardStats)
remoteStats = inp.readMap(::ShardId, ::RemoteStats)
}

constructor(node : DiscoveryNode, remoteClusterStats: Map<ShardId, RemoteShardMetric>) : super(node) {
@@ -13,7 +13,7 @@ package com.amazon.elasticsearch.replication.action.stats


import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteStats
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.FailedNodeException
import org.elasticsearch.action.support.nodes.BaseNodesResponse
@@ -28,41 +28,27 @@ import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentFactory
import java.io.IOException


class LeaderStatsResponse : BaseNodesResponse<LeaderNodeStatsResponse?>, ToXContentObject {
var remoteStats :MutableMap<String, RemoteShardMetric.RemoteShardStats> = mutableMapOf()
var ops :Long = 0
var tlogSize :Long = 0
var opsLucene :Long = 0
var opsTlog :Long = 0
var latencyLucene :Long = 0
var latencyTlog :Long = 0
var bytesRead :Long = 0
var remoteStats :MutableMap<String, RemoteStats> = mutableMapOf()
var stats = RemoteShardMetric.RemoteStatsFrag()

companion object {
private val log = LogManager.getLogger(LeaderStatsResponse::class.java)
}

constructor(inp: StreamInput) : super(inp) {
remoteStats = inp.readMap(StreamInput::readString, ::RemoteShardStats)
}

fun add(stat :RemoteShardStats) {
ops += stat.ops
tlogSize += stat.tlogSize
opsLucene += stat.opsLucene
opsTlog += stat.opsTlog
latencyLucene += stat.latencyLucene
latencyTlog += stat.latencyTlog
bytesRead += stat.bytesRead
remoteStats = inp.readMap(StreamInput::readString, ::RemoteStats)
}


constructor(clusterName: ClusterName?, leaderNodeRespons: List<LeaderNodeStatsResponse>?, failures: List<FailedNodeException?>?) : super(clusterName, leaderNodeRespons, failures) {
if (leaderNodeRespons != null) {
for (response in leaderNodeRespons) {
for (entry in response.remoteStats) {
remoteStats[entry.key.indexName] = remoteStats.getOrDefault(entry.key.indexName, RemoteShardStats())
remoteStats[entry.key.indexName] = remoteStats.getOrDefault(entry.key.indexName, RemoteStats())
remoteStats[entry.key.indexName]!!.add(entry.value)
add(entry.value)
stats.add(entry.value)
}
}
}
@@ -82,13 +68,7 @@ class LeaderStatsResponse : BaseNodesResponse<LeaderNodeStatsResponse?>, ToXCont
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder {
builder.startObject()
builder.field("num_replicated_indices", remoteStats.size)
builder.field("operations_read", ops)
builder.field("translog_size_bytes", tlogSize)
builder.field("operations_read_lucene", opsLucene)
builder.field("operations_read_translog", opsTlog)
builder.field("total_read_time_lucene_millis", latencyLucene)
builder.field("total_read_time_translog_millis", latencyTlog)
builder.field("bytes_read", bytesRead)
stats.toXContent(builder, params)
builder.field("index_details").map(remoteStats)
builder.endObject()
return builder
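
The LeaderStatsResponse change above replaces seven hand-maintained counters (and their individual builder.field calls) with a single RemoteShardMetric.RemoteStatsFrag accumulator that knows how to add itself and render itself as XContent; FollowerStatsResponse uses the equivalent FollowerStatsFragment. A rough sketch of that accumulator-fragment pattern — illustrative only, since RemoteStatsFrag itself is not shown in this diff; the two fields and JSON keys are borrowed from the counters the commit removes:

import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentFragment
import org.elasticsearch.common.xcontent.XContentBuilder

// Hypothetical stand-in for a stats fragment: accumulates counters and emits its own fields.
class StatsFragmentSketch : ToXContentFragment {
    var ops: Long = 0
    var bytesRead: Long = 0

    fun add(other: StatsFragmentSketch) {
        ops += other.ops
        bytesRead += other.bytesRead
    }

    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
        builder.field("operations_read", ops)
        builder.field("bytes_read", bytesRead)
        return builder
    }
}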