Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testrollupsearch #931

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0ea7f53
added response interceptor
ronnaksaxena Aug 18, 2023
4e93b1f
Base case: Query Live and Rollup data with no overlap
ronnaksaxena Aug 18, 2023
7fd394a
finished base case and added integ test
ronnaksaxena Aug 18, 2023
f023a39
added to response interceptor
ronnaksaxena Aug 22, 2023
4aa4202
can rewrite request to bucket pipeline
ronnaksaxena Aug 22, 2023
c3b5477
trying to rewrite aggregations in a helper function
ronnaksaxena Aug 23, 2023
5d20241
able to create new aggreations but getting shardIndex is not set error
ronnaksaxena Aug 23, 2023
743263b
Can find start and end times for rollup and live index
ronnaksaxena Aug 23, 2023
1f07f21
Handles overlap between 1 live index and 1 rollup index for sum aggre…
ronnaksaxena Aug 24, 2023
b4dbc26
added min max aggregations and fixed intersection time calculation
ronnaksaxena Aug 24, 2023
1ee93c6
changed variable name in computeAggregationsWithoutOverlap
ronnaksaxena Aug 24, 2023
4203a60
Added integ tests for nonoverlapping case
ronnaksaxena Aug 28, 2023
72e0aad
added avg and value count aggregation
ronnaksaxena Aug 29, 2023
9b977d2
fixed ktlint and integ test
ronnaksaxena Aug 29, 2023
8ed31e7
changed test and build workflow
ronnaksaxena Aug 29, 2023
f0d364f
added integ test for multiple live indices
ronnaksaxena Aug 30, 2023
b52fbdc
added test case for alias live indices
ronnaksaxena Aug 30, 2023
820dc24
cleaned up code and moved functions to utils file
ronnaksaxena Aug 30, 2023
ef7682f
fixed detekt errors
ronnaksaxena Aug 30, 2023
e9fa46c
fixed ktlint error :/'
ronnaksaxena Aug 30, 2023
c8d8790
Can run all integ tests at once now
ronnaksaxena Aug 31, 2023
1e3294f
removed DateTimeFromatter
ronnaksaxena Aug 31, 2023
1a7da76
fixed inf interceptor loop, need to pass RollupInterceptorIT.test rol…
ronnaksaxena Sep 5, 2023
b145ecf
passes all integ tests
ronnaksaxena Sep 5, 2023
a9c6369
fixed detekt errors
ronnaksaxena Sep 5, 2023
495ee4d
deleted rest test
ronnaksaxena Sep 6, 2023
4dd4096
added test back
ronnaksaxena Sep 6, 2023
d94fbbd
trying a new workflow build
ronnaksaxena Sep 6, 2023
aeeccae
Merge branch 'case2' of https://github.com/ronnaksaxena/index-managem…
ronnaksaxena Sep 6, 2023
4ef546b
added stars to worklow files
ronnaksaxena Sep 6, 2023
bf04e32
added unit test
ronnaksaxena Sep 7, 2023
d618e90
resolved some PR comments from Bowen
ronnaksaxena Sep 7, 2023
9c567ee
resolved more comments on my PR
ronnaksaxena Sep 7, 2023
79f68e5
removed stars from workflows
ronnaksaxena Sep 7, 2023
768a2bc
testing time of alias test case
ronnaksaxena Sep 7, 2023
fb86758
boring, took too long
ronnaksaxena Sep 8, 2023
17df156
commented out last 2 tests
ronnaksaxena Sep 8, 2023
211f5e6
uncomment one test
bowenlan-amzn Sep 8, 2023
62ef04d
Only run multinode workflow
bowenlan-amzn Sep 8, 2023
8168907
Uncomment alias test
bowenlan-amzn Sep 8, 2023
78c53f4
Enable trace log
bowenlan-amzn Sep 8, 2023
d4855ae
Enable trace log
bowenlan-amzn Sep 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Multi node test workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
Expand All @@ -22,7 +22,7 @@ jobs:
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run integration tests with multi node config
run: ./gradlew integTest -PnumNodes=3
run: ./gradlew integTest -Dtests.class="org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptorIT"
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAc
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction
import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter
import org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptor
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
Expand Down Expand Up @@ -208,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
lateinit var responseInterceptor: ResponseInterceptor
lateinit var fieldCapsFilter: FieldCapsFilter
lateinit var indexMetadataProvider: IndexMetadataProvider
private val indexMetadataServices: MutableList<Map<String, IndexMetadataService>> = mutableListOf()
Expand Down Expand Up @@ -391,6 +393,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
environment
)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
responseInterceptor = ResponseInterceptor(clusterService, settings, indexNameExpressionResolver, client)
val jvmService = JvmService(environment.settings())
val transformRunner = TransformRunner.initialize(
client,
Expand Down Expand Up @@ -612,7 +615,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
}

override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List<TransportInterceptor> {
return listOf(rollupInterceptor)
return listOf(rollupInterceptor, responseInterceptor)
}

override fun getActionFilters(): List<ActionFilter> {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuild
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"
Expand Down Expand Up @@ -464,3 +467,83 @@ fun parseRollup(response: GetResponse, xContentRegistry: NamedXContentRegistry =

return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse)
}
// Returns a SearchSourceBuilder with different aggregations but the rest of the properties are the same
@Suppress("ComplexMethod")
fun SearchSourceBuilder.changeAggregations(aggregationBuilderCollection: Collection<AggregationBuilder>): SearchSourceBuilder {
val ssb = SearchSourceBuilder()
aggregationBuilderCollection.forEach { ssb.aggregation(it) }
if (this.explain() != null) ssb.explain(this.explain())
if (this.ext() != null) ssb.ext(this.ext())
ssb.fetchSource(this.fetchSource())
this.docValueFields()?.forEach { ssb.docValueField(it.field, it.format) }
ssb.storedFields(this.storedFields())
if (this.from() >= 0) ssb.from(this.from())
ssb.highlighter(this.highlighter())
this.indexBoosts()?.forEach { ssb.indexBoost(it.index, it.boost) }
if (this.minScore() != null) ssb.minScore(this.minScore())
if (this.postFilter() != null) ssb.postFilter(this.postFilter())
ssb.profile(this.profile())
if (this.query() != null) ssb.query(this.query())
this.rescores()?.forEach { ssb.addRescorer(it) }
this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) }
if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter())
if (this.slice() != null) ssb.slice(this.slice())
if (this.size() >= 0) ssb.size(this.size())
this.sorts()?.forEach { ssb.sort(it) }
if (this.stats() != null) ssb.stats(this.stats())
if (this.suggest() != null) ssb.suggest(this.suggest())
if (this.terminateAfter() >= 0) ssb.terminateAfter(this.terminateAfter())
if (this.timeout() != null) ssb.timeout(this.timeout())
ssb.trackScores(this.trackScores())
this.trackTotalHitsUpTo()?.let { ssb.trackTotalHitsUpTo(it) }
if (this.version() != null) ssb.version(this.version())
if (this.seqNoAndPrimaryTerm() != null) ssb.seqNoAndPrimaryTerm(this.seqNoAndPrimaryTerm())
if (this.collapse() != null) ssb.collapse(this.collapse())
return ssb
}
@Suppress("MagicNumber")
fun convertDateStringToEpochMillis(dateString: String): Long {
val parts = dateString.split(" ")
require(parts.size == 2) { "Date in was not correct format" }
val dateParts = parts[0].split("-")
val timeParts = parts[1].split(":")

require((dateParts.size == 3 && timeParts.size == 3)) { "Date in was not correct format" }
val year = dateParts[0].toInt()
val month = dateParts[1].toInt()
val day = dateParts[2].toInt()

val hour = timeParts[0].toInt()
val minute = timeParts[1].toInt()
val second = timeParts[2].toInt()

val localDateTime = LocalDateTime.of(year, month, day, hour, minute, second)
val instant = localDateTime.toInstant(ZoneOffset.UTC)
return instant.toEpochMilli()
}
@Suppress("MagicNumber")
fun convertFixedIntervalStringToMs(fixedInterval: String): Long {
// Possible types are ms, s, m, h, d
val regex = """(\d+)([a-zA-Z]+)""".toRegex()
val matchResult = regex.find(fixedInterval)
?: throw IllegalArgumentException("Invalid interval format: $fixedInterval")

val numericValue = matchResult.groupValues[1].toLong()
val intervalType = matchResult.groupValues[2]

val milliseconds = when (intervalType) {
"ms" -> numericValue
"s" -> numericValue * 1000L
"m" -> numericValue * 60 * 1000L
"h" -> numericValue * 60 * 60 * 1000L
"d" -> numericValue * 24 * 60 * 60 * 1000L
"w" -> numericValue * 7 * 24 * 60 * 60 * 1000L
else -> throw IllegalArgumentException("Unsupported interval type: $intervalType")
}

return milliseconds
}

fun zonedDateTimeToMillis(zonedDateTime: ZonedDateTime): Long {
return zonedDateTime.toInstant().toEpochMilli()
}
Loading