Skip to content

Commit

Permalink
Update TransportGetFindingsSearchAction.kt
Browse files Browse the repository at this point in the history
  • Loading branch information
Annie Lee committed Mar 30, 2022
1 parent 2e58bc2 commit 4c8f68f
Showing 1 changed file with 30 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
Expand All @@ -30,8 +27,6 @@ import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
Expand Down Expand Up @@ -131,25 +126,23 @@ class TransportGetFindingsSearchAction @Inject constructor(
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val totalFindingCount = response.hits.totalHits?.value?.toInt()
val mgetRequest = MultiGetRequest()
val findingsWithDocs = mutableListOf<FindingWithDocs>()
for (hit in response.hits) {
val id = hit.id
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val finding = Finding.parse(xcp, id)
val doc_ids = finding.relatedDocId.split(",").toTypedArray()
val docs = mutableListOf<FindingDocument>()
for (doc_id in doc_ids) {
// TODO: Add the document to docs after searching
val findingDocument = searchDocument(doc_id, finding.index, actionListener)
val documentIds = finding.relatedDocId.split(",").toTypedArray()
// Add getRequests to mget request
documentIds.values.forEach {
// TODO: check if we want to add individual get document request, or use documentIds array for a single finding related_docs
docId ->
mgetMetadataReq.add(MultiGetRequest.Item(sourceIndex, docId))
}
val findingWithDoc = FindingWithDocs(finding, docs)
findingsWithDocs.add(findingWithDoc)
// TODO: remove debug log
log.info("findingWithDoc: $findingWithDoc")
}
actionListener.onResponse(GetFindingsSearchResponse(RestStatus.OK, totalFindingCount, findingsWithDocs))
searchDocument(mgetRequest, totalFindingCount, actionListener)
}

override fun onFailure(t: Exception) {
Expand All @@ -160,50 +153,32 @@ class TransportGetFindingsSearchAction @Inject constructor(
}

fun searchDocument(
documentId: String,
sourceIndex: String,
mgetRequest: MultiGetRequest,
totalFindingCount: Int,
actionListener: ActionListener<GetFindingsSearchResponse>
): FindingDocument? {
val getRequest = GetRequest(sourceIndex, documentId)
var findingDocument: FindingDocument? = null
client.threadPool().threadContext.stashContext().use {
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Document $documentId not found from source index $sourceIndex.",
RestStatus.NOT_FOUND
)
)
)
return
}

if (!response.isSourceEmpty) {
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.toString())
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
findingDocument = FindingDocument.parse(xcp)
// TODO: remove debug log
log.info("Response not empty")
val docStr = findingDocument?.toXContent(
XContentBuilder.builder(XContentType.JSON.xContent()),
ToXContent.EMPTY_PARAMS
).string()
log.info("findingDocument: $docStr")
}
client.multiGet(
mgetRequest,
object : ActionListener<MultiGetResponse> {
override fun onResponse(response: MultiGetResponse) {
// val findingsWithDocs: Map<Finding, FindingDocument> =
response.responses.associate {
// TODO: REMOVE DEBUG LOG
log.info("response: $response")
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.toString())
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
findingDocument = FindingDocument.parse(xcp)
// TODO: Parse the searched documents and add to map of findings, need to associate original finding id to response
}
// TODO: Form the response here with the map/list of findings
actionListener.onResponse(GetFindingsSearchResponse(RestStatus.OK, totalFindingCount, findingsWithDocs))
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
)
}
return findingDocument
}
)
}
}

0 comments on commit 4c8f68f

Please sign in to comment.