Skip to content

Commit

Permalink
Removing All Usages of Action Get Method Calls and adding the listene…
Browse files Browse the repository at this point in the history
…rs (#130)

Signed-off-by: Aditya Jindal <[email protected]>
  • Loading branch information
adityaj1107 authored Jul 22, 2021
1 parent 0c4c2fc commit ff57c54
Showing 1 changed file with 75 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
package org.opensearch.alerting.alerts

import org.apache.logging.log4j.LogManager
import org.apache.lucene.index.IndexNotFoundException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
Expand All @@ -38,6 +39,7 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
Expand Down Expand Up @@ -145,7 +147,6 @@ class AlertIndices(

fun onMaster() {
try {
// TODO: Change current actionGet requests within rolloverHistoryIndex() rolloverAndDeleteHistoryIndices() to use suspendUntil
// try to rollover immediately as we might be restarting the cluster
rolloverHistoryIndex()
// schedule the next rollover for approx MAX_AGE later
Expand Down Expand Up @@ -279,9 +280,9 @@ class AlertIndices(
deleteOldHistoryIndices()
}

private fun rolloverHistoryIndex(): Boolean {
private fun rolloverHistoryIndex() {
if (!historyIndexInitialized) {
return false
return
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
Expand All @@ -291,17 +292,24 @@ class AlertIndices(
.settings(Settings.builder().put("index.hidden", true).build())
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloverIndex(request).actionGet(requestTimeout)
if (!response.isRolledOver) {
logger.info("$HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus}")
} else {
lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis())
}
return response.isRolledOver
client.admin().indices().rolloverIndex(
request,
object : ActionListener<RolloverResponse> {
override fun onResponse(response: RolloverResponse) {
if (!response.isRolledOver) {
logger.info("$HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus}")
} else {
lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis())
}
}
override fun onFailure(e: Exception) {
logger.error("$HISTORY_WRITE_INDEX not roll over failed.")
}
}
)
}

private fun deleteOldHistoryIndices() {
val indicesToDelete = mutableListOf<String>()

val clusterStateRequest = ClusterStateRequest()
.clear()
Expand All @@ -310,8 +318,27 @@ class AlertIndices(
.local(true)
.indicesOptions(IndicesOptions.strictExpand())

val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
client.admin().cluster().state(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old history indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
} else {
logger.info("No Old History Indices to delete")
}
}
override fun onFailure(e: Exception) {
logger.error("Error fetching cluster state")
}
}
)
}

private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
val indicesToDelete = mutableListOf<String>()
for (entry in clusterStateResponse.state.metadata.indices) {
val indexMetaData = entry.value
val creationTime = indexMetaData.creationDate
Expand All @@ -331,26 +358,48 @@ class AlertIndices(
indicesToDelete.add(indexMetaData.index.name)
}
}
return indicesToDelete
}

private fun deleteAllOldHistoryIndices(indicesToDelete: List<String>) {
if (indicesToDelete.isNotEmpty()) {
val deleteIndexRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
val deleteIndexResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()

if (!deleteIndexResponse.isAcknowledged) {
logger.error("Could not delete one or more Alerting history indices: $indicesToDelete. Retrying one by one.")
for (index in indicesToDelete) {
try {
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
client.admin().indices().delete(
deleteIndexRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) {
if (!deleteIndicesResponse.isAcknowledged) {
logger.error("Could not delete one or more Alerting history indices: $indicesToDelete. Retrying one by one.")
deleteOldHistoryIndex(indicesToDelete)
}
}
override fun onFailure(e: Exception) {
logger.error("Delete for Alerting History Indices $indicesToDelete Failed. Retrying one By one.")
deleteOldHistoryIndex(indicesToDelete)
}
}
)
}
}

if (!singleDeleteResponse.isAcknowledged) {
logger.error("Could not delete one or more Alerting history indices: $index")
private fun deleteOldHistoryIndex(indicesToDelete: List<String>) {
for (index in indicesToDelete) {
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
singleDeleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(acknowledgedResponse: AcknowledgedResponse?) {
if (acknowledgedResponse != null) {
if (!acknowledgedResponse.isAcknowledged) {
logger.error("Could not delete one or more Alerting history indices: $index")
}
}
} catch (e: IndexNotFoundException) {
logger.debug("$index was already deleted. ${e.message}")
}
override fun onFailure(e: Exception) {
logger.debug("Exception ${e.message} while deleting the index $index")
}
}
}
)
}
}
}

0 comments on commit ff57c54

Please sign in to comment.