Skip to content

Commit

Permalink
Added exception check once the .opendistro-alerting-config index is b… (
Browse files Browse the repository at this point in the history
#650)

* Added exception check once the .opendistro-alerting-config index is being created

During .opendistro-alerting-config index creation, if ResourceAlreadyExists exception is being raised, the flow will check first if the index is in yellow state and then it will re-try to index monitor

Signed-off-by: Stevan Buzejic <[email protected]>

* Formating of the file fixed

Signed-off-by: Stevan Buzejic <[email protected]>

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored Nov 7, 2022
1 parent f842a04 commit ceff609
Showing 1 changed file with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthAction
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
Expand Down Expand Up @@ -297,10 +302,30 @@ class TransportIndexMonitorAction @Inject constructor(
if (!scheduledJobIndices.scheduledJobIndexExists()) {
scheduledJobIndices.initScheduledJobIndex(object : ActionListener<CreateIndexResponse> {
override fun onResponse(response: CreateIndexResponse) {
onCreateMappingsResponse(response)
onCreateMappingsResponse(response.isAcknowledged)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
// https://github.com/opensearch-project/alerting/issues/646
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
scope.launch {
// Wait for the yellow status
val request = ClusterHealthRequest()
.indices(SCHEDULED_JOBS_INDEX)
.waitForYellowStatus()
val response: ClusterHealthResponse = client.suspendUntil {
execute(ClusterHealthAction.INSTANCE, request, it)
}
if (response.isTimedOut) {
actionListener.onFailure(
OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy")
)
}
// Retry mapping of monitor
onCreateMappingsResponse(true)
}
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
})
} else if (!IndexUtils.scheduledJobIndexUpdated) {
Expand Down Expand Up @@ -346,6 +371,7 @@ class TransportIndexMonitorAction @Inject constructor(
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource)

client.search(
searchRequest,
object : ActionListener<SearchResponse> {
Expand Down Expand Up @@ -401,8 +427,8 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
private fun onCreateMappingsResponse(isAcknowledged: Boolean) {
if (isAcknowledged) {
log.info("Created $SCHEDULED_JOBS_INDEX with mappings.")
prepareMonitorIndexing()
IndexUtils.scheduledJobIndexUpdated()
Expand Down

0 comments on commit ceff609

Please sign in to comment.