Skip to content

Commit

Permalink
BugFix: Changes to ensure replication tasks are not failing premature…
Browse files Browse the repository at this point in the history
…ly (#230)

Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored Nov 5, 2021
1 parent 6f06923 commit c0ac321
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import com.amazon.elasticsearch.replication.util.overrideFgacRole
import com.amazon.elasticsearch.replication.util.suspendExecute
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.action.DocWriteResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
Expand Down Expand Up @@ -53,12 +56,14 @@ class ReplicationMetadataManager constructor(private val clusterService: Cluster
}

private suspend fun addMetadata(metadataReq: AddReplicationMetadataRequest) {
val response = replicaionMetadataStore.addMetadata(metadataReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while adding metadata")
throw ReplicationException("Error adding replication metadata")
}
executeAndWrapExceptionIfAny({
val response = replicaionMetadataStore.addMetadata(metadataReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while adding metadata")
throw ReplicationException("Error adding replication metadata")
}
}, log, "Error adding replication metadata")
}

suspend fun updateIndexReplicationState(followerIndex: String,
Expand Down Expand Up @@ -86,12 +91,15 @@ class ReplicationMetadataManager constructor(private val clusterService: Cluster
}

private suspend fun updateMetadata(updateReq: UpdateReplicationMetadataRequest) {
val response = replicaionMetadataStore.updateMetadata(updateReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while updating metadata")
throw ReplicationException("Error updating replication metadata")
}
executeAndWrapExceptionIfAny({
val response = replicaionMetadataStore.updateMetadata(updateReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while updating metadata")
throw ReplicationException("Error updating replication metadata")
}
}, log, "Error updating replication metadata")

}

suspend fun updateSettings(followerIndex: String, settings: Settings) {
Expand All @@ -115,31 +123,45 @@ class ReplicationMetadataManager constructor(private val clusterService: Cluster
}

private suspend fun deleteMetadata(deleteReq: DeleteReplicationMetadataRequest) {
val delRes = replicaionMetadataStore.deleteMetadata(deleteReq)
if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) {
log.error("Encountered error with result - ${delRes.result}, while deleting metadata")
throw ReplicationException("Error deleting replication metadata")
}
executeAndWrapExceptionIfAny({
val delRes = replicaionMetadataStore.deleteMetadata(deleteReq)
if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) {
log.error("Encountered error with result - ${delRes.result}, while deleting metadata")
throw ReplicationException("Error deleting replication metadata")
}
}, log, "Error deleting replication metadata")
}

suspend fun getIndexReplicationMetadata(followerIndex: String, fetch_from_primary: Boolean = false): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, null, followerIndex)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
return executeAndWrapExceptionIfAny({
replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
}, log, "Failed to fetch replication metadata") as ReplicationMetadata
}

fun getIndexReplicationMetadata(followerIndex: String,
connectionName: String?,
fetch_from_primary: Boolean = false,
timeout: Long = RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, connectionName, followerIndex)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata
try {
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata
} catch (e: ResourceNotFoundException) {
log.error("Encountered exception - ", e)
throw e
} catch (e: Exception) {
log.error("Failed to fetch replication metadata", e)
throw ReplicationException("Failed to fetch replication metadata")
}
}

suspend fun getAutofollowMetadata(patternName: String,
connectionName: String,
fetch_from_primary: Boolean = false): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.AUTO_FOLLOW.name, connectionName, patternName)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
return executeAndWrapExceptionIfAny({
replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
}, log, "Failed to fetch replication metadata") as ReplicationMetadata
}

private suspend fun updateReplicationState(indexName: String, overallState: ReplicationOverallState) {
Expand All @@ -151,6 +173,20 @@ class ReplicationMetadataManager constructor(private val clusterService: Cluster
}
val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap,
updateType)
client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true)
executeAndWrapExceptionIfAny({
client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true)
}, log, "Error updating replicaiton metadata")
}

/**
 * Runs [tryBlock] and normalises the exceptions it may raise.
 *
 * - Coroutine cancellation is rethrown untouched so that cancelling the caller is never
 *   reported as a replication failure.
 * - An [ElasticsearchException] is logged and rethrown as-is.
 * - Any other [Exception] is logged and replaced by a [ReplicationException] carrying [errorMessage].
 *
 * @param tryBlock suspending action to execute
 * @param log logger used to record the failure
 * @param errorMessage message of the [ReplicationException] thrown for non-Elasticsearch failures
 * @return whatever [tryBlock] returns
 */
private suspend fun executeAndWrapExceptionIfAny(tryBlock: suspend () -> Any?, log: Logger, errorMessage: String): Any? {
    try {
        return tryBlock()
    } catch (e: java.util.concurrent.CancellationException) {
        // Must come before the generic Exception handler: wrapping cancellation in a
        // ReplicationException would hide it from the coroutine machinery.
        throw e
    } catch (e: ElasticsearchException) {
        log.error("Encountered exception - ", e)
        throw e
    } catch (e: Exception) {
        log.error("Encountered exception - ", e)
        throw ReplicationException(errorMessage)
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.amazon.elasticsearch.replication.metadata.store

import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager
import com.amazon.elasticsearch.replication.util.execute
import com.amazon.elasticsearch.replication.util.suspendExecuteWithRetries
import com.amazon.elasticsearch.replication.util.suspending
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest
Expand All @@ -14,6 +18,7 @@ import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.health.ClusterHealthStatus
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.component.AbstractLifecycleComponent
Expand Down Expand Up @@ -79,6 +84,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
}
}

checkAndWaitForStoreHealth()
checkAndUpdateMapping()

val id = getId(addReq.replicationMetadata.metadataType, addReq.replicationMetadata.connectionName,
Expand Down Expand Up @@ -115,11 +121,13 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}

checkAndWaitForStoreHealth()

val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
getReq.realtime(true)
getReq.refresh(true)
if(fetch_from_primary) {
val preference = getPreferenceOnPrimaryNode() ?: throw throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
val preference = getPreferenceOnPrimaryNode() ?: throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
getReq.preference(preference)
}

Expand All @@ -141,6 +149,11 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}


val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
val clusterHealthRes = client.admin().cluster().health(clusterHealthReq).actionGet(timeout)
assert(clusterHealthRes.status <= ClusterHealthStatus.YELLOW) { "Replication metadata store is unhealthy" }

val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
getReq.realtime(true)
getReq.refresh(true)
Expand Down Expand Up @@ -189,6 +202,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
if(!configStoreExists()) {
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}
checkAndWaitForStoreHealth()

val delReq = DeleteRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
return client.suspending(client::delete, defaultContext = true)(delReq)
Expand All @@ -200,6 +214,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
if(!configStoreExists()) {
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}
checkAndWaitForStoreHealth()
checkAndUpdateMapping()

val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id)
Expand Down Expand Up @@ -246,4 +261,14 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic

override fun doClose() {
}

/**
 * Issues a cluster-health request for the replication metadata store index that waits for
 * yellow status.
 *
 * Returns immediately when [configStoreExists] reports that the store index is absent.
 * The health response is not inspected; the request is made only for its waiting effect.
 */
private suspend fun checkAndWaitForStoreHealth() {
    if(!configStoreExists()) {
        return
    }
    val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
    // This should ensure that security plugin and shards are active during boot-up before triggering the requests
    client.suspendExecuteWithRetries(null, ClusterHealthAction.INSTANCE,
            clusterHealthReq, log=log, defaultContext = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.client.Client
Expand All @@ -48,6 +50,7 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService
import org.elasticsearch.persistent.AllocatedPersistentTask
import org.elasticsearch.persistent.PersistentTaskState
import org.elasticsearch.persistent.PersistentTasksService
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.TaskId
import org.elasticsearch.tasks.TaskManager
import org.elasticsearch.threadpool.ThreadPool
Expand All @@ -69,6 +72,10 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
protected lateinit var replicationMetadata: ReplicationMetadata
@Volatile private lateinit var taskManager: TaskManager

companion object {
    // Pause passed to delay() between attempts when fetching replication metadata fails.
    const val DEFAULT_WAIT_ON_ERRORS = 60_000L
}

override fun init(persistentTasksService: PersistentTasksService, taskManager: TaskManager,
persistentTaskId: String, allocationId: Long) {
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId)
Expand All @@ -89,7 +96,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
var exception : Throwable? = null
try {
registerCloseListeners()
setReplicationMetadata()
waitAndSetReplicationMetadata()
execute(this, initialState)
markAsCompleted()
} catch (e: Exception) {
Expand Down Expand Up @@ -194,17 +201,29 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
}
}

/**
 * Loads the replication metadata unless it has already been initialised, retrying while
 * [overallTaskScope] is active.
 *
 * An [ElasticsearchException] whose status is 5xx or TOO_MANY_REQUESTS is logged and retried
 * after [DEFAULT_WAIT_ON_ERRORS]; any other [ElasticsearchException] is rethrown.
 * If the scope is no longer active the loop ends without the metadata being set.
 */
private suspend fun waitAndSetReplicationMetadata() {
    // Already loaded: nothing to wait for.
    if (this::replicationMetadata.isInitialized) return

    while (overallTaskScope.isActive) {
        try {
            setReplicationMetadata()
            return
        } catch (e: ElasticsearchException) {
            val retryable = e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS
            if (!retryable) throw e
            log.error("Failed to fetch replication metadata due to ", e)
            delay(DEFAULT_WAIT_ON_ERRORS)
        }
    }
}

/**
* Fetches the replication metadata for this task and assigns it to [replicationMetadata]
*/
protected open suspend fun setReplicationMetadata() {
replicationMetadata = if(this is AutoFollowTask) {
replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true)
}
else {
replicationMetadataManager.getIndexReplicationMetadata(followerIndexName, fetch_from_primary = true)
}
}
protected abstract suspend fun setReplicationMetadata()

open class CrossClusterReplicationTaskResponse(val status: String): ActionResponse(), ToXContentObject {
override fun writeTo(out: StreamOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.amazon.elasticsearch.replication.util.suspending
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ElasticsearchSecurityException
import org.elasticsearch.action.admin.indices.get.GetIndexRequest
import org.elasticsearch.action.support.IndicesOptions
Expand All @@ -39,6 +40,7 @@ import org.elasticsearch.common.logging.Loggers
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.persistent.PersistentTaskState
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.tasks.TaskId
import org.elasticsearch.threadpool.Scheduler
Expand Down Expand Up @@ -69,9 +71,21 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
stat = AutoFollowStat(params.patternName, replicationMetadata.leaderContext.resource)
while (scope.isActive) {
addRetryScheduler()
autoFollow()
delay(replicationSettings.autofollowFetchPollDuration.millis)
try {
addRetryScheduler()
autoFollow()
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
catch(e: ElasticsearchException) {
// Any transient error encountered during auto follow execution should be re-tried
val status = e.status().status
if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) {
log.error("Exiting autofollow task", e)
throw e
}
log.debug("Encountered transient error while running autofollow task", e)
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
}
}

Expand Down Expand Up @@ -189,6 +203,10 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
/** Returns the auto-follow statistics object held in [stat]. */
override fun getStatus(): AutoFollowStat = stat

/**
 * Fetches the auto-follow metadata via [replicationMetadataManager] (requesting the primary copy)
 * and stores it in [replicationMetadata].
 */
override suspend fun setReplicationMetadata() {
    val autoFollowMetadata = replicationMetadataManager.getAutofollowMetadata(
            followerIndexName, leaderAlias, fetch_from_primary = true)
    this.replicationMetadata = autoFollowMetadata
}
}

class AutoFollowStat: Task.Status {
Expand Down
Loading

0 comments on commit c0ac321

Please sign in to comment.