Signed-off-by: Gaurav Bafna <[email protected]>
gbbafna authored Sep 7, 2021
1 parent 5ce9213 commit c6cf654
Showing 14 changed files with 597 additions and 9 deletions.
ReplicationPlugin.kt
@@ -45,6 +45,8 @@ import com.amazon.elasticsearch.replication.action.setup.SetupChecksAction
import com.amazon.elasticsearch.replication.action.setup.TransportSetupChecksAction
import com.amazon.elasticsearch.replication.action.setup.TransportValidatePermissionsAction
import com.amazon.elasticsearch.replication.action.setup.ValidatePermissionsAction
import com.amazon.elasticsearch.replication.action.stats.LeaderStatsAction
import com.amazon.elasticsearch.replication.action.stats.TransportLeaderStatsAction
import com.amazon.elasticsearch.replication.action.status.ReplicationStatusAction
import com.amazon.elasticsearch.replication.action.status.ShardsInfoAction
import com.amazon.elasticsearch.replication.action.status.TranportShardsInfoAction
@@ -69,6 +71,7 @@ import com.amazon.elasticsearch.replication.rest.ResumeIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.StopIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.UpdateAutoFollowPatternsHandler
import com.amazon.elasticsearch.replication.rest.UpdateIndexHandler
import com.amazon.elasticsearch.replication.seqno.RemoteClusterStats
import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService
import com.amazon.elasticsearch.replication.task.IndexCloseListener
import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowExecutor
@@ -131,6 +134,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder
import org.elasticsearch.threadpool.ScalingExecutorBuilder
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.watcher.ResourceWatcherService
import org.opensearch.replication.rest.LeaderStatsHandler
import java.util.Optional
import java.util.function.Supplier

@@ -186,7 +190,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
return listOf(Injectables::class.java,
return listOf(Injectables::class.java, RemoteClusterStats::class.java,
RemoteClusterRestoreLeaderService::class.java, RemoteClusterTranslogService::class.java)
}

@@ -210,7 +214,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(SetupChecksAction.INSTANCE, TransportSetupChecksAction::class.java),
ActionHandler(UpdateReplicationStateAction.INSTANCE, TransportUpdateReplicationStateDetails::class.java),
ActionHandler(ShardsInfoAction.INSTANCE, TranportShardsInfoAction::class.java),
ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java)
ActionHandler(ReplicationStatusAction.INSTANCE,TransportReplicationStatusAction::class.java),
ActionHandler(LeaderStatsAction.INSTANCE, TransportLeaderStatsAction::class.java)
)
}

@@ -225,7 +230,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ResumeIndexReplicationHandler(),
UpdateIndexHandler(),
StopIndexReplicationHandler(),
ReplicationStatusHandler())
ReplicationStatusHandler(),
LeaderStatsHandler())
}

override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
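
RemoteClusterStats, registered above as a Guice lifecycle service, is introduced by this commit, but its source is among the files not loaded in this excerpt. The member name stats and the keying by ShardId are taken from its use in TransportGetChangesAction below; the base class and the map implementation in this minimal sketch are assumptions (getGuiceServiceClasses() only requires a LifecycleComponent subclass).

package com.amazon.elasticsearch.replication.seqno

import org.elasticsearch.common.component.AbstractLifecycleComponent
import org.elasticsearch.index.shard.ShardId
import java.util.concurrent.ConcurrentHashMap

class RemoteClusterStats : AbstractLifecycleComponent() {
    // Per-shard metrics accumulated on the leader while serving changes to followers.
    // A concurrent map is assumed because GetChanges requests for different shards can race across threads.
    val stats = ConcurrentHashMap<ShardId, RemoteShardMetric>()

    override fun doStart() {}
    override fun doStop() {}
    override fun doClose() {}
}
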
TransportGetChangesAction.kt
@@ -19,7 +19,9 @@ import com.amazon.elasticsearch.replication.util.completeWith
import com.amazon.elasticsearch.replication.util.coroutineContext
import com.amazon.elasticsearch.replication.util.waitForGlobalCheckpoint
import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import com.amazon.elasticsearch.replication.seqno.RemoteClusterStats
import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
@@ -43,13 +45,15 @@ import org.elasticsearch.indices.IndicesService
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportActionProxy
import org.elasticsearch.transport.TransportService
import java.util.concurrent.TimeUnit
import kotlin.math.min

class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clusterService: ClusterService,
transportService: TransportService, actionFilters: ActionFilters,
indexNameExpressionResolver: IndexNameExpressionResolver,
private val indicesService: IndicesService,
private val translogService: RemoteClusterTranslogService) :
private val translogService: RemoteClusterTranslogService,
private val remoteStatsService: RemoteClusterStats) :
TransportSingleShardAction<GetChangesRequest, GetChangesResponse>(
GetChangesAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ::GetChangesRequest, REPLICATION_EXECUTOR_NAME_LEADER) {
@@ -72,6 +76,12 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
// TODO: Figure out if we need to acquire a primary permit here
listener.completeWith {
var relativeStartNanos = System.nanoTime()
remoteStatsService.stats[shardId] = remoteStatsService.stats.getOrDefault(shardId, RemoteShardMetric())
val indexMetric = remoteStatsService.stats[shardId]!!

indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
@@ -88,6 +98,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)

@@ -104,6 +115,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// Translog fetch is disabled or not found
if(!fetchFromTranslog) {
log.info("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo")
relativeStartNanos = System.nanoTime()
indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot ->
ops = ArrayList(snapshot.totalOperations())
var op = snapshot.next()
@@ -113,6 +125,21 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}
}
}

val tookInNanos = System.nanoTime() - relativeStartNanos
val tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos)
if (fetchFromTranslog) {
indexMetric.latencyTlog.addAndGet(tookInMillis)
indexMetric.opsTlog.addAndGet(ops.size.toLong())
} else {
indexMetric.latencyLucene.addAndGet(tookInMillis)
indexMetric.opsLucene.addAndGet(ops.size.toLong())
}
indexMetric.tlogSize.set(indexShard.translogStats().translogSizeInBytes)
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }

GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
}
}
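
RemoteShardMetric and its RemoteShardStats snapshot are likewise part of this commit but not loaded above. The calls in the hunk above (addAndGet/set on per-shard counters) and in the stats responses below (createStats(), a StreamInput constructor, writeTo, add, and per-field reads such as stat.ops) imply roughly the following shape; the exact field types, wire format, and any additional members in this sketch are assumptions.

package com.amazon.elasticsearch.replication.seqno

import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import java.util.concurrent.atomic.AtomicLong

class RemoteShardMetric {
    val ops = AtomicLong()            // total operations served to the follower
    val opsLucene = AtomicLong()      // operations read via a Lucene changes snapshot
    val opsTlog = AtomicLong()        // operations read from the translog
    val latencyLucene = AtomicLong()  // cumulative Lucene read time, millis
    val latencyTlog = AtomicLong()    // cumulative translog read time, millis
    val bytesRead = AtomicLong()      // estimated bytes shipped, via Translog.Operation.estimateSize()
    val tlogSize = AtomicLong()       // current translog size in bytes
    val lastFetchTime = AtomicLong()  // System.nanoTime() of the most recent fetch

    // Point-in-time snapshot used by the node- and cluster-level responses.
    fun createStats() = RemoteShardStats(ops.get(), opsLucene.get(), opsTlog.get(),
        latencyLucene.get(), latencyTlog.get(), bytesRead.get(), tlogSize.get())

    class RemoteShardStats(var ops: Long = 0, var opsLucene: Long = 0, var opsTlog: Long = 0,
                           var latencyLucene: Long = 0, var latencyTlog: Long = 0,
                           var bytesRead: Long = 0, var tlogSize: Long = 0) : Writeable {

        constructor(inp: StreamInput) : this(inp.readVLong(), inp.readVLong(), inp.readVLong(),
            inp.readVLong(), inp.readVLong(), inp.readVLong(), inp.readVLong())

        override fun writeTo(out: StreamOutput) {
            out.writeVLong(ops); out.writeVLong(opsLucene); out.writeVLong(opsTlog)
            out.writeVLong(latencyLucene); out.writeVLong(latencyTlog)
            out.writeVLong(bytesRead); out.writeVLong(tlogSize)
        }

        // Accumulate another snapshot into this one; LeaderStatsResponse uses this to roll shards up per index.
        fun add(other: RemoteShardStats) {
            ops += other.ops; opsLucene += other.opsLucene; opsTlog += other.opsTlog
            latencyLucene += other.latencyLucene; latencyTlog += other.latencyTlog
            bytesRead += other.bytesRead; tlogSize += other.tlogSize
        }
    }
}
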
LeaderNodeStatsResponse.kt (new file)
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats

import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats
import org.elasticsearch.action.support.nodes.BaseNodeResponse
import org.elasticsearch.cluster.node.DiscoveryNode
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class LeaderNodeStatsResponse : BaseNodeResponse {
var remoteStats :Map<ShardId, RemoteShardMetric.RemoteShardStats>

constructor(inp: StreamInput) : super(inp) {
remoteStats = inp.readMap(::ShardId, ::RemoteShardStats)
}

constructor(node : DiscoveryNode, remoteClusterStats: Map<ShardId, RemoteShardMetric>) : super(node) {
remoteStats = remoteClusterStats.mapValues { (_ , v) -> v.createStats() }
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(remoteStats, { o, k -> k.writeTo(o)}, { o, v -> v.writeTo(o)})
}
}
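
Because the map above is serialized with custom key and value writers, a stream round trip is the quickest sanity check. This is an illustrative test-style sketch, not part of the commit, and it leans on the hypothetical RemoteShardMetric fields sketched earlier.

import org.elasticsearch.Version
import org.elasticsearch.cluster.node.DiscoveryNode
import org.elasticsearch.common.io.stream.BytesStreamOutput
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.index.shard.ShardId
import java.net.InetAddress

fun leaderNodeStatsRoundTrip() {
    val node = DiscoveryNode("leader-node-1",
        TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)
    val metric = RemoteShardMetric().apply { ops.addAndGet(7); bytesRead.addAndGet(1024) }
    val original = LeaderNodeStatsResponse(node, mapOf(ShardId("my-index", "index-uuid", 0) to metric))

    // Write to an in-memory stream and read back through the StreamInput constructor.
    val out = BytesStreamOutput()
    original.writeTo(out)
    val copy = LeaderNodeStatsResponse(out.bytes().streamInput())
    assert(copy.remoteStats.keys == original.remoteStats.keys)
}
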
LeaderStatsAction.kt (new file)
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats

import org.elasticsearch.action.ActionType
import org.elasticsearch.common.io.stream.Writeable

class LeaderStatsAction : ActionType<LeaderStatsResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/plugins/replication/index/stats"
val INSTANCE = LeaderStatsAction()
val reader = Writeable.Reader { inp -> LeaderStatsResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<LeaderStatsResponse> = reader
}
LeaderStatsRequest.kt (new file)
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats

import org.elasticsearch.action.support.nodes.BaseNodesRequest
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import java.io.IOException

/**
* A request to get node (cluster) level replication stats.
*/
class LeaderStatsRequest : BaseNodesRequest<LeaderStatsRequest?> {

/**
*
* @param in A stream input object.
* @throws IOException if the stream cannot be deserialized.
*/
constructor(inp: StreamInput) : super(inp)

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
constructor(vararg nodesIds: String) : super(*nodesIds)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
}

}
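
With the action and request in place (and the handler registered in ReplicationPlugin earlier), leader-side stats can be fetched programmatically from a node client; the REST route added by LeaderStatsHandler is among the files not loaded here. A minimal sketch, assuming a NodeClient is available and using the aggregate fields defined on LeaderStatsResponse below:

import org.elasticsearch.action.ActionListener
import org.elasticsearch.client.node.NodeClient

fun fetchLeaderStats(client: NodeClient) {
    // An empty node-id list fans the request out to every node in the leader cluster.
    val request = LeaderStatsRequest()
    client.execute(LeaderStatsAction.INSTANCE, request, object : ActionListener<LeaderStatsResponse> {
        override fun onResponse(response: LeaderStatsResponse) {
            // Counters are already rolled up across nodes and shards by LeaderStatsResponse.
            println("operations_read=${response.ops} bytes_read=${response.bytesRead}")
        }
        override fun onFailure(e: Exception) {
            // Transport-level failures surface here; per-node failures travel in the response's failures list.
        }
    })
}
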
LeaderStatsResponse.kt (new file)
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats


import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric
import com.amazon.elasticsearch.replication.seqno.RemoteShardMetric.RemoteShardStats
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.FailedNodeException
import org.elasticsearch.action.support.nodes.BaseNodesResponse
import org.elasticsearch.cluster.ClusterName
import org.elasticsearch.common.Strings
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentFactory
import java.io.IOException

class LeaderStatsResponse : BaseNodesResponse<LeaderNodeStatsResponse?>, ToXContentObject {
var remoteStats :MutableMap<String, RemoteShardMetric.RemoteShardStats> = mutableMapOf()
var ops :Long = 0
var tlogSize :Long = 0
var opsLucene :Long = 0
var opsTlog :Long = 0
var latencyLucene :Long = 0
var latencyTlog :Long = 0
var bytesRead :Long = 0

companion object {
private val log = LogManager.getLogger(LeaderStatsResponse::class.java)
}

constructor(inp: StreamInput) : super(inp) {
remoteStats = inp.readMap(StreamInput::readString, ::RemoteShardStats)
}

fun add(stat :RemoteShardStats) {
ops += stat.ops
tlogSize += stat.tlogSize
opsLucene += stat.opsLucene
opsTlog += stat.opsTlog
latencyLucene += stat.latencyLucene
latencyTlog += stat.latencyTlog
bytesRead += stat.bytesRead
}

constructor(clusterName: ClusterName?, leaderNodeRespons: List<LeaderNodeStatsResponse>?, failures: List<FailedNodeException?>?) : super(clusterName, leaderNodeRespons, failures) {
if (leaderNodeRespons != null) {
for (response in leaderNodeRespons) {
for (entry in response.remoteStats) {
remoteStats[entry.key.indexName] = remoteStats.getOrDefault(entry.key.indexName, RemoteShardStats())
remoteStats[entry.key.indexName]!!.add(entry.value)
add(entry.value)
}
}
}
}

@Throws(IOException::class)
override fun readNodesFrom(inp: StreamInput): List<LeaderNodeStatsResponse> {
return inp.readList { LeaderNodeStatsResponse(inp) }
}

@Throws(IOException::class)
override fun writeNodesTo(out: StreamOutput, leaderNodeRespons: List<LeaderNodeStatsResponse?>?) {
out.writeList(leaderNodeRespons)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder {
builder.startObject()
builder.field("num_replicated_indices", remoteStats.size)
builder.field("operations_read", ops)
builder.field("translog_size_bytes", tlogSize)
builder.field("operations_read_lucene", opsLucene)
builder.field("operations_read_translog", opsTlog)
builder.field("total_read_time_lucene_millis", latencyLucene)
builder.field("total_read_time_translog_millis", latencyTlog)
builder.field("bytes_read", bytesRead)
builder.field("index_details").map(remoteStats)
builder.endObject()
return builder
}

override fun toString(): String {
val builder: XContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
toXContent(builder, EMPTY_PARAMS)
return Strings.toString(builder)
}
}
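
For orientation, the toXContent layout above renders along these lines; the numbers are illustrative, and the per-index objects under index_details depend on how RemoteShardStats serializes itself, which is not shown in this excerpt.

{
  "num_replicated_indices": 1,
  "operations_read": 1024,
  "translog_size_bytes": 2097152,
  "operations_read_lucene": 0,
  "operations_read_translog": 1024,
  "total_read_time_lucene_millis": 0,
  "total_read_time_translog_millis": 184,
  "bytes_read": 524288,
  "index_details": {
    "my-index": { ... }
  }
}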

NodeStatsRequest.kt (new file)
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.elasticsearch.replication.action.stats

import org.elasticsearch.action.support.nodes.BaseNodeRequest
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import java.io.IOException

class NodeStatsRequest : BaseNodeRequest {

constructor(inp :StreamInput) : super(inp)

constructor()

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
}
}
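
NodeStatsRequest is the per-node fan-out request consumed by TransportLeaderStatsAction, whose source is among the files not loaded above. Following the stock TransportNodesAction pattern in Elasticsearch 7.x, the transport action plausibly looks something like the sketch below; the constructor wiring, thread pool name, and method signatures are assumptions rather than the committed code.

package com.amazon.elasticsearch.replication.action.stats

import com.amazon.elasticsearch.replication.seqno.RemoteClusterStats
import org.elasticsearch.action.FailedNodeException
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.nodes.TransportNodesAction
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService

class TransportLeaderStatsAction @Inject constructor(transportService: TransportService,
                                                     clusterService: ClusterService,
                                                     threadPool: ThreadPool,
                                                     actionFilters: ActionFilters,
                                                     private val remoteStats: RemoteClusterStats) :
    TransportNodesAction<LeaderStatsRequest, LeaderStatsResponse, NodeStatsRequest, LeaderNodeStatsResponse>(
        LeaderStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
        Writeable.Reader { inp -> LeaderStatsRequest(inp) },
        Writeable.Reader { inp -> NodeStatsRequest(inp) },
        ThreadPool.Names.MANAGEMENT, LeaderNodeStatsResponse::class.java) {

    override fun newNodeRequest(request: LeaderStatsRequest) = NodeStatsRequest()

    override fun newNodeResponse(input: StreamInput) = LeaderNodeStatsResponse(input)

    override fun newResponse(request: LeaderStatsRequest,
                             responses: List<LeaderNodeStatsResponse>,
                             failures: List<FailedNodeException>): LeaderStatsResponse {
        return LeaderStatsResponse(clusterService.clusterName, responses, failures)
    }

    // Each node reports the per-shard metrics it has accumulated locally in RemoteClusterStats.
    override fun nodeOperation(nodeStatRequest: NodeStatsRequest): LeaderNodeStatsResponse {
        return LeaderNodeStatsResponse(clusterService.localNode(), remoteStats.stats)
    }
}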
