From dabbb84be2388c4b05b7e4dea0dc33d9e20535a3 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Fri, 26 Aug 2022 14:00:04 +0530 Subject: [PATCH] Add lastExecutionTime for autofollow coroutine (#508) Signed-off-by: Ankit Kala Signed-off-by: Ankit Kala (cherry picked from commit 8f0a55c9a7e32f643345a972d28e4829d7e3c530) --- .../task/autofollow/AutoFollowTask.kt | 6 +++++ .../integ/rest/UpdateAutoFollowPatternIT.kt | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt index 570e39b7..0685b79d 100644 --- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt @@ -74,6 +74,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String try { addRetryScheduler() pollForIndices() + stat.lastExecutionTime = System.currentTimeMillis() delay(replicationSettings.autofollowFetchPollDuration.millis) } catch(e: OpenSearchException) { @@ -252,6 +253,7 @@ class AutoFollowStat: Task.Status { var failCounterForRun :Long=0 var successCount: Long=0 var failedLeaderCall :Long=0 + var lastExecutionTime : Long=0 constructor(name: String, pattern: String) { @@ -266,6 +268,7 @@ class AutoFollowStat: Task.Status { failedIndices = inp.readSet(StreamInput::readString) successCount = inp.readLong() failedLeaderCall = inp.readLong() + lastExecutionTime = inp.readLong() } override fun writeTo(out: StreamOutput) { @@ -275,6 +278,7 @@ class AutoFollowStat: Task.Status { out.writeCollection(failedIndices, StreamOutput::writeString) out.writeLong(successCount) out.writeLong(failedLeaderCall) + out.writeLong(lastExecutionTime) } override fun getWriteableName(): String { @@ -289,6 +293,8 @@ class AutoFollowStat: Task.Status { builder.field("num_failed_start_replication", failCount) builder.field("num_failed_leader_calls", failedLeaderCall) builder.field("failed_indices", failedIndices) + builder.field("last_execution_time", lastExecutionTime) return builder.endObject() } + } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index d1a757bd..82db8fb5 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -91,6 +91,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .isEqualTo(true) followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) + var stats = followerClient.AutoFollowStats() var af_stats = stats.get("autofollow_stats")!! as ArrayList> for (key in af_stats) { @@ -118,8 +119,13 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { TimeValue.timeValueSeconds(30)) val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings) val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT) + + var lastExecutionTime = 0L + var stats = followerClient.AutoFollowStats() + Assert.assertTrue(clusterUpdateResponse.isAcknowledged) followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) + leaderIndexNameNew = createRandomIndex(leaderClient) // Verify that newly created index on leader which match the pattern are also replicated. assertBusy({ @@ -127,8 +133,25 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT)) .isEqualTo(true) followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) + var af_stats = stats.get("autofollow_stats")!! as ArrayList> + for (key in af_stats) { + if(key["name"] == indexPatternName) { + Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(0L) + lastExecutionTime = key["last_execution_time"]!! as Long + } + } + }, 30, TimeUnit.SECONDS) + assertBusy({ + var af_stats = stats.get("autofollow_stats")!! as ArrayList> + for (key in af_stats) { + if(key["name"] == indexPatternName) { + Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(lastExecutionTime) + } + } + }, 40, TimeUnit.SECONDS) + } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)