Skip to content

Commit

Permalink
refactored DeleteMonitor API; fixed IndexMonitor bug
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz committed Dec 30, 2022
1 parent 0618056 commit c688613
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.get.GetRequest
Expand All @@ -22,6 +24,7 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand Down Expand Up @@ -152,33 +155,51 @@ class TransportDeleteMonitorAction @Inject constructor(
val metadata = MonitorMetadataService.getMetadata(monitor)
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->

if (!clusterState.routingTable.hasIndex(queryIndex)) {
val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
)
}

// Get document count on that index to check if we should delete it
// Check if there's any queries from other monitors in this queryIndex,
// to avoid unnecessary doc deletion, if we could just delete index completely
val searchResponse: SearchResponse = client.suspendUntil {
search(SearchRequest(queryIndex).source(SearchSourceBuilder().size(0)), it)
search(
SearchRequest(queryIndex).source(
SearchSourceBuilder()
.size(0)
.query(
QueryBuilders.boolQuery().mustNot(
QueryBuilders.matchQuery("monitor_id", monitorId)
)
)
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
it
)
}
if (searchResponse.hits.totalHits.value == 0L) {
val ack: AcknowledgedResponse = client.suspendUntil {
client.admin().indices().delete(DeleteIndexRequest(queryIndex), it)
client.admin().indices().delete(
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
)
}
if (ack.isAcknowledged == false) {
log.error("Failed to delete concrete queryIndex:$queryIndex during monitor deletion")
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
}
} else {
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ class TransportIndexMonitorAction @Inject constructor(
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)

actionListener.onResponse(
IndexMonitorResponse(
Expand Down Expand Up @@ -625,14 +627,14 @@ class TransportIndexMonitorAction @Inject constructor(
// Delete and insert all queries from/to queryIndex
if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
updatedMetadata = MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
client.suspendUntil<Client, BulkByScrollResponse> {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(currentMonitor.dataSources.queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id))
.execute(it)
}
indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy)
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
}
actionListener.onResponse(
IndexMonitorResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
).get()
assertNotEquals(0, searchResponse.hits.hits.size)
client().execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE)
).get()

deleteMonitor(monitorId)
assertIndexNotExists(customQueryIndex + "*")
assertAliasNotExists(customQueryIndex)
}
Expand Down Expand Up @@ -1000,8 +999,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
inputs = listOf(docLevelInput),
triggers = listOf(trigger)
)
// This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api
val docPayload: StringBuilder = StringBuilder(100000)
// Create doc with 11 fields
val docPayload: StringBuilder = StringBuilder(1000)
docPayload.append("{")
for (i in 1..10) {
docPayload.append(""" "id$i":$i,""")
Expand Down Expand Up @@ -1060,6 +1059,62 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(alerts.size == 2)
}

/**
* 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex
* 2. Update monitor and change input source_index to a new one with 900 fields in mappings
* 3. Expect queryIndex rollover resulting in 2 backing indices
* 4. Delete monitor and expect that all backing indices are deleted
* */
fun `test updating monitor no execution queryIndex rolling over`() {
val testSourceIndex1 = "test_source_index1"
val testSourceIndex2 = "test_source_index2"
createIndex(testSourceIndex1, Settings.EMPTY)
createIndex(testSourceIndex2, Settings.EMPTY)
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger)
)
// This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..899) {
docPayload.append(""" "id$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
// Indexing docs here as an easier means to set index mappings
indexDoc(testSourceIndex1, "1", docPayload.toString())
indexDoc(testSourceIndex2, "1", docPayload.toString())
// Create monitor
var monitorResponse = createMonitor(monitor)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor

// Update monitor and change input
val updatedMonitor = monitor.copy(
inputs = listOf(
DocLevelMonitorInput("description", listOf(testSourceIndex2), listOf(docQuery))
)
)
updateMonitor(updatedMonitor, updatedMonitor.id)
assertFalse(monitorResponse?.id.isNullOrEmpty())

// Expect queryIndex to rollover after setting new source_index with close to limit amount of fields in mappings
var getIndexResponse: GetIndexResponse =
client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
assertEquals(2, getIndexResponse.indices.size)

deleteMonitor(updatedMonitor.id)
waitUntil {
getIndexResponse =
client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
return@waitUntil getIndexResponse.indices.isEmpty()
}
assertEquals(0, getIndexResponse.indices.size)
}

fun `test queryIndex bwc when index was not an alias`() {
createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build())
assertIndexExists(DOC_LEVEL_QUERIES_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentType
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
Expand Down Expand Up @@ -154,6 +155,13 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
return client().execute(AlertingActions.INDEX_MONITOR_ACTION_TYPE, request).actionGet()
}

protected fun deleteMonitor(monitorId: String): Boolean {
client().execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE)
).get()
return true
}

protected fun searchAlerts(id: String, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List<Alert> {
try {
if (refresh) refreshIndex(indices)
Expand Down

0 comments on commit c688613

Please sign in to comment.