Skip to content

Commit

Permalink
fix stats logging
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 19, 2024
1 parent 586459e commit 5bf9f72
Showing 1 changed file with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsSizeInBytes = AtomicLong(0)
val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
val queryingStartTimeMillis = System.currentTimeMillis()
docLevelMonitorInput.indices.forEach { indexName ->
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
Expand Down Expand Up @@ -212,7 +211,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
concreteIndexName,
nonPercolateSearchesTimeTaken
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand Down Expand Up @@ -273,7 +273,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
updatedIndexNames,
concreteIndicesSeenSoFar,
inputRunResults,
docsToQueries
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
Expand Down Expand Up @@ -346,6 +348,22 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
e
)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
} finally {
logger.debug(
"PERF_DEBUG_STATS: Monitor ${monitor.id} " +
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}",
monitor.id,
percolateQueriesTimeTaken
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}",
monitor.id,
docTransformTimeTaken
)
logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried)
}
}

Expand Down Expand Up @@ -586,13 +604,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String
index: String,
nonPercolateSearchesTimeTaken: AtomicLong
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken)
updatedLastRunContext[shard] = maxSeqNo.toString()
}
return updatedLastRunContext
Expand Down Expand Up @@ -629,7 +648,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand All @@ -645,6 +664,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
if (response.hits.hits.isEmpty())
return -1L

Expand Down Expand Up @@ -695,8 +715,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
nonPercolateSearchesTimeTaken
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
Expand All @@ -707,6 +729,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsSizeInBytes
)
)
docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime)
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
Expand All @@ -727,7 +750,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
}
Expand All @@ -752,6 +777,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
Expand All @@ -760,7 +787,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor,
monitorMetadata,
concreteIndices,
monitorInputIndices
monitorInputIndices,
percolateQueriesTimeTaken
)

percolateQueryResponseHits.forEach { hit ->
Expand All @@ -774,7 +802,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
} finally { // no catch block because exception is caught and handled in runMonitor() class
totalDocsQueried.getAndAdd(transformedDocs.size.toLong())
} finally {
transformedDocs.clear()
docsSizeInBytes.set(0)
}
Expand All @@ -790,7 +819,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null
docIds: List<String>? = null,
nonPercolateSearchesTimeTaken: AtomicLong,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand Down Expand Up @@ -818,8 +848,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
return response.hits
}

Expand All @@ -831,6 +863,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
concreteIndices: List<String>,
monitorInputIndices: List<String>,
percolateQueriesTimeTaken: AtomicLong,
): SearchHits {
val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices))
Expand Down Expand Up @@ -882,6 +915,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Response status is ${response.status()}"
)
}
logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
percolateQueriesTimeTaken.getAndAdd(response.took.millis)
return response.hits
}
/** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/
Expand Down

0 comments on commit 5bf9f72

Please sign in to comment.