Skip to content

Commit

Permalink
Follower stats API
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Sep 6, 2021
1 parent 6be8425 commit cda7fed
Show file tree
Hide file tree
Showing 15 changed files with 585 additions and 21 deletions.
15 changes: 11 additions & 4 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ import org.opensearch.plugins.EnginePlugin
import org.opensearch.plugins.PersistentTaskPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.RepositoryPlugin
import org.opensearch.replication.action.stats.FollowerStatsAction
import org.opensearch.replication.action.stats.LeaderStatsAction
import org.opensearch.replication.action.stats.TransportFollowerStatsAction
import org.opensearch.replication.action.stats.TransportLeaderStatsAction
import org.opensearch.replication.rest.FollowerStatsHandler
import org.opensearch.replication.rest.LeaderStatsHandler
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.task.shard.FollowerClusterStats
import org.opensearch.repositories.RepositoriesService
import org.opensearch.repositories.Repository
import org.opensearch.rest.RestController
Expand All @@ -142,6 +146,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
private lateinit var threadPool: ThreadPool
private lateinit var replicationMetadataManager: ReplicationMetadataManager
private lateinit var replicationSettings: ReplicationSettings
private var followerClusterStats = FollowerClusterStats()

companion object {
const val REPLICATION_EXECUTOR_NAME_LEADER = "replication_leader"
Expand Down Expand Up @@ -184,7 +189,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats)
}

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
Expand Down Expand Up @@ -213,7 +218,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(UpdateReplicationStateAction.INSTANCE, TransportUpdateReplicationStateDetails::class.java),
ActionHandler(ShardsInfoAction.INSTANCE, TranportShardsInfoAction::class.java),
ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java),
ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java)
ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java),
ActionHandler(FollowerStatsAction.INSTANCE, TransportFollowerStatsAction::class.java)
)
}

Expand All @@ -229,7 +235,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
UpdateIndexHandler(),
StopIndexReplicationHandler(),
ReplicationStatusHandler(),
LeaderStatsHandler())
LeaderStatsHandler(),
FollowerStatsHandler())
}

override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
Expand Down Expand Up @@ -263,7 +270,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
expressionResolver: IndexNameExpressionResolver)
: List<PersistentTasksExecutor<*>> {
return listOf(
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings),
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, settingsModule),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stats

import org.opensearch.action.support.nodes.BaseNodeResponse
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.task.shard.FollowerShardMetric
import org.opensearch.replication.task.shard.FollowerShardMetric.FollowerShardStats
import java.io.IOException

class FollowerNodeStatsResponse : BaseNodeResponse {
var stats :Map<ShardId, FollowerShardStats>

constructor(inp: StreamInput) : super(inp) {
stats = inp.readMap(::ShardId, ::FollowerShardStats)
}

constructor(node : DiscoveryNode, remoteClusterStats: Map<ShardId, FollowerShardMetric>) : super(node) {
stats = remoteClusterStats.mapValues { (_ , v) -> v.createStats() }
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(stats, { o, k -> k.writeTo(o)}, { o, v -> v.writeTo(o)})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stats

import org.opensearch.action.ActionType
import org.opensearch.common.io.stream.Writeable

class FollowerStatsAction : ActionType<FollowerStatsResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/plugins/replication/follower/stats"
val INSTANCE = FollowerStatsAction()
val reader = Writeable.Reader { inp -> FollowerStatsResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<FollowerStatsResponse> = reader
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stats

import org.opensearch.action.support.nodes.BaseNodesRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

/**
* A request to get node (cluster) level replication stats.
*/
class FollowerStatsRequest : BaseNodesRequest<FollowerStatsRequest?> {

/**
*
* @param in A stream input object.
* @throws IOException if the stream cannot be deserialized.
*/
constructor(`in`: StreamInput) : super(`in`)

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
constructor(vararg nodesIds: String) : super(*nodesIds)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stats


import org.apache.logging.log4j.LogManager
import org.opensearch.action.FailedNodeException
import org.opensearch.action.support.nodes.BaseNodesResponse
import org.opensearch.cluster.ClusterName
import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.opensearch.common.xcontent.ToXContent.Params
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.ReplicationStateMetadata
import org.opensearch.replication.task.shard.FollowerIndexStats
import org.opensearch.replication.task.shard.FollowerShardMetric.FollowerShardStats
import java.io.IOException

class FollowerStatsResponse : BaseNodesResponse<FollowerNodeStatsResponse?>, ToXContentFragment {
var shardStats :MutableMap<ShardId, FollowerShardStats> = mutableMapOf()
var indexStats :MutableMap<String, FollowerIndexStats> = mutableMapOf()

var pausedIndices :Int = 0
var failedIndices :Int = 0
var bootstrappingIndices :Int = 0
var syncingIndices :Int = 0
var shardTaskCount :Int = 0
var indexTaskCount :Int = 0

var opsWritten: Long = 0
var opsWriteFailures: Long = 0
var opsRead: Long = 0
var opsReadFailures: Long = 0
var localCheckpoint: Long = 0
var remoteCheckpoint: Long = 0
var totalWriteTime: Long = 0

companion object {
private val log = LogManager.getLogger(FollowerStatsResponse::class.java)
}

constructor(inp: StreamInput) : super(inp) {
shardStats = inp.readMap(::ShardId, ::FollowerShardStats)
}

constructor(clusterName: ClusterName?, followerNodeResponse: List<FollowerNodeStatsResponse>?, failures: List<FailedNodeException?>?
, metadata : ReplicationStateMetadata) : super(clusterName, followerNodeResponse, failures) {

var syncing :MutableSet<String> = mutableSetOf()
if (followerNodeResponse != null) {
for (response in followerNodeResponse) {
shardStats.putAll(response.stats)

for (i in response.stats) {
syncing.add(i.key.indexName)

if (i.key.indexName !in indexStats) {
indexStats[i.key.indexName] = FollowerIndexStats()
}
indexStats[i.key.indexName]!!.add(i.value)

add(i.value)
}
}
}

var totalRunning = 0 //includes boostrap and syncing
for (entry in metadata.replicationDetails) {
when (entry.value[REPLICATION_LAST_KNOWN_OVERALL_STATE]) {
ReplicationOverallState.RUNNING.name -> totalRunning++
ReplicationOverallState.FAILED.name -> failedIndices++
ReplicationOverallState.PAUSED.name -> pausedIndices++
}
}

syncingIndices = syncing.size
bootstrappingIndices = totalRunning - syncingIndices

shardTaskCount = shardStats.size
indexTaskCount = totalRunning
}

fun add(stat: FollowerShardStats) {
opsWritten += stat.opsWritten
opsWriteFailures += stat.opsWriteFailures
opsRead += stat.opsRead
opsReadFailures += stat.opsReadFailures
localCheckpoint += stat.localCheckpoint
remoteCheckpoint += stat.remoteCheckpoint
totalWriteTime += stat.totalWriteTime
}

@Throws(IOException::class)
override fun readNodesFrom(`in`: StreamInput): List<FollowerNodeStatsResponse> {
return `in`.readList { FollowerNodeStatsResponse(`in`) }
}

@Throws(IOException::class)
override fun writeNodesTo(out: StreamOutput, leaderNodeRespons: List<FollowerNodeStatsResponse?>?) {
out.writeList(leaderNodeRespons)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder {
builder.startObject()
builder.field("num_syncing_indices", syncingIndices)
builder.field("num_bootstrapping_indices", bootstrappingIndices)
builder.field("num_paused_indices", pausedIndices)
builder.field("num_shard_tasks", shardTaskCount)
builder.field("num_index_tasks", indexTaskCount)
builder.field("operations_written", opsWritten)
builder.field("operations_read", opsRead)
builder.field("failed_read_requests", opsReadFailures)
builder.field("failed_write_requests", opsWriteFailures)
builder.field("total_local_checkpoint", localCheckpoint)
builder.field("total_remote_checkpoint", remoteCheckpoint)
builder.field("total_write_time_millis", totalWriteTime)
builder.field("index_stats").map(indexStats)
builder.endObject()
return builder
}

override fun toString(): String {
val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
toXContent(builder, EMPTY_PARAMS)
return Strings.toString(builder)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stats

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import org.apache.logging.log4j.LogManager
import org.opensearch.action.FailedNodeException
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.nodes.TransportNodesAction
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.replication.metadata.state.ReplicationStateMetadata
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.task.shard.FollowerClusterStats
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService

class TransportFollowerStatsAction @Inject constructor(transportService: TransportService,
clusterService: ClusterService,
threadPool: ThreadPool,
actionFilters: ActionFilters,
private val remoteStats: RemoteClusterStats,
private val client: NodeClient,
private val followerStats: FollowerClusterStats) :
TransportNodesAction<FollowerStatsRequest, FollowerStatsResponse, NodeStatsRequest, FollowerNodeStatsResponse>(FollowerStatsAction.NAME,
threadPool, clusterService, transportService, actionFilters, ::FollowerStatsRequest, ::NodeStatsRequest, ThreadPool.Names.MANAGEMENT,
FollowerNodeStatsResponse::class.java), CoroutineScope by GlobalScope {

companion object {
private val log = LogManager.getLogger(TransportFollowerStatsAction::class.java)
}

override fun newNodeRequest(request: FollowerStatsRequest): NodeStatsRequest {
return NodeStatsRequest()
}

override fun newNodeResponse(input: StreamInput): FollowerNodeStatsResponse {
return FollowerNodeStatsResponse(input)
}

override fun newResponse(request: FollowerStatsRequest?, responses: MutableList<FollowerNodeStatsResponse>?, failures: MutableList<FailedNodeException>?): FollowerStatsResponse {
val metadata = clusterService.state().metadata().custom(ReplicationStateMetadata.NAME) ?: ReplicationStateMetadata.EMPTY
return FollowerStatsResponse(clusterService.clusterName, responses, failures, metadata)
}

override fun nodeOperation(nodeStatRequest: NodeStatsRequest?): FollowerNodeStatsResponse {
return FollowerNodeStatsResponse(this.clusterService.localNode(), followerStats.stats)
}
}
Loading

0 comments on commit cda7fed

Please sign in to comment.