Skip to content

Commit

Permalink
Add lastExecutionTime for autofollow coroutine (#508) (#530)
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>

Signed-off-by: Ankit Kala <[email protected]>

Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored Aug 29, 2022
1 parent 6e8e821 commit 62c479c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
try {
addRetryScheduler()
autoFollow()
stat.lastExecutionTime = System.currentTimeMillis()
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
catch(e: ElasticsearchException) {
Expand Down Expand Up @@ -221,6 +222,7 @@ class AutoFollowStat: Task.Status {
var failCounterForRun :Long=0
var successCount: Long=0
var failedLeaderCall :Long=0
var lastExecutionTime : Long=0


constructor(name: String, pattern: String) {
Expand All @@ -235,6 +237,7 @@ class AutoFollowStat: Task.Status {
failedIndices = inp.readSet(StreamInput::readString)
successCount = inp.readLong()
failedLeaderCall = inp.readLong()
lastExecutionTime = inp.readLong()
}

override fun writeTo(out: StreamOutput) {
Expand All @@ -244,6 +247,7 @@ class AutoFollowStat: Task.Status {
out.writeCollection(failedIndices, StreamOutput::writeString)
out.writeLong(successCount)
out.writeLong(failedLeaderCall)
out.writeLong(lastExecutionTime)
}

override fun getWriteableName(): String {
Expand All @@ -258,6 +262,8 @@ class AutoFollowStat: Task.Status {
builder.field("num_failed_start_replication", failCount)
builder.field("num_failed_leader_calls", failedLeaderCall)
builder.field("failed_indices", failedIndices)
builder.field("last_execution_time", lastExecutionTime)
return builder.endObject()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)

var stats = followerClient.AutoFollowStats()
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
Expand Down Expand Up @@ -120,17 +121,39 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
TimeValue.timeValueSeconds(30))
val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings)
val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT)

var lastExecutionTime = 0L
var stats = followerClient.AutoFollowStats()

Assert.assertTrue(clusterUpdateResponse.isAcknowledged)
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)

leaderIndexNameNew = createRandomIndex(leaderClient)
// Verify that newly created index on leader which match the pattern are also replicated.
assertBusy({
Assertions.assertThat(followerClient.indices()
.exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT))
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
if(key["name"] == indexPatternName) {
Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(0L)
lastExecutionTime = key["last_execution_time"]!! as Long
}
}

}, 30, TimeUnit.SECONDS)

assertBusy({
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
if(key["name"] == indexPatternName) {
Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(lastExecutionTime)
}
}
}, 40, TimeUnit.SECONDS)


} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
Expand Down

0 comments on commit 62c479c

Please sign in to comment.