Skip to content

Commit

Permalink
fix retry issue
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Apr 3, 2024
1 parent 9008367 commit 49b2374
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.opensearch.core.rest.RestStatus
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.node.NodeClosedException
import org.opensearch.transport.ActionNotFoundTransportException
import org.opensearch.transport.ConnectTransportException
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.TransportException
Expand Down Expand Up @@ -281,7 +282,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
e is RemoteTransportException &&
(
cause is NodeClosedException ||
cause is CircuitBreakingException
cause is CircuitBreakingException ||
cause is ActionNotFoundTransportException
)
)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ class TransportDocLevelMonitorFanOutAction
var totalDocsSizeInBytesStat = 0L
var docsSizeOfBatchInBytes = 0L
var findingsToTriggeredQueries: Map<String, List<DocLevelQuery>> = mutableMapOf()
// Maps a finding ID to the related document.
private val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()

@Volatile var percQueryMaxNumDocsInMemory: Int = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(settings)
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(settings)
Expand All @@ -164,10 +162,6 @@ class TransportDocLevelMonitorFanOutAction
@Volatile var allowList: List<String> = DestinationSettings.ALLOW_LIST.get(settings)
@Volatile var fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(settings)

/* Contains list of docs source that are held in memory to submit to percolate query against query index.
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) {
percQueryMaxNumDocsInMemory = it
Expand Down Expand Up @@ -227,10 +221,10 @@ class TransportDocLevelMonitorFanOutAction
val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID

val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
val fieldsToBeQueried = mutableSetOf<String>()
Expand Down Expand Up @@ -265,7 +259,8 @@ class TransportDocLevelMonitorFanOutAction
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried),
shardIds.map { it.id }
shardIds.map { it.id },
transformedDocs
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
Expand All @@ -277,6 +272,7 @@ class TransportDocLevelMonitorFanOutAction
concreteIndicesSeenSoFar,
inputRunResults,
docsToQueries,
transformedDocs
)
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
Expand Down Expand Up @@ -311,6 +307,7 @@ class TransportDocLevelMonitorFanOutAction
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
}
Expand Down Expand Up @@ -357,6 +354,7 @@ class TransportDocLevelMonitorFanOutAction
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
executionId: String,
findingIdToDocSource: MutableMap<String, MultiGetItemResponse>,
workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
Expand Down Expand Up @@ -389,7 +387,8 @@ class TransportDocLevelMonitorFanOutAction
if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty())
getDocSources(
findingToDocPairs = findingToDocPairs,
monitor = monitor
monitor = monitor,
findingIdToDocSource = findingIdToDocSource
)

val alerts = mutableListOf<Alert>()
Expand Down Expand Up @@ -674,6 +673,7 @@ class TransportDocLevelMonitorFanOutAction
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
shardList: List<Int>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
updateLastRunContext: (String, String) -> Unit
) {
for (shardId in shardList) {
Expand Down Expand Up @@ -723,6 +723,7 @@ class TransportDocLevelMonitorFanOutAction
concreteIndices,
inputRunResults,
docsToQueries,
transformedDocs
)
}
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
Expand All @@ -749,6 +750,7 @@ class TransportDocLevelMonitorFanOutAction
concreteIndices,
inputRunResults,
docsToQueries,
transformedDocs
)
}
}
Expand All @@ -761,6 +763,7 @@ class TransportDocLevelMonitorFanOutAction
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
Expand Down Expand Up @@ -1042,7 +1045,8 @@ class TransportDocLevelMonitorFanOutAction
*/
private suspend fun getDocSources(
findingToDocPairs: List<Pair<String, String>>,
monitor: Monitor
monitor: Monitor,
findingIdToDocSource: MutableMap<String, MultiGetItemResponse>
) {
val docFieldTags = parseSampleDocTags(monitor.triggers)
val request = MultiGetRequest()
Expand Down

0 comments on commit 49b2374

Please sign in to comment.