Skip to content

Commit

Permalink
Multi rollup ndx search (opensearch-project#453)
Browse files Browse the repository at this point in the history
* added support for searching multiple rollup indices with same mapping

Signed-off-by: Petar Dzepina <[email protected]>

* fixed failing rollupInterceptorIT  test

Signed-off-by: Petar Dzepina <[email protected]>

* reverted old error messages

Signed-off-by: Petar Dzepina <[email protected]>

* reverted checking for matching jobs on whole set instead of job by job; Added picking rollup job deterministic

Signed-off-by: petar.dzepina <[email protected]>

* fixed sorting

Signed-off-by: petar.dzepina <[email protected]>

* added ITs for multi rollup index search

Signed-off-by: petardz <[email protected]>

* added ITs for multi rollup index search#2

Signed-off-by: petardz <[email protected]>

* detekt fixes

Signed-off-by: petardz <[email protected]>

* changed index names and rollup job

Signed-off-by: petardz <[email protected]>

* detekt fix

Signed-off-by: petardz <[email protected]>

* empty commit to trigger test pipeline

Signed-off-by: petardz <[email protected]>

Co-authored-by: petar.dzepina <[email protected]>
  • Loading branch information
petardz and petardzepina authored Aug 9, 2022
1 parent b8a77d4 commit af5b366
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
}

protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
val shards = searchResponse.asMap()["_shards"] as Map<String, ArrayList<Map<String, Any>>>
assertNotNull(shards)
val failures = shards["failures"]
assertNotNull(failures)
return failures?.let {
val result: ArrayList<Map<String, String>?>? = ArrayList()
for (failure in it) {
result?.add((failure as Map<String, Map<String, String>>)["reason"])
}
return result
}
}

companion object {
internal interface IProxy {
val version: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,292 @@ class RollupInterceptorIT : RollupRestTestCase() {
trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
)
}

fun `test rollup search multiple target indices successfully`() {
val sourceIndex1 = "source_rollup_search_multi_jobs_1"
val sourceIndex2 = "source_rollup_search_multi_jobs_2"
generateNYCTaxiData(sourceIndex1)
generateNYCTaxiData(sourceIndex2)
val targetIndex1 = "target_rollup_search_multi_jobs1"
val targetIndex2 = "target_rollup_search_multi_jobs2"
val rollupHourly1 = Rollup(
id = "hourly_basic_term_query_rollup_search_multi_1",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = sourceIndex1,
targetIndex = targetIndex1,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollupHourly1)

waitFor {
val rollupJob = getRollup(rollupId = rollupHourly1.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

val rollupHourly2 = Rollup(
id = "hourly_basic_term_query_rollup_search_multi_2",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = sourceIndex2,
targetIndex = targetIndex2,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollupHourly2)

waitFor {
val rollupJob = getRollup(rollupId = rollupHourly2.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

refreshAllIndices()

val req = """
{
"size": 0,
"query": {
"term": { "RatecodeID": 1 }
},
"aggs": {
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
"max_passenger_count": { "max": { "field": "passenger_count" } },
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
}
}
""".trimIndent()
val rawRes1 = client().makeRequest("POST", "/$sourceIndex1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes1.restStatus() == RestStatus.OK)
val rawRes2 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes2.restStatus() == RestStatus.OK)
val rollupResMulti = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupResMulti.restStatus() == RestStatus.OK)
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rollupAggResMulti = rollupResMulti.asMap()["aggregations"] as Map<String, Map<String, Any>>

// When the cluster setting to search all jobs is off, the aggregations will be the same for searching a single job as for searching both
assertEquals(
"Searching single rollup job and rollup target index did not return the same max results",
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"]
)
assertEquals(
"Searching single rollup job and rollup target index did not return the same sum results",
rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResMulti.getValue("sum_passenger_count")["value"]
)
val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int
assertEquals(
"Searching single rollup job and rollup target index did not return the same value count results",
rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResMulti.getValue("value_count_passenger_count")["value"]
)

val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double
updateSearchAllJobsClusterSetting(true)

val rollupResAll = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupResAll.restStatus() == RestStatus.OK)
val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map<String, Map<String, Any>>

// With search all jobs setting on, the sum, and value_count will now be equal to the sum of the single job search results
assertEquals(
"Searching single rollup job and rollup target index did not return the same sum results",
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"]
)
assertEquals(
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"]
)
assertEquals(
"Searching rollup target index did not return the value count for all of the rollup jobs on the index",
trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
)
}

fun `test rollup search multiple target indices failed`() {
val sourceIndex1 = "source_rollup_search_multi_failed_1"
val sourceIndex2 = "source_rollup_search_multi_failed_2"
generateNYCTaxiData(sourceIndex1)
generateNYCTaxiData(sourceIndex2)
val targetIndex1 = "target_rollup_search_multi_failed_jobs1"
val targetIndex2 = "target_rollup_search_multi_failed_jobs2"
val rollupJob1 = Rollup(
id = "hourly_basic_term_query_rollup_search_failed_1",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = sourceIndex1,
targetIndex = targetIndex1,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("VendorID", "VendorID"),
),
metrics = listOf(
RollupMetrics(
sourceField = "fare_amount", targetField = "fare_amount",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
),
RollupMetrics(sourceField = "improvement_surcharge", targetField = "improvement_surcharge", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollupJob1)

waitFor {
val rollupJob = getRollup(rollupId = rollupJob1.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

val rollupJob2 = Rollup(
id = "hourly_basic_term_query_rollup_search_failed_2",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = sourceIndex2,
targetIndex = targetIndex2,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_dropoff_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollupJob2)

waitFor {
val rollupJob = getRollup(rollupId = rollupJob2.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

refreshAllIndices()

val req = """
{
"size": 0,
"query": {
"term": { "RatecodeID": 1 }
},
"aggs": {
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
"max_passenger_count": { "max": { "field": "passenger_count" } },
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
}
}
""".trimIndent()
// Search 1 non-rollup index and 1 rollup
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult1.restStatus() == RestStatus.OK)
val failures = extractFailuresFromSearchResponse(searchResult1)
assertNotNull(failures)
assertEquals(1, failures?.size)
assertEquals(
"Searching multiple indices where one is rollup and other is not, didn't return failure",
"illegal_argument_exception", failures?.get(0)?.get("type") ?: "Didn't find failure type in search response"

)
assertEquals(
"Searching multiple indices where one is rollup and other is not, didn't return failure",
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response"
)

// Search 2 rollups with different mappings
try {
client().makeRequest(
"POST",
"/$targetIndex1,$targetIndex2/_search",
emptyMap(),
StringEntity(req, ContentType.APPLICATION_JSON)
)
} catch (e: ResponseException) {
assertEquals(
"Searching multiple rollup indices which weren't created by same rollup job, didn't return failure",
"Could not find a rollup job that can answer this query because [missing field RatecodeID, missing field passenger_count]",
(e.response.asMap() as Map<String, Map<String, Map<String, String>>>)["error"]!!["caused_by"]!!["reason"]
)
assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}

0 comments on commit af5b366

Please sign in to comment.