diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/autofollow/AutoFollowTask.kt index db884c0d..98c89456 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/autofollow/AutoFollowTask.kt @@ -74,6 +74,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String try { addRetryScheduler() autoFollow() + stat.lastExecutionTime = System.currentTimeMillis() delay(replicationSettings.autofollowFetchPollDuration.millis) } catch(e: ElasticsearchException) { @@ -221,6 +222,7 @@ class AutoFollowStat: Task.Status { var failCounterForRun :Long=0 var successCount: Long=0 var failedLeaderCall :Long=0 + var lastExecutionTime : Long=0 constructor(name: String, pattern: String) { @@ -235,6 +237,7 @@ class AutoFollowStat: Task.Status { failedIndices = inp.readSet(StreamInput::readString) successCount = inp.readLong() failedLeaderCall = inp.readLong() + lastExecutionTime = inp.readLong() } override fun writeTo(out: StreamOutput) { @@ -244,6 +247,7 @@ class AutoFollowStat: Task.Status { out.writeCollection(failedIndices, StreamOutput::writeString) out.writeLong(successCount) out.writeLong(failedLeaderCall) + out.writeLong(lastExecutionTime) } override fun getWriteableName(): String { @@ -258,6 +262,8 @@ class AutoFollowStat: Task.Status { builder.field("num_failed_start_replication", failCount) builder.field("num_failed_leader_calls", failedLeaderCall) builder.field("failed_indices", failedIndices) + builder.field("last_execution_time", lastExecutionTime) return builder.endObject() } + } diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index dd9d1972..486235b6 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -93,6 +93,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .isEqualTo(true) followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) + var stats = followerClient.AutoFollowStats() var af_stats = stats.get("autofollow_stats")!! as ArrayList> for (key in af_stats) { @@ -120,8 +121,13 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { TimeValue.timeValueSeconds(30)) val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings) val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT) + + var lastExecutionTime = 0L + var stats = followerClient.AutoFollowStats() + Assert.assertTrue(clusterUpdateResponse.isAcknowledged) followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) + leaderIndexNameNew = createRandomIndex(leaderClient) // Verify that newly created index on leader which match the pattern are also replicated. assertBusy({ @@ -129,8 +135,25 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT)) .isEqualTo(true) followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) + var af_stats = stats.get("autofollow_stats")!! as ArrayList> + for (key in af_stats) { + if(key["name"] == indexPatternName) { + Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(0L) + lastExecutionTime = key["last_execution_time"]!! as Long + } + } + }, 30, TimeUnit.SECONDS) + assertBusy({ + var af_stats = stats.get("autofollow_stats")!! as ArrayList> + for (key in af_stats) { + if(key["name"] == indexPatternName) { + Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(lastExecutionTime) + } + } + }, 40, TimeUnit.SECONDS) + } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)