Skip to content

Commit

Permalink
added internal client call detection, need to fix integ tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ronnaksaxena committed Sep 4, 2023
1 parent 9ad89c6 commit e39ff81
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ class ResponseInterceptor(
fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
// Build search request to find the maximum rollup timestamp <= liveDataStartPoint
val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC)
// Add the 3 match alls to detect this was an internal client call in the request interceptor
val query = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery(dateTargetField).lte(liveDataStartPoint))
.must(QueryBuilders.matchAllQuery())
.must(QueryBuilders.matchAllQuery())
.must(QueryBuilders.matchAllQuery())
val searchSourceBuilder = SearchSourceBuilder()
.sort(sort)
.query(query)
Expand Down Expand Up @@ -179,14 +183,19 @@ class ResponseInterceptor(
}
val request: ShardSearchRequest = response.shardSearchRequest!!
val oldQuery = request.source().query()
val newQuery = QueryBuilders.boolQuery()
.must(oldQuery)
.must(QueryBuilders.matchAllQuery())
.must(QueryBuilders.matchAllQuery())
.must(QueryBuilders.matchAllQuery())
val (rollupIndices, liveIndices) = getRollupAndLiveIndices(request)
val shardRequestIndex = request.shardId().indexName
val isShardIndexRollup = isRollupIndex(shardRequestIndex, clusterService.state())
// Build search request to find the maximum date in all rollup indices
var sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC)
var searchSourceBuilder = SearchSourceBuilder()
.sort(sort)
.query(oldQuery)
.query(newQuery)
.size(1)
// Need to avoid infinite interceptor loop
val maxRolledDateRequest = SearchRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,34 @@ class RollupInterceptor(
request.source(request.source().changeAggregations(listOf(intervalAgg)))
return
}
// Detects if this was an internal client call to allow a size of 0 on a rollup search
fun requestCalledInInterceptor(request: ShardSearchRequest): Boolean {
val jsonRequest: String = request.source().query().toString()
// Detected dummy field from internal search request
if (jsonRequest.contains(
",\n" +
" {\n" +
" \"match_all\" : {\n" +
" \"boost\" : 1.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"match_all\" : {\n" +
" \"boost\" : 1.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"match_all\" : {\n" +
" \"boost\" : 1.0\n" +
" }\n" +
" }"
)
) {
return true
}

return false
}
@Suppress("SpreadOperator")
override fun <T : TransportRequest> interceptHandler(
action: String,
Expand All @@ -202,11 +229,13 @@ class RollupInterceptor(
}
// Rewrite the request to fit rollup format if not already done previously
if (isRollupIndex && !isReqeustRollupFormat(request)) {
// TODO fix logic to allow response interceptor client calls to have a size of 1
// if (!requestCalledInInterceptor && request.source().size() != 0) {
// throw IllegalArgumentException("Rollup search must have size explicitly set to 0, " +
// "but found ${request.source().size()}")
// }
// TODO fix logic to allow response interceptor client calls to have a size of 1
if (!requestCalledInInterceptor(request) && request.source().size() != 0) {
throw IllegalArgumentException(
"Rollup search must have size explicitly set to 0, " +
"but found ${request.source().size()}"
)
}
rewriteRollupRequest(request, rollupJob!!, concreteRollupIndicesArray)
}
}
Expand Down

0 comments on commit e39ff81

Please sign in to comment.