Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi rollup ndx search #453

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
}

/**
 * Pulls the per-shard failure causes out of a search response.
 *
 * Reads `_shards.failures` from the response body and returns, for every failure entry, the value stored
 * under its `reason` key. Callers in this file read the `type` and `reason` keys of each returned map.
 *
 * Fails the running test (via assertion) when the response has no `_shards` section or no `failures` list.
 *
 * @param searchResponse the raw REST response of a `_search` request
 * @return one entry per shard failure; an entry is `null` when that failure carries no map-valued `reason`
 */
@Suppress("UNCHECKED_CAST")
protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
    // Safe cast: a missing `_shards` section must reach the assertion below instead of blowing up inside the cast.
    val shards = searchResponse.asMap()["_shards"] as? Map<String, List<Map<String, Any>>>
    assertNotNull("Search response does not contain a _shards section", shards)
    val failures = shards?.get("failures")
    assertNotNull("Search response does not contain any shard failures", failures)
    return failures?.map { failure -> failure["reason"] as? Map<String, String> }
}

companion object {
internal interface IProxy {
val version: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,292 @@ class RollupInterceptorIT : RollupRestTestCase() {
trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
)
}

/**
 * Runs two identically configured hourly rollup jobs (one per source index, each with its own target index)
 * and searches both target indices in a single request.
 *
 * With the "search all jobs" cluster setting off, the multi-index search is expected to return the same
 * aggregations as a search on the first source index alone. After switching the setting on, sum and
 * value_count are expected to equal the totals across both source indices, while max stays the same.
 */
fun `test rollup search multiple target indices successfully`() {
    val sourceIndex1 = "source_rollup_search_multi_jobs_1"
    val sourceIndex2 = "source_rollup_search_multi_jobs_2"
    generateNYCTaxiData(sourceIndex1)
    generateNYCTaxiData(sourceIndex2)
    val targetIndex1 = "target_rollup_search_multi_jobs1"
    val targetIndex2 = "target_rollup_search_multi_jobs2"

    // Creates one hourly rollup job, triggers it, and blocks until its metadata reports FINISHED.
    // Both jobs in this test share the same dimensions and metrics; only id and indices differ.
    fun runHourlyRollupToCompletion(jobId: String, sourceIndex: String, targetIndex: String) {
        val rollup = Rollup(
            id = jobId,
            enabled = true,
            schemaVersion = 1L,
            jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
            jobLastUpdatedTime = Instant.now(),
            jobEnabledTime = Instant.now(),
            description = "basic search test",
            sourceIndex = sourceIndex,
            targetIndex = targetIndex,
            metadataID = null,
            roles = emptyList(),
            pageSize = 10,
            delay = 0,
            continuous = false,
            dimensions = listOf(
                DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
                Terms("RatecodeID", "RatecodeID"),
                Terms("PULocationID", "PULocationID")
            ),
            metrics = listOf(
                RollupMetrics(
                    sourceField = "passenger_count", targetField = "passenger_count",
                    metrics = listOf(
                        Sum(), Min(), Max(),
                        ValueCount(), Average()
                    )
                ),
                RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
            )
        ).let { createRollup(it, it.id) }

        updateRollupStartTime(rollup)

        waitFor {
            val rollupJob = getRollup(rollupId = rollup.id)
            assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
            val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
            assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
        }
    }

    // The first job is run to completion before the second one is created, as in the original sequence.
    runHourlyRollupToCompletion("hourly_basic_term_query_rollup_search_multi_1", sourceIndex1, targetIndex1)
    runHourlyRollupToCompletion("hourly_basic_term_query_rollup_search_multi_2", sourceIndex2, targetIndex2)

    refreshAllIndices()

    val req = """
        {
            "size": 0,
            "query": {
                "term": { "RatecodeID": 1 }
            },
            "aggs": {
                "sum_passenger_count": { "sum": { "field": "passenger_count" } },
                "max_passenger_count": { "max": { "field": "passenger_count" } },
                "value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
            }
        }
    """.trimIndent()
    val rawRes1 = client().makeRequest("POST", "/$sourceIndex1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertEquals("Unexpected status", RestStatus.OK, rawRes1.restStatus())
    val rawRes2 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertEquals("Unexpected status", RestStatus.OK, rawRes2.restStatus())
    val rollupResMulti = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertEquals("Unexpected status", RestStatus.OK, rollupResMulti.restStatus())
    val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
    val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
    val rollupAggResMulti = rollupResMulti.asMap()["aggregations"] as Map<String, Map<String, Any>>

    // When the cluster setting to search all jobs is off, the aggregations will be the same for searching a single job as for searching both
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same max results",
        rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"]
    )
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same sum results",
        rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResMulti.getValue("sum_passenger_count")["value"]
    )
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same value count results",
        rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResMulti.getValue("value_count_passenger_count")["value"]
    )

    // Expected totals across both source indices, used once the search-all-jobs setting is on.
    val trueAggCount = (rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int) +
        (rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int)
    val trueAggSum = (rawAgg1Res.getValue("sum_passenger_count")["value"] as Double) +
        (rawAgg2Res.getValue("sum_passenger_count")["value"] as Double)
    updateSearchAllJobsClusterSetting(true)

    val rollupResAll = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertEquals("Unexpected status", RestStatus.OK, rollupResAll.restStatus())
    val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map<String, Map<String, Any>>

    // With search all jobs setting on, the sum, and value_count will now be equal to the sum of the single job search results
    // The max is compared against a single source index, so the message names "max" (it previously said "sum").
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same max results",
        rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"]
    )
    assertEquals(
        "Searching rollup target index did not return the sum for all of the rollup jobs on the index",
        trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"]
    )
    assertEquals(
        "Searching rollup target index did not return the value count for all of the rollup jobs on the index",
        trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
    )
}

/**
 * Covers two multi-index search scenarios that are expected to be rejected:
 *
 * 1. Searching a non-rollup index together with a rollup target index: the request returns 200 but the
 *    response is expected to carry exactly one shard failure ("Not all indices have rollup job").
 * 2. Searching two rollup target indices whose jobs were configured with different dimensions and metrics:
 *    the request is expected to be rejected with 400 BAD_REQUEST.
 */
fun `test rollup search multiple target indices failed`() {
    val sourceIndex1 = "source_rollup_search_multi_failed_1"
    val sourceIndex2 = "source_rollup_search_multi_failed_2"
    generateNYCTaxiData(sourceIndex1)
    generateNYCTaxiData(sourceIndex2)
    val targetIndex1 = "target_rollup_search_multi_failed_jobs1"
    val targetIndex2 = "target_rollup_search_multi_failed_jobs2"

    // Blocks until the given rollup job has metadata attached and that metadata reports FINISHED.
    fun awaitRollupFinished(rollupId: String) {
        waitFor {
            val rollupJob = getRollup(rollupId = rollupId)
            assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
            val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
            assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
        }
    }

    // First job: rolls up on VendorID with fare_amount / improvement_surcharge metrics.
    val rollupJob1 = Rollup(
        id = "hourly_basic_term_query_rollup_search_failed_1",
        enabled = true,
        schemaVersion = 1L,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = sourceIndex1,
        targetIndex = targetIndex1,
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
            Terms("VendorID", "VendorID"),
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "fare_amount", targetField = "fare_amount",
                metrics = listOf(
                    Sum(), Min(), Max(),
                    ValueCount(), Average()
                )
            ),
            RollupMetrics(sourceField = "improvement_surcharge", targetField = "improvement_surcharge", metrics = listOf(Max(), Min()))
        )
    ).let { createRollup(it, it.id) }

    updateRollupStartTime(rollupJob1)
    awaitRollupFinished(rollupJob1.id)

    // Second job: deliberately different dimensions and metrics (RatecodeID / PULocationID, passenger_count / total_amount).
    val rollupJob2 = Rollup(
        id = "hourly_basic_term_query_rollup_search_failed_2",
        enabled = true,
        schemaVersion = 1L,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = sourceIndex2,
        targetIndex = targetIndex2,
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_dropoff_datetime", fixedInterval = "1h"),
            Terms("RatecodeID", "RatecodeID"),
            Terms("PULocationID", "PULocationID")
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "passenger_count", targetField = "passenger_count",
                metrics = listOf(
                    Sum(), Min(), Max(),
                    ValueCount(), Average()
                )
            ),
            RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
        )
    ).let { createRollup(it, it.id) }

    updateRollupStartTime(rollupJob2)
    awaitRollupFinished(rollupJob2.id)

    refreshAllIndices()

    val req = """
        {
            "size": 0,
            "query": {
                "term": { "RatecodeID": 1 }
            },
            "aggs": {
                "sum_passenger_count": { "sum": { "field": "passenger_count" } },
                "max_passenger_count": { "max": { "field": "passenger_count" } },
                "value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
            }
        }
    """.trimIndent()
    // Search 1 non-rollup index and 1 rollup
    val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertEquals("Unexpected status", RestStatus.OK, searchResult1.restStatus())
    val failures = extractFailuresFromSearchResponse(searchResult1)
    assertNotNull(failures)
    assertEquals(1, failures?.size)
    assertEquals(
        "Searching multiple indices where one is rollup and other is not, didn't return failure",
        "illegal_argument_exception", failures?.get(0)?.get("type") ?: "Didn't find failure type in search response"
    )
    assertEquals(
        "Searching multiple indices where one is rollup and other is not, didn't return failure",
        "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response"
    )

    // Search 2 rollups with different mappings
    try {
        client().makeRequest(
            "POST",
            "/$targetIndex1,$targetIndex2/_search",
            emptyMap(),
            StringEntity(req, ContentType.APPLICATION_JSON)
        )
        // Without this, an unexpectedly successful request would let the test pass without checking anything.
        fail("Expected 400 BAD_REQUEST when searching rollup indices created by jobs with different mappings")
    } catch (e: ResponseException) {
        assertEquals(
            "Searching multiple rollup indices which weren't created by same rollup job, didn't return failure",
            "Could not find a rollup job that can answer this query because [missing field RatecodeID, missing field passenger_count]",
            (e.response.asMap() as Map<String, Map<String, Map<String, String>>>)["error"]!!["caused_by"]!!["reason"]
        )
        assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
    }
}
}