Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollup Search API : Searching both historical rollup and non-rollup data #901

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0ea7f53
added response interceptor
ronnaksaxena Aug 18, 2023
4e93b1f
Base case: Query Live and Rollup data with no overlap
ronnaksaxena Aug 18, 2023
7fd394a
finished base case and added integ test
ronnaksaxena Aug 18, 2023
f023a39
added to response interceptor
ronnaksaxena Aug 22, 2023
4aa4202
can rewrite request to bucket pipeline
ronnaksaxena Aug 22, 2023
c3b5477
trying to rewrite aggregations in a helper function
ronnaksaxena Aug 23, 2023
5d20241
able to create new aggreations but getting shardIndex is not set error
ronnaksaxena Aug 23, 2023
743263b
Can find start and end times for rollup and live index
ronnaksaxena Aug 23, 2023
1f07f21
Handles overlap between 1 live index and 1 rollup index for sum aggre…
ronnaksaxena Aug 24, 2023
b4dbc26
added min max aggregations and fixed intersection time calculation
ronnaksaxena Aug 24, 2023
1ee93c6
changed variable name in computeAggregationsWithoutOverlap
ronnaksaxena Aug 24, 2023
4203a60
Added integ tests for nonoverlapping case
ronnaksaxena Aug 28, 2023
72e0aad
added avg and value count aggregation
ronnaksaxena Aug 29, 2023
9b977d2
fixed ktlint and integ test
ronnaksaxena Aug 29, 2023
8ed31e7
changed test and build workflow
ronnaksaxena Aug 29, 2023
f0d364f
added integ test for multiple live indices
ronnaksaxena Aug 30, 2023
b52fbdc
added test case for alias live indices
ronnaksaxena Aug 30, 2023
820dc24
cleaned up code and moved functions to utils file
ronnaksaxena Aug 30, 2023
ef7682f
fixed detekt errors
ronnaksaxena Aug 30, 2023
e9fa46c
fixed ktlint error :/'
ronnaksaxena Aug 30, 2023
c8d8790
Can run all integ tests at once now
ronnaksaxena Aug 31, 2023
1e3294f
removed DateTimeFromatter
ronnaksaxena Aug 31, 2023
1a7da76
fixed inf interceptor loop, need to pass RollupInterceptorIT.test rol…
ronnaksaxena Sep 5, 2023
b145ecf
passes all integ tests
ronnaksaxena Sep 5, 2023
a9c6369
fixed detekt errors
ronnaksaxena Sep 5, 2023
495ee4d
deleted rest test
ronnaksaxena Sep 6, 2023
4dd4096
added test back
ronnaksaxena Sep 6, 2023
d94fbbd
trying a new workflow build
ronnaksaxena Sep 6, 2023
aeeccae
Merge branch 'case2' of https://github.com/ronnaksaxena/index-managem…
ronnaksaxena Sep 6, 2023
4ef546b
added stars to worklow files
ronnaksaxena Sep 6, 2023
bf04e32
added unit test
ronnaksaxena Sep 7, 2023
d618e90
resolved some PR comments from Bowen
ronnaksaxena Sep 7, 2023
9c567ee
resolved more comments on my PR
ronnaksaxena Sep 7, 2023
79f68e5
removed stars from workflows
ronnaksaxena Sep 7, 2023
768a2bc
testing time of alias test case
ronnaksaxena Sep 7, 2023
fb86758
boring, took too long
ronnaksaxena Sep 8, 2023
17df156
commented out last 2 tests
ronnaksaxena Sep 8, 2023
e496c57
removed all response interceptor tests
ronnaksaxena Sep 8, 2023
8aa3f81
added one test back
ronnaksaxena Sep 8, 2023
016d161
fixed data stream integ tests
ronnaksaxena Sep 11, 2023
96603eb
commented out breaking tests
ronnaksaxena Sep 11, 2023
35ee9e6
added tests back
ronnaksaxena Sep 11, 2023
06b7b02
removed countdown latch and added coroutines
ronnaksaxena Sep 12, 2023
acf1c4b
resolved comments on the PR
ronnaksaxena Sep 13, 2023
9c9d242
resolved PR comments added kotlin docs for methods
ronnaksaxena Sep 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Multi node test workflow
on:
pull_request:
branches:
- "*"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this after PR is approved.

- "**"
push:
branches:
- "*"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Test and Build Workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAc
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction
import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter
import org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptor
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
Expand Down Expand Up @@ -208,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
lateinit var responseInterceptor: ResponseInterceptor
lateinit var fieldCapsFilter: FieldCapsFilter
lateinit var indexMetadataProvider: IndexMetadataProvider
private val indexMetadataServices: MutableList<Map<String, IndexMetadataService>> = mutableListOf()
Expand Down Expand Up @@ -391,6 +393,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
environment
)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
responseInterceptor = ResponseInterceptor(clusterService, settings, indexNameExpressionResolver, client)
val jvmService = JvmService(environment.settings())
val transformRunner = TransformRunner.initialize(
client,
Expand Down Expand Up @@ -612,7 +615,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
}

override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List<TransportInterceptor> {
return listOf(rollupInterceptor)
return listOf(rollupInterceptor, responseInterceptor)
}

override fun getActionFilters(): List<ActionFilter> {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"
Expand Down Expand Up @@ -464,3 +467,83 @@

return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse)
}
// Returns a SearchSourceBuilder with different aggregations but the rest of the properties are the same
@Suppress("ComplexMethod")
fun SearchSourceBuilder.changeAggregations(aggregationBuilderCollection: Collection<AggregationBuilder>): SearchSourceBuilder {
val ssb = SearchSourceBuilder()
aggregationBuilderCollection.forEach { ssb.aggregation(it) }
if (this.explain() != null) ssb.explain(this.explain())
if (this.ext() != null) ssb.ext(this.ext())
ssb.fetchSource(this.fetchSource())
this.docValueFields()?.forEach { ssb.docValueField(it.field, it.format) }
ssb.storedFields(this.storedFields())
if (this.from() >= 0) ssb.from(this.from())
ssb.highlighter(this.highlighter())
this.indexBoosts()?.forEach { ssb.indexBoost(it.index, it.boost) }
if (this.minScore() != null) ssb.minScore(this.minScore())
if (this.postFilter() != null) ssb.postFilter(this.postFilter())
ssb.profile(this.profile())
if (this.query() != null) ssb.query(this.query())
this.rescores()?.forEach { ssb.addRescorer(it) }
this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) }
if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter())
if (this.slice() != null) ssb.slice(this.slice())
if (this.size() >= 0) ssb.size(this.size())
this.sorts()?.forEach { ssb.sort(it) }
if (this.stats() != null) ssb.stats(this.stats())
if (this.suggest() != null) ssb.suggest(this.suggest())
if (this.terminateAfter() >= 0) ssb.terminateAfter(this.terminateAfter())
if (this.timeout() != null) ssb.timeout(this.timeout())
ssb.trackScores(this.trackScores())
this.trackTotalHitsUpTo()?.let { ssb.trackTotalHitsUpTo(it) }
if (this.version() != null) ssb.version(this.version())
if (this.seqNoAndPrimaryTerm() != null) ssb.seqNoAndPrimaryTerm(this.seqNoAndPrimaryTerm())
if (this.collapse() != null) ssb.collapse(this.collapse())
return ssb
}
@Suppress("MagicNumber")
fun convertDateStringToEpochMillis(dateString: String): Long {
val parts = dateString.split(" ")
require(parts.size == 2) { "Date in was not correct format" }
val dateParts = parts[0].split("-")
val timeParts = parts[1].split(":")

require((dateParts.size == 3 && timeParts.size == 3)) { "Date in was not correct format" }
val year = dateParts[0].toInt()
val month = dateParts[1].toInt()
val day = dateParts[2].toInt()

val hour = timeParts[0].toInt()
val minute = timeParts[1].toInt()
val second = timeParts[2].toInt()

val localDateTime = LocalDateTime.of(year, month, day, hour, minute, second)
val instant = localDateTime.toInstant(ZoneOffset.UTC)
return instant.toEpochMilli()
}
@Suppress("MagicNumber")
fun convertFixedIntervalStringToMs(fixedInterval: String): Long {
// Possible types are ms, s, m, h, d
val regex = """(\d+)([a-zA-Z]+)""".toRegex()
val matchResult = regex.find(fixedInterval)
?: throw IllegalArgumentException("Invalid interval format: $fixedInterval")

val numericValue = matchResult.groupValues[1].toLong()
val intervalType = matchResult.groupValues[2]

val milliseconds = when (intervalType) {
"ms" -> numericValue
"s" -> numericValue * 1000L
"m" -> numericValue * 60 * 1000L
"h" -> numericValue * 60 * 60 * 1000L
"d" -> numericValue * 24 * 60 * 60 * 1000L
"w" -> numericValue * 7 * 24 * 60 * 60 * 1000L
else -> throw IllegalArgumentException("Unsupported interval type: $intervalType")

Check warning on line 541 in src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt#L541

Added line #L541 was not covered by tests
}

return milliseconds
}

fun zonedDateTimeToMillis(zonedDateTime: ZonedDateTime): Long {
return zonedDateTime.toInstant().toEpochMilli()
}
Loading
Loading