Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollup and Live Data Feature: Base case (No overlapping data) #898

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ class RollupInterceptor(
val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
val concreteRolledUpIndexNames = mutableListOf<String>()
for (indexName in concreteIndices) {
if (isRollupIndex(indexName, clusterService.state())) {
concreteRolledUpIndexNames.add(indexName)
}
}
val filteredConcreteIndices = concreteRolledUpIndexNames.toTypedArray()
Comment on lines +95 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the shard is a rollup index, you filter the request and keep only the rollup indices. What about the other path — when the shard is a live index, do you also need filtering there?

// To extract fields from QueryStringQueryBuilder we need concrete source index name.
val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0)
?: throw IllegalArgumentException("No rollup job associated with target_index")
Expand All @@ -102,7 +109,7 @@ class RollupInterceptor(
val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories)
val fieldMappings = queryFieldMappings + aggregationFieldMappings

val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings)
val allMatchingRollupJobs = validateIndicies(filteredConcreteIndices, fieldMappings)

// only rebuild if there is necessity to rebuild
if (fieldMappings.isNotEmpty()) {
Expand Down Expand Up @@ -142,7 +149,7 @@ class RollupInterceptor(
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
for (concreteIndex in concreteIndices) {
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
?: throw IllegalArgumentException("Not all indices have rollup job")
?: throw IllegalArgumentException("Not all indices have rollup job, missing on $concreteIndex")

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1876,4 +1876,82 @@ class RollupInterceptorIT : RollupRestTestCase() {
Assert.assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!"))
}
}
fun `test search a live index and rollup index with no overlap`() {
generateNYCTaxiData("source_rollup_search")
val rollup = Rollup(
id = "basic_term_query_rollup_search",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = "source_rollup_search",
targetIndex = "target_rollup_search",
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

refreshAllIndices()

// Delete values from live index
var deleteResponse = client().makeRequest(
"POST",
"source_rollup_search/_delete_by_query",
mapOf("refresh" to "true"),
StringEntity("""{"query": {"match_all": {}}}""", ContentType.APPLICATION_JSON)
)
Comment on lines +1925 to +1930
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all the data from source index gets deleted, then it seems to me you are just search the rollup index later instead of both indexes

assertTrue(deleteResponse.restStatus() == RestStatus.OK)
// Term query
var req = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"sum_passenger_count": {
"sum": {
"field": "passenger_count"
}
}
}
}
""".trimIndent()
var searchResponse = client().makeRequest("POST", "/target_rollup_search,source_rollup_search/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResponse.restStatus() == RestStatus.OK)
var responseAggs = searchResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Aggregation from searching both indices is wrong",
9024.0,
responseAggs.getValue("sum_passenger_count")["value"]
)
}
}