Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add lastExecutionTime for autofollow coroutine (#508)
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>

Signed-off-by: Ankit Kala <[email protected]>
(cherry picked from commit 8f0a55c)
ankitkala authored and github-actions[bot] committed Aug 26, 2022
1 parent 9df9639 commit dabbb84
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
try {
addRetryScheduler()
pollForIndices()
stat.lastExecutionTime = System.currentTimeMillis()
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
catch(e: OpenSearchException) {
@@ -252,6 +253,7 @@ class AutoFollowStat: Task.Status {
var failCounterForRun :Long=0
var successCount: Long=0
var failedLeaderCall :Long=0
var lastExecutionTime : Long=0


constructor(name: String, pattern: String) {
@@ -266,6 +268,7 @@ class AutoFollowStat: Task.Status {
failedIndices = inp.readSet(StreamInput::readString)
successCount = inp.readLong()
failedLeaderCall = inp.readLong()
lastExecutionTime = inp.readLong()
}

override fun writeTo(out: StreamOutput) {
@@ -275,6 +278,7 @@ class AutoFollowStat: Task.Status {
out.writeCollection(failedIndices, StreamOutput::writeString)
out.writeLong(successCount)
out.writeLong(failedLeaderCall)
out.writeLong(lastExecutionTime)
}

override fun getWriteableName(): String {
@@ -289,6 +293,8 @@ class AutoFollowStat: Task.Status {
builder.field("num_failed_start_replication", failCount)
builder.field("num_failed_leader_calls", failedLeaderCall)
builder.field("failed_indices", failedIndices)
builder.field("last_execution_time", lastExecutionTime)
return builder.endObject()
}

}
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)

var stats = followerClient.AutoFollowStats()
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
@@ -118,17 +119,39 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
TimeValue.timeValueSeconds(30))
val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings)
val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT)

var lastExecutionTime = 0L
var stats = followerClient.AutoFollowStats()

Assert.assertTrue(clusterUpdateResponse.isAcknowledged)
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)

leaderIndexNameNew = createRandomIndex(leaderClient)
// Verify that newly created index on leader which match the pattern are also replicated.
assertBusy({
Assertions.assertThat(followerClient.indices()
.exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT))
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
if(key["name"] == indexPatternName) {
Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(0L)
lastExecutionTime = key["last_execution_time"]!! as Long
}
}

}, 30, TimeUnit.SECONDS)

assertBusy({
var af_stats = stats.get("autofollow_stats")!! as ArrayList<HashMap<String, Any>>
for (key in af_stats) {
if(key["name"] == indexPatternName) {
Assertions.assertThat(key["last_execution_time"]!! as Long).isNotEqualTo(lastExecutionTime)
}
}
}, 40, TimeUnit.SECONDS)


} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)

0 comments on commit dabbb84

Please sign in to comment.