Skip to content

Commit

Permalink
BugFix: Changes to ensure replication tasks are not failing prematurely
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon committed Nov 5, 2021
1 parent c5079c9 commit b75b6a3
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.opensearch.replication.util.overrideFgacRole
import org.opensearch.replication.util.suspendExecute
import org.opensearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.OpenSearchException
import org.opensearch.ResourceNotFoundException
import org.opensearch.action.DocWriteResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -67,12 +70,14 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl
}

private suspend fun addMetadata(metadataReq: AddReplicationMetadataRequest) {
val response = replicaionMetadataStore.addMetadata(metadataReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while adding metadata")
throw org.opensearch.replication.ReplicationException("Error adding replication metadata")
}
executeAndWrapExceptionIfAny({
val response = replicaionMetadataStore.addMetadata(metadataReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while adding metadata")
throw ReplicationException("Error adding replication metadata")
}
}, log, "Error adding replication metadata")
}

suspend fun updateIndexReplicationState(followerIndex: String,
Expand Down Expand Up @@ -100,12 +105,14 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl
}

private suspend fun updateMetadata(updateReq: UpdateReplicationMetadataRequest) {
val response = replicaionMetadataStore.updateMetadata(updateReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while updating metadata")
throw org.opensearch.replication.ReplicationException("Error updating replication metadata")
}
executeAndWrapExceptionIfAny({
val response = replicaionMetadataStore.updateMetadata(updateReq)
if(response.result != DocWriteResponse.Result.CREATED &&
response.result != DocWriteResponse.Result.UPDATED) {
log.error("Encountered error with result - ${response.result}, while updating metadata")
throw org.opensearch.replication.ReplicationException("Error updating replication metadata")
}
}, log, "Error updating replication metadata")
}

suspend fun updateSettings(followerIndex: String, settings: Settings) {
Expand All @@ -129,31 +136,45 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl
}

private suspend fun deleteMetadata(deleteReq: DeleteReplicationMetadataRequest) {
val delRes = replicaionMetadataStore.deleteMetadata(deleteReq)
if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) {
log.error("Encountered error with result - ${delRes.result}, while deleting metadata")
throw org.opensearch.replication.ReplicationException("Error deleting replication metadata")
}
executeAndWrapExceptionIfAny({
val delRes = replicaionMetadataStore.deleteMetadata(deleteReq)
if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) {
log.error("Encountered error with result - ${delRes.result}, while deleting metadata")
throw ReplicationException("Error deleting replication metadata")
}
}, log, "Error deleting replication metadata")
}

suspend fun getIndexReplicationMetadata(followerIndex: String, fetch_from_primary: Boolean = false): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, null, followerIndex)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
return executeAndWrapExceptionIfAny({
replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
}, log, "Failed to fetch replication metadata") as ReplicationMetadata
}

fun getIndexReplicationMetadata(followerIndex: String,
connectionName: String?,
fetch_from_primary: Boolean = false,
timeout: Long = RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, connectionName, followerIndex)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata
try {
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata
} catch (e: ResourceNotFoundException) {
log.error("Encountered exception - ", e)
throw e
} catch (e: Exception) {
log.error("Failed to fetch replication metadata", e)
throw ReplicationException("Failed to fetch replication metadata")
}
}

suspend fun getAutofollowMetadata(patternName: String,
connectionName: String,
fetch_from_primary: Boolean = false): ReplicationMetadata {
val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.AUTO_FOLLOW.name, connectionName, patternName)
return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
return executeAndWrapExceptionIfAny({
replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata
}, log, "Failed to fetch replication metadata") as ReplicationMetadata
}

private suspend fun updateReplicationState(indexName: String, overallState: ReplicationOverallState) {
Expand All @@ -165,6 +186,20 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl
}
val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap,
updateType)
client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true)
executeAndWrapExceptionIfAny({
client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true)
}, log, "Error updating replicaiton metadata")
}

private suspend fun executeAndWrapExceptionIfAny(tryBlock: suspend () -> Any?, log: Logger, errorMessage: String): Any? {
try {
return tryBlock()
} catch (e: OpenSearchException) {
log.error("Encountered exception - ", e)
throw e
} catch (e: Exception) {
log.error("Encountered exception - ", e)
throw ReplicationException(errorMessage)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.ResourceNotFoundException
import org.opensearch.action.admin.cluster.health.ClusterHealthAction
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
Expand All @@ -25,12 +27,14 @@ import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.client.Client
import org.opensearch.cluster.health.ClusterHealthStatus
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.common.xcontent.*
import org.opensearch.replication.util.suspendExecuteWithRetries

class ReplicationMetadataStore constructor(val client: Client, val clusterService: ClusterService,
val namedXContentRegistry: NamedXContentRegistry): AbstractLifecycleComponent() {
Expand Down Expand Up @@ -89,6 +93,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
}
}

checkAndWaitForStoreHealth()
checkAndUpdateMapping()

val id = getId(addReq.replicationMetadata.metadataType, addReq.replicationMetadata.connectionName,
Expand Down Expand Up @@ -125,11 +130,13 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}

checkAndWaitForStoreHealth()

val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
getReq.realtime(true)
getReq.refresh(true)
if(fetch_from_primary) {
val preference = getPreferenceOnPrimaryNode() ?: throw throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
val preference = getPreferenceOnPrimaryNode() ?: throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
getReq.preference(preference)
}

Expand All @@ -151,6 +158,10 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}

val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
val clusterHealthRes = client.admin().cluster().health(clusterHealthReq).actionGet(timeout)
assert(clusterHealthRes.status <= ClusterHealthStatus.YELLOW) { "Replication metadata store is unhealthy" }

val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
getReq.realtime(true)
getReq.refresh(true)
Expand Down Expand Up @@ -200,6 +211,8 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}

checkAndWaitForStoreHealth()

val delReq = DeleteRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
return client.suspending(client::delete, defaultContext = true)(delReq)
}
Expand All @@ -210,6 +223,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
if(!configStoreExists()) {
throw ResourceNotFoundException("Metadata for $id doesn't exist")
}
checkAndWaitForStoreHealth()
checkAndUpdateMapping()

val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id)
Expand Down Expand Up @@ -256,4 +270,14 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic

override fun doClose() {
}

private suspend fun checkAndWaitForStoreHealth() {
if(!configStoreExists()) {
return
}
val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
// This should ensure that security plugin and shards are active during boot-up before triggering the requests
val clusterHealthRes = client.suspendExecuteWithRetries(null, ClusterHealthAction.INSTANCE,
clusterHealthReq, log=log, defaultContext = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import org.apache.logging.log4j.Logger
import org.opensearch.OpenSearchException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
Expand All @@ -44,6 +46,7 @@ import org.opensearch.persistent.AllocatedPersistentTask
import org.opensearch.persistent.PersistentTaskState
import org.opensearch.persistent.PersistentTasksService
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.TaskId
import org.opensearch.tasks.TaskManager
import org.opensearch.threadpool.ThreadPool
Expand All @@ -65,6 +68,10 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
protected lateinit var replicationMetadata: ReplicationMetadata
@Volatile private lateinit var taskManager: TaskManager

companion object {
const val DEFAULT_WAIT_ON_ERRORS = 60000L
}

override fun init(persistentTasksService: PersistentTasksService, taskManager: TaskManager,
persistentTaskId: String, allocationId: Long) {
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId)
Expand All @@ -85,7 +92,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
var exception : Throwable? = null
try {
registerCloseListeners()
setReplicationMetadata()
waitAndSetReplicationMetadata()
execute(this, initialState)
markAsCompleted()
} catch (e: Exception) {
Expand Down Expand Up @@ -190,22 +197,33 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
}
}

private suspend fun waitAndSetReplicationMetadata() {
if (this::replicationMetadata.isInitialized) {
return
} else {
while(overallTaskScope.isActive) {
try {
setReplicationMetadata()
return
} catch (e: OpenSearchException) {
if(e.status().status < 500 && e.status() != RestStatus.TOO_MANY_REQUESTS) {
throw e
}
log.error("Failed to fetch replication metadata due to ", e)
delay(DEFAULT_WAIT_ON_ERRORS)
}
}
}
}

/**
* Sets the security context
*/
open suspend fun setReplicationMetadata() {
replicationMetadata = if(this is AutoFollowTask) {
replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true)
}
else {
replicationMetadataManager.getIndexReplicationMetadata(followerIndexName, fetch_from_primary = true)
}
}
protected abstract suspend fun setReplicationMetadata()

//used only in testing
open suspend fun setReplicationMetadata(rm :ReplicationMetadata) {
replicationMetadata = rm

}

open class CrossClusterReplicationTaskResponse(val status: String): ActionResponse(), ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.replication.util.suspending
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchSecurityException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.support.IndicesOptions
Expand All @@ -35,6 +36,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.persistent.PersistentTaskState
import org.opensearch.replication.ReplicationException
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.tasks.TaskId
import org.opensearch.threadpool.Scheduler
Expand Down Expand Up @@ -65,9 +67,21 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
stat = AutoFollowStat(params.patternName, replicationMetadata.leaderContext.resource)
while (scope.isActive) {
addRetryScheduler()
autoFollow()
delay(replicationSettings.autofollowFetchPollDuration.millis)
try {
addRetryScheduler()
autoFollow()
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
catch(e: OpenSearchException) {
// Any transient error encountered during auto follow execution should be re-tried
val status = e.status().status
if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) {
log.error("Exiting autofollow task", e)
throw e
}
log.debug("Encountered transient error while running autofollow task", e)
delay(replicationSettings.autofollowFetchPollDuration.millis)
}
}
}

Expand Down Expand Up @@ -185,6 +199,10 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
override fun getStatus(): AutoFollowStat {
return stat
}

override suspend fun setReplicationMetadata() {
this.replicationMetadata = replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true)
}
}

class AutoFollowStat: Task.Status {
Expand Down
Loading

0 comments on commit b75b6a3

Please sign in to comment.