diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ReplicationStatusResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ReplicationStatusResponse.kt index bec57c04..41651d8c 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ReplicationStatusResponse.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ReplicationStatusResponse.kt @@ -17,6 +17,9 @@ class ReplicationStatusResponse : BroadcastResponse, ToXContentObject { lateinit var connectionAlias: String lateinit var leaderIndexName: String lateinit var followerIndexName: String + lateinit var aggregatedReplayDetails: ReplayDetails + lateinit var aggregatedRestoreDetails: RestoreDetails + var isVerbose: Boolean = true @Throws(IOException::class) constructor(inp: StreamInput) : super(inp) { @@ -62,13 +65,17 @@ class ReplicationStatusResponse : BroadcastResponse, ToXContentObject { if (::status.isInitialized) builder.field("status",status) if (::connectionAlias.isInitialized) - builder.field("connection_alias",connectionAlias) + builder.field("remote_cluster",connectionAlias) if (::leaderIndexName.isInitialized) builder.field("leader_index",leaderIndexName) if (::followerIndexName.isInitialized) builder.field("follower_index",followerIndexName) - if (::shardInfoResponse.isInitialized) - builder.field("replication_data",shardInfoResponse) + if (::aggregatedReplayDetails.isInitialized) + builder.field("syncing_details",aggregatedReplayDetails) + if (::aggregatedRestoreDetails.isInitialized) + builder.field("bootstrap_details",aggregatedRestoreDetails) + if (isVerbose and ::shardInfoResponse.isInitialized) + builder.field("shard_replication_details",shardInfoResponse) builder.endObject() return builder } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoRequest.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoRequest.kt index b0268c5c..0e2596ea 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoRequest.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoRequest.kt @@ -12,11 +12,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder class ShardInfoRequest : BroadcastRequest , ToXContentObject { var indexName: String + var verbose: Boolean = false constructor(indexName: String) { this.indexName = indexName } + constructor(indexName: String,verbose: Boolean) { + this.indexName = indexName + this.verbose = verbose + } + constructor(inp: StreamInput): super(inp) { indexName = inp.readString() } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoResponse.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoResponse.kt index 16c74025..51ac7f56 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoResponse.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/ShardInfoResponse.kt @@ -53,7 +53,6 @@ class ShardInfoResponse : BroadcastShardResponse, ToXContentObject { } private val SHARDID = ParseField("shard_id") - private val DOCCOUNT = ParseField("doc_count") private val REPLAYDETAILS = ParseField("syncing_task_details") private val RESTOREDETAILS = ParseField("bootstrap_task_details") @@ -73,6 +72,9 @@ class ShardInfoResponse : BroadcastShardResponse, ToXContentObject { fun isReplayDetailsInitialized(): Boolean { return ::replayDetails.isInitialized } + fun isRestoreDetailsInitialized(): Boolean { + return ::restoreDetails.isInitialized + } } class RestoreDetails : BroadcastResponse, ToXContentObject { diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TransportReplicationStatusAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TransportReplicationStatusAction.kt index 08552840..a65feb07 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TransportReplicationStatusAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/status/TransportReplicationStatusAction.kt @@ -1,12 +1,14 @@ package com.amazon.elasticsearch.replication.action.status +import com.amazon.elasticsearch.replication.ReplicationException import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager import com.amazon.elasticsearch.replication.util.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.elasticsearch.ResourceNotFoundException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction @@ -67,14 +69,66 @@ class TransportReplicationStatusAction @Inject constructor(transportService: Tra followerResponse.followerIndexName = metadata.followerContext.resource followerResponse.leaderIndexName = metadata.leaderContext.resource followerResponse.status = status + populateAggregatedResponse(followerResponse) + if (!request.verbose) { + followerResponse.isVerbose = false + } followerResponse - } catch(e : Exception) { - // TODO : when we get resoucenotfound exception show replication status - // as not in progess and in case of generic excpetion, throw replication_exception. + } catch (e : ResourceNotFoundException) { + log.error("got ResourceNotFoundException while querying for status ",e) ReplicationStatusResponse("REPLICATION NOT IN PROGRESS") + } catch(e : Exception) { + log.error("got Exception while querying for status ",e) + throw ReplicationException("failed to fetch replication status") } } } } + + private fun populateAggregatedResponse(followerResponse: ReplicationStatusResponse) { + var aggregatedRemoteCheckpoint: Long = 0 + var aggregatedLocalCheckpoint: Long = 0 + var aggregatedSeqNo: Long = 0 + var anyShardInReplay: Boolean = false + var anyShardInRestore: Boolean = false + var aggregateTotalBytes: Long = 0 + var aggregateRecoveredBytes: Long = 0 + var aggregateRecovereyPercentage: Float = 0F + var aggregateTotalFiles: Int = 0 + var aggregateRecoveredFiles: Int = 0 + var aggregateFileRecovereyPercentage: Float = 0F + var startTime: Long = Long.MAX_VALUE + var time: Long = 0 + var numberOfShardsiInRestore: Int = 0 + + + followerResponse.shardInfoResponse.forEach { + if (it.isReplayDetailsInitialized()) { + aggregatedRemoteCheckpoint += it.replayDetails.remoteCheckpoint() + aggregatedLocalCheckpoint += it.replayDetails.localCheckpoint() + aggregatedSeqNo += it.replayDetails.seqNo() + anyShardInReplay = true + } + if (it.isRestoreDetailsInitialized()) { + anyShardInRestore = true + aggregateTotalBytes += it.restoreDetails.totalBytes + aggregateRecoveredBytes += it.restoreDetails.recoveredBytes + aggregateRecovereyPercentage = (numberOfShardsiInRestore * aggregateRecovereyPercentage + it.restoreDetails.recovereyPercentage) / (numberOfShardsiInRestore + 1) + aggregateFileRecovereyPercentage = (numberOfShardsiInRestore * aggregateFileRecovereyPercentage + it.restoreDetails.fileRecovereyPercentage) / (numberOfShardsiInRestore + 1) + numberOfShardsiInRestore++ + aggregateTotalFiles += it.restoreDetails.totalFiles + aggregateRecoveredFiles += it.restoreDetails.recoveredFiles + startTime = Math.min(startTime, it.restoreDetails.startTime) + time = Math.max(time, it.restoreDetails.time) + } + } + if (anyShardInReplay) { + followerResponse.aggregatedReplayDetails = ReplayDetails(aggregatedRemoteCheckpoint, aggregatedLocalCheckpoint, aggregatedSeqNo) + } + if (anyShardInRestore) { + followerResponse.aggregatedRestoreDetails = RestoreDetails(aggregateTotalBytes, aggregateRecoveredBytes, aggregateRecovereyPercentage + , aggregateTotalFiles, aggregateRecoveredFiles, aggregateFileRecovereyPercentage, startTime, time) + } + } } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ReplicationStatusHandler.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ReplicationStatusHandler.kt index 6fbaefdf..a095da6e 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ReplicationStatusHandler.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ReplicationStatusHandler.kt @@ -1,11 +1,12 @@ package com.amazon.elasticsearch.replication.rest -import com.amazon.elasticsearch.replication.action.status.ShardInfoRequest import com.amazon.elasticsearch.replication.action.status.ReplicationStatusAction +import com.amazon.elasticsearch.replication.action.status.ShardInfoRequest import org.apache.logging.log4j.LogManager import org.elasticsearch.client.node.NodeClient import org.elasticsearch.rest.BaseRestHandler +import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer import org.elasticsearch.rest.RestHandler import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.action.RestToXContentListener @@ -28,7 +29,8 @@ class ReplicationStatusHandler : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { val index = request.param("index") - val indexReplicationStatusRequest = ShardInfoRequest(index) + var isVerbose = (request.paramAsBoolean("verbose", false)) + val indexReplicationStatusRequest = ShardInfoRequest(index,isVerbose) return RestChannelConsumer { channel -> client.execute(ReplicationStatusAction.INSTANCE, indexReplicationStatusRequest, RestToXContentListener(channel)) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index 82ca5cba..dd8b11fd 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -43,6 +43,7 @@ const val REST_REPLICATION_STOP = "$REST_REPLICATION_PREFIX{index}/_stop" const val REST_REPLICATION_PAUSE = "$REST_REPLICATION_PREFIX{index}/_pause" const val REST_REPLICATION_RESUME = "$REST_REPLICATION_PREFIX{index}/_resume" const val REST_REPLICATION_UPDATE = "$REST_REPLICATION_PREFIX{index}/_update" +const val REST_REPLICATION_STATUS_VERBOSE = "$REST_REPLICATION_PREFIX{index}/_status?verbose=true" const val REST_REPLICATION_STATUS = "$REST_REPLICATION_PREFIX{index}/_status" const val REST_AUTO_FOLLOW_PATTERN = "${REST_REPLICATION_PREFIX}_autofollow" @@ -71,8 +72,8 @@ fun getAckResponse(lowLevelResponse: Response): AcknowledgedResponse { return AcknowledgedResponse.fromXContent(xcp) } -fun RestHighLevelClient.replicationStatus(index: String) : Map { - val lowLevelStopRequest = Request("GET", REST_REPLICATION_STATUS.replace("{index}", index,true)) +fun RestHighLevelClient.replicationStatus(index: String,verbose: Boolean = true) : Map { + var lowLevelStopRequest = if(!verbose) Request("GET", REST_REPLICATION_STATUS.replace("{index}", index,true)) else Request("GET", REST_REPLICATION_STATUS_VERBOSE.replace("{index}", index,true)) lowLevelStopRequest.setJsonEntity("{}") val lowLevelStatusResponse = lowLevelClient.performRequest(lowLevelStopRequest) val statusResponse: Map = ESRestTestCase.entityAsMap(lowLevelStatusResponse) @@ -81,23 +82,41 @@ fun RestHighLevelClient.replicationStatus(index: String) : Map { fun `validate status syncing resposne`(statusResp: Map) { Assert.assertEquals(statusResp.getValue("status"),"SYNCING") - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint")) +} + +fun `validate status syncing aggregated resposne`(statusResp: Map) { + Assert.assertEquals(statusResp.getValue("status"),"SYNCING") + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint")) } fun `validate not paused status resposne`(statusResp: Map) { Assert.assertNotEquals(statusResp.getValue("status"),"PAUSED") - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint")) +} + +fun `validate not paused status aggregated resposne`(statusResp: Map) { + Assert.assertNotEquals(statusResp.getValue("status"),"PAUSED") + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint")) } fun `validate paused status resposne`(statusResp: Map) { Assert.assertEquals(statusResp.getValue("status"),"PAUSED") - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("syncing_task_details")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("local_checkpoint")) - Assert.assertTrue((statusResp.getValue("replication_data")).toString().contains("remote_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("syncing_task_details")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("shard_replication_details")).toString().contains("remote_checkpoint")) +} + +fun `validate aggregated paused status resposne`(statusResp: Map) { + Assert.assertEquals(statusResp.getValue("status"),"PAUSED") + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("local_checkpoint")) + Assert.assertTrue((statusResp.getValue("syncing_details")).toString().contains("remote_checkpoint")) } fun RestHighLevelClient.stopReplication(index: String, shouldWait: Boolean = true) { diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt index 5b4e1865..0b8adb61 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt @@ -151,6 +151,8 @@ class PauseReplicationIT: MultiClusterRestTestCase() { followerClient.pauseReplication(randomIndex) var statusResp = followerClient.replicationStatus(followerIndexName) `validate paused status resposne`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate aggregated paused status resposne`(statusResp) }.isInstanceOf(ResponseException::class.java) .hasMessageContaining("No replication in progress for index:$randomIndex") } @@ -172,6 +174,8 @@ class PauseReplicationIT: MultiClusterRestTestCase() { followerClient.pauseReplication(followerIndexName) var statusResp = followerClient.replicationStatus(followerIndexName) `validate paused status resposne`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate aggregated paused status resposne`(statusResp) // Since, we were still in FOLLOWING phase when pause was called, the index // in follower index should not have been deleted in follower cluster assertBusy { diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt index 26d780d2..73b6ff29 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt @@ -69,6 +69,8 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { followerClient.pauseReplication(followerIndexName) var statusResp = followerClient.replicationStatus(followerIndexName) `validate paused status resposne`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate aggregated paused status resposne`(statusResp) followerClient.resumeReplication(followerIndexName) } finally { followerClient.stopReplication(followerIndexName) @@ -89,9 +91,13 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { assertThatThrownBy { var statusResp = followerClient.replicationStatus(followerIndexName) `validate status syncing resposne`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate status syncing aggregated resposne`(statusResp) followerClient.resumeReplication(followerIndexName) statusResp = followerClient.replicationStatus(followerIndexName) `validate not paused status resposne`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate not paused status aggregated resposne`(statusResp) }.isInstanceOf(ResponseException::class.java) .hasMessageContaining("Replication on Index ${followerIndexName} is already running") } finally {