Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status aggregated #46

Merged
merged 2 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class ReplicationStatusResponse : BroadcastResponse, ToXContentObject {
lateinit var connectionAlias: String
lateinit var leaderIndexName: String
lateinit var followerIndexName: String
lateinit var aggregatedReplayDetails: ReplayDetails
lateinit var aggregatedRestoreDetails: RestoreDetails
var isVerbose: Boolean = true

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
Expand Down Expand Up @@ -62,13 +65,17 @@ class ReplicationStatusResponse : BroadcastResponse, ToXContentObject {
if (::status.isInitialized)
builder.field("status",status)
if (::connectionAlias.isInitialized)
builder.field("connection_alias",connectionAlias)
builder.field("remote_cluster",connectionAlias)
if (::leaderIndexName.isInitialized)
builder.field("leader_index",leaderIndexName)
if (::followerIndexName.isInitialized)
builder.field("follower_index",followerIndexName)
if (::shardInfoResponse.isInitialized)
builder.field("replication_data",shardInfoResponse)
if (::aggregatedReplayDetails.isInitialized)
builder.field("syncing_details",aggregatedReplayDetails)
if (::aggregatedRestoreDetails.isInitialized)
builder.field("bootstrap_details",aggregatedRestoreDetails)
if (isVerbose and ::shardInfoResponse.isInitialized)
builder.field("shard_replication_details",shardInfoResponse)
builder.endObject()
return builder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder
class ShardInfoRequest : BroadcastRequest<ShardInfoRequest> , ToXContentObject {

var indexName: String
var verbose: Boolean = false

constructor(indexName: String) {
this.indexName = indexName
}

constructor(indexName: String,verbose: Boolean) {
this.indexName = indexName
this.verbose = verbose
}

constructor(inp: StreamInput): super(inp) {
indexName = inp.readString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class ShardInfoResponse : BroadcastShardResponse, ToXContentObject {
}

private val SHARDID = ParseField("shard_id")
private val DOCCOUNT = ParseField("doc_count")
private val REPLAYDETAILS = ParseField("syncing_task_details")
private val RESTOREDETAILS = ParseField("bootstrap_task_details")

Expand All @@ -73,6 +72,9 @@ class ShardInfoResponse : BroadcastShardResponse, ToXContentObject {
fun isReplayDetailsInitialized(): Boolean {
return ::replayDetails.isInitialized
}
fun isRestoreDetailsInitialized(): Boolean {
return ::restoreDetails.isInitialized
}
}

class RestoreDetails : BroadcastResponse, ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.amazon.elasticsearch.replication.action.status


import com.amazon.elasticsearch.replication.ReplicationException
import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager
import com.amazon.elasticsearch.replication.util.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
Expand Down Expand Up @@ -67,14 +69,66 @@ class TransportReplicationStatusAction @Inject constructor(transportService: Tra
followerResponse.followerIndexName = metadata.followerContext.resource
followerResponse.leaderIndexName = metadata.leaderContext.resource
followerResponse.status = status
populateAggregatedResponse(followerResponse)
if (!request.verbose) {
followerResponse.isVerbose = false
}
followerResponse
} catch(e : Exception) {
// TODO : when we get resoucenotfound exception show replication status
// as not in progess and in case of generic excpetion, throw replication_exception.
} catch (e : ResourceNotFoundException) {
log.error("got ResourceNotFoundException while querying for status ",e)
ReplicationStatusResponse("REPLICATION NOT IN PROGRESS")
} catch(e : Exception) {
log.error("got Exception while querying for status ",e)
throw ReplicationException("failed to fetch replication status")
}
}
}
}

private fun populateAggregatedResponse(followerResponse: ReplicationStatusResponse) {
var aggregatedRemoteCheckpoint: Long = 0
var aggregatedLocalCheckpoint: Long = 0
var aggregatedSeqNo: Long = 0
var anyShardInReplay: Boolean = false
var anyShardInRestore: Boolean = false
var aggregateTotalBytes: Long = 0
var aggregateRecoveredBytes: Long = 0
var aggregateRecovereyPercentage: Float = 0F
var aggregateTotalFiles: Int = 0
var aggregateRecoveredFiles: Int = 0
var aggregateFileRecovereyPercentage: Float = 0F
var startTime: Long = Long.MAX_VALUE
var time: Long = 0
var numberOfShardsiInRestore: Int = 0


followerResponse.shardInfoResponse.forEach {
if (it.isReplayDetailsInitialized()) {
aggregatedRemoteCheckpoint += it.replayDetails.remoteCheckpoint()
aggregatedLocalCheckpoint += it.replayDetails.localCheckpoint()
aggregatedSeqNo += it.replayDetails.seqNo()
anyShardInReplay = true
}
if (it.isRestoreDetailsInitialized()) {
anyShardInRestore = true
aggregateTotalBytes += it.restoreDetails.totalBytes
aggregateRecoveredBytes += it.restoreDetails.recoveredBytes
aggregateRecovereyPercentage = (numberOfShardsiInRestore * aggregateRecovereyPercentage + it.restoreDetails.recovereyPercentage) / (numberOfShardsiInRestore + 1)
aggregateFileRecovereyPercentage = (numberOfShardsiInRestore * aggregateFileRecovereyPercentage + it.restoreDetails.fileRecovereyPercentage) / (numberOfShardsiInRestore + 1)
numberOfShardsiInRestore++
aggregateTotalFiles += it.restoreDetails.totalFiles
aggregateRecoveredFiles += it.restoreDetails.recoveredFiles
startTime = Math.min(startTime, it.restoreDetails.startTime)
time = Math.max(time, it.restoreDetails.time)
}
}
if (anyShardInReplay) {
followerResponse.aggregatedReplayDetails = ReplayDetails(aggregatedRemoteCheckpoint, aggregatedLocalCheckpoint, aggregatedSeqNo)
}
if (anyShardInRestore) {
followerResponse.aggregatedRestoreDetails = RestoreDetails(aggregateTotalBytes, aggregateRecoveredBytes, aggregateRecovereyPercentage
, aggregateTotalFiles, aggregateRecoveredFiles, aggregateFileRecovereyPercentage, startTime, time)
}
}
}

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.amazon.elasticsearch.replication.rest


import com.amazon.elasticsearch.replication.action.status.ShardInfoRequest
import com.amazon.elasticsearch.replication.action.status.ReplicationStatusAction
import com.amazon.elasticsearch.replication.action.status.ShardInfoRequest
import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.action.RestToXContentListener
Expand All @@ -28,7 +29,8 @@ class ReplicationStatusHandler : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val index = request.param("index")
val indexReplicationStatusRequest = ShardInfoRequest(index)
var isVerbose = (request.paramAsBoolean("verbose", false))
val indexReplicationStatusRequest = ShardInfoRequest(index,isVerbose)
return RestChannelConsumer {
channel ->
client.execute(ReplicationStatusAction.INSTANCE, indexReplicationStatusRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const val REST_REPLICATION_STOP = "$REST_REPLICATION_PREFIX{index}/_stop"
const val REST_REPLICATION_PAUSE = "$REST_REPLICATION_PREFIX{index}/_pause"
const val REST_REPLICATION_RESUME = "$REST_REPLICATION_PREFIX{index}/_resume"
const val REST_REPLICATION_UPDATE = "$REST_REPLICATION_PREFIX{index}/_update"
const val REST_REPLICATION_STATUS_VERBOSE = "$REST_REPLICATION_PREFIX{index}/_status?verbose=true"
const val REST_REPLICATION_STATUS = "$REST_REPLICATION_PREFIX{index}/_status"
const val REST_AUTO_FOLLOW_PATTERN = "${REST_REPLICATION_PREFIX}_autofollow"

Expand Down Expand Up @@ -71,8 +72,8 @@ fun getAckResponse(lowLevelResponse: Response): AcknowledgedResponse {
return AcknowledgedResponse.fromXContent(xcp)
}

fun RestHighLevelClient.replicationStatus(index: String) : Map<String, Any> {
val lowLevelStopRequest = Request("GET", REST_REPLICATION_STATUS.replace("{index}", index,true))
fun RestHighLevelClient.replicationStatus(index: String,verbose: Boolean = true) : Map<String, Any> {
var lowLevelStopRequest = if(!verbose) Request("GET", REST_REPLICATION_STATUS.replace("{index}", index,true)) else Request("GET", REST_REPLICATION_STATUS_VERBOSE.replace("{index}", index,true))
lowLevelStopRequest.setJsonEntity("{}")
val lowLevelStatusResponse = lowLevelClient.performRequest(lowLevelStopRequest)
val statusResponse: Map<String, Any> = ESRestTestCase.entityAsMap(lowLevelStatusResponse)
Expand All @@ -81,23 +82,41 @@ fun RestHighLevelClient.replicationStatus(index: String) : Map<String, Any> {

fun `validate status syncing resposne`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"),"SYNCING")
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint"))
}

fun `validate status syncing aggregated resposne`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"),"SYNCING")
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint"))
}

fun `validate not paused status resposne`(statusResp: Map<String, Any>) {
Assert.assertNotEquals(statusResp.getValue("status"),"PAUSED")
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint"))
}

fun `validate not paused status aggregated resposne`(statusResp: Map<String, Any>) {
Assert.assertNotEquals(statusResp.getValue("status"),"PAUSED")
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint"))
}

fun `validate paused status resposne`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"),"PAUSED")
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint"))
}

fun `validate aggregated paused status resposne`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"),"PAUSED")
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint"))
Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint"))
}

fun RestHighLevelClient.stopReplication(index: String, shouldWait: Boolean = true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(randomIndex)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate paused status resposne`(statusResp)
statusResp = followerClient.replicationStatus(followerIndexName,false)
`validate aggregated paused status resposne`(statusResp)
}.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("No replication in progress for index:$randomIndex")
}
Expand All @@ -172,6 +174,8 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate paused status resposne`(statusResp)
statusResp = followerClient.replicationStatus(followerIndexName,false)
`validate aggregated paused status resposne`(statusResp)
// Since, we were still in FOLLOWING phase when pause was called, the index
// in follower index should not have been deleted in follower cluster
assertBusy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate paused status resposne`(statusResp)
statusResp = followerClient.replicationStatus(followerIndexName,false)
`validate aggregated paused status resposne`(statusResp)
followerClient.resumeReplication(followerIndexName)
} finally {
followerClient.stopReplication(followerIndexName)
Expand All @@ -89,9 +91,13 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
assertThatThrownBy {
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing resposne`(statusResp)
statusResp = followerClient.replicationStatus(followerIndexName,false)
`validate status syncing aggregated resposne`(statusResp)
followerClient.resumeReplication(followerIndexName)
statusResp = followerClient.replicationStatus(followerIndexName)
`validate not paused status resposne`(statusResp)
statusResp = followerClient.replicationStatus(followerIndexName,false)
`validate not paused status aggregated resposne`(statusResp)
}.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("Replication on Index ${followerIndexName} is already running")
} finally {
Expand Down