diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt index 5ab3f18ad..29d0b0f77 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt @@ -4,15 +4,13 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import org.apache.logging.log4j.LogManager import org.opensearch.action.DocWriteRequest -import org.opensearch.action.admin.indices.refresh.RefreshAction -import org.opensearch.action.admin.indices.refresh.RefreshRequest -import org.opensearch.action.admin.indices.refresh.RefreshResponse import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.GroupedActionListener +import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction import org.opensearch.client.Client @@ -87,13 +85,18 @@ class ThreatIntelDetectionService( } return iocsInData } catch (e: Exception) { - log.error("TI_DEBUG: Failed to extract IoC's from the queryable data to scan against threat intel") + log.error("TI_DEBUG: Failed to extract IoC's from the queryable data to scan against threat intel", e) return mutableSetOf() } } private suspend fun searchTermsOnIndices(monitor: Monitor, iocs: List, threatIntelIndices: List) { val iocSubLists = iocs.chunkSublists(BATCH_SIZE) + var i = iocSubLists.size + for (iocSubList in iocSubLists) { + if (iocSubList.isEmpty()) i-- + } + // TODO get unique values from list first val responses: Collection = suspendCoroutine { cont -> // todo implement a listener that tolerates multiple exceptions @@ -111,7 +114,7 @@ class ThreatIntelDetectionService( cont.resumeWithException(e) } }, - iocSubLists.size + i ) // chunk all iocs from queryable data and perform terms query for matches // if matched return only the ioc's that matched and not the entire document @@ -125,6 +128,7 @@ class ThreatIntelDetectionService( client.search(searchRequest, groupedListener) } } + log.error("num responses expected in groupedlistener: $i") val iocMatches = mutableSetOf() for (response in responses) { log.error("TI_DEBUG search response took: ${response.took} millis") @@ -147,15 +151,12 @@ class ThreatIntelDetectionService( } suspend fun createFindings(monitor: Monitor, iocMatches: List) { - val findingDocPairs = mutableListOf>() - val findings = mutableListOf() val indexRequests = mutableListOf() - val findingsToTriggeredQueries = mutableMapOf>() for (iocMatch in iocMatches) { val finding = Finding( id = "ioc" + UUID.randomUUID().toString(), - relatedDocIds = listOf(iocMatch), + relatedDocIds = listOf("ioc:$iocMatch"), correlatedDocIds = listOf(), monitorId = monitor.id, monitorName = monitor.name, @@ -167,34 +168,39 @@ class ThreatIntelDetectionService( val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) .string() - log.debug("Findings: $findingStr") indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) .opType(DocWriteRequest.OpType.CREATE) } - bulkIndexFindings(monitor, indexRequests) + if(indexRequests.isNotEmpty()) + bulkIndexFindings(monitor, indexRequests) } private suspend fun bulkIndexFindings( monitor: Monitor, indexRequests: List, ) { - indexRequests.chunked(1000).forEach { batch -> - val bulkResponse: BulkResponse = client.suspendUntil { - bulk(BulkRequest().add(batch), it) - } - if (bulkResponse.hasFailures()) { - bulkResponse.items.forEach { item -> - if (item.isFailed) { - log.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + try { + for (batch in indexRequests) { + val bulkResponse: BulkResponse = client.suspendUntil { + val bulkRequest = BulkRequest().add(batch) + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + bulk(bulkRequest, it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + log.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}] with error ${item.failureMessage}") + } } + } else { + log.error("TI_DEBUG: Monitor ${monitor.id}: [${bulkResponse.items.size}] findings successfully indexed.") } - } else { - log.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } + log.error("TI_DEBUG: completed index requests iteration") + } catch (e: Exception) { + log.error("TI_DEBUG: bulk index findings failed", e) } - val res: RefreshResponse = - client.suspendUntil { client.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 11a03dcab..9e7fc7da3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -806,12 +806,14 @@ class TransportDocLevelMonitorFanOutAction } } totalDocsQueriedStat += transformedDocs.size.toLong() - if ((monitor.inputs[0] as DocLevelMonitorInput).iocFieldNames.isNotEmpty()) + if ((monitor.inputs[0] as DocLevelMonitorInput).iocFieldNames.isNotEmpty()) { threatIntelDetectionService.scanDataAgainstThreatIntel( monitor, listOf(".opensearch-sap-threat-intel*"), searchHitsBeingProcessed ) + log.error("TI_DEBUG: completed ioc scan for monitor ${monitor.id}") + } } finally { transformedDocs.clear() docsSizeOfBatchInBytes = 0