Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollup Search API : Searching both historical rollup and non-rollup data #901

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0ea7f53
added response interceptor
ronnaksaxena Aug 18, 2023
4e93b1f
Base case: Query Live and Rollup data with no overlap
ronnaksaxena Aug 18, 2023
7fd394a
finished base case and added integ test
ronnaksaxena Aug 18, 2023
f023a39
added to response interceptor
ronnaksaxena Aug 22, 2023
4aa4202
can rewrite request to bucket pipeline
ronnaksaxena Aug 22, 2023
c3b5477
trying to rewrite aggregations in a helper function
ronnaksaxena Aug 23, 2023
5d20241
able to create new aggreations but getting shardIndex is not set error
ronnaksaxena Aug 23, 2023
743263b
Can find start and end times for rollup and live index
ronnaksaxena Aug 23, 2023
1f07f21
Handles overlap between 1 live index and 1 rollup index for sum aggre…
ronnaksaxena Aug 24, 2023
b4dbc26
added min max aggregations and fixed intersection time calculation
ronnaksaxena Aug 24, 2023
1ee93c6
changed variable name in computeAggregationsWithoutOverlap
ronnaksaxena Aug 24, 2023
4203a60
Added integ tests for nonoverlapping case
ronnaksaxena Aug 28, 2023
72e0aad
added avg and value count aggregation
ronnaksaxena Aug 29, 2023
9b977d2
fixed ktlint and integ test
ronnaksaxena Aug 29, 2023
8ed31e7
changed test and build workflow
ronnaksaxena Aug 29, 2023
f0d364f
added integ test for multiple live indices
ronnaksaxena Aug 30, 2023
b52fbdc
added test case for alias live indices
ronnaksaxena Aug 30, 2023
820dc24
cleaned up code and moved functions to utils file
ronnaksaxena Aug 30, 2023
ef7682f
fixed detekt errors
ronnaksaxena Aug 30, 2023
e9fa46c
fixed ktlint error :/'
ronnaksaxena Aug 30, 2023
c8d8790
Can run all integ tests at once now
ronnaksaxena Aug 31, 2023
1e3294f
removed DateTimeFromatter
ronnaksaxena Aug 31, 2023
1a7da76
fixed inf interceptor loop, need to pass RollupInterceptorIT.test rol…
ronnaksaxena Sep 5, 2023
b145ecf
passes all integ tests
ronnaksaxena Sep 5, 2023
a9c6369
fixed detekt errors
ronnaksaxena Sep 5, 2023
495ee4d
deleted rest test
ronnaksaxena Sep 6, 2023
4dd4096
added test back
ronnaksaxena Sep 6, 2023
d94fbbd
trying a new workflow build
ronnaksaxena Sep 6, 2023
aeeccae
Merge branch 'case2' of https://github.com/ronnaksaxena/index-managem…
ronnaksaxena Sep 6, 2023
4ef546b
added stars to worklow files
ronnaksaxena Sep 6, 2023
bf04e32
added unit test
ronnaksaxena Sep 7, 2023
d618e90
resolved some PR comments from Bowen
ronnaksaxena Sep 7, 2023
9c567ee
resolved more comments on my PR
ronnaksaxena Sep 7, 2023
79f68e5
removed stars from workflows
ronnaksaxena Sep 7, 2023
768a2bc
testing time of alias test case
ronnaksaxena Sep 7, 2023
fb86758
boring, took too long
ronnaksaxena Sep 8, 2023
17df156
commented out last 2 tests
ronnaksaxena Sep 8, 2023
e496c57
removed all response interceptor tests
ronnaksaxena Sep 8, 2023
8aa3f81
added one test back
ronnaksaxena Sep 8, 2023
016d161
fixed data stream integ tests
ronnaksaxena Sep 11, 2023
96603eb
commented out breaking tests
ronnaksaxena Sep 11, 2023
35ee9e6
added tests back
ronnaksaxena Sep 11, 2023
06b7b02
removed countdown latch and added coroutines
ronnaksaxena Sep 12, 2023
acf1c4b
resolved comments on the PR
ronnaksaxena Sep 13, 2023
9c9d242
resolved PR comments added kotlin docs for methods
ronnaksaxena Sep 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run integration tests with multi node config
run: ./gradlew integTest -Dtests.class="org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptorIT"
run: ./gradlew integTest
ronnaksaxena marked this conversation as resolved.
Show resolved Hide resolved
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class ResponseInterceptor(
val client: Client
) : TransportInterceptor {
private val logger = LogManager.getLogger(javaClass)

override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender {
return CustomAsyncSender(sender)
}
Expand Down Expand Up @@ -195,6 +194,7 @@ class ResponseInterceptor(
.indices(*rollupIndices) // add all rollup indices to this request
var maxRolledDateResponse: SearchResponse? = null
var latch = CountDownLatch(1)
logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex")
ronnaksaxena marked this conversation as resolved.
Show resolved Hide resolved
client.search(
maxRolledDateRequest,
object : ActionListener<SearchResponse> {
Expand Down Expand Up @@ -230,6 +230,7 @@ class ResponseInterceptor(

var minLiveDateResponse: SearchResponse? = null
latch = CountDownLatch(1)
logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex")
ronnaksaxena marked this conversation as resolved.
Show resolved Hide resolved
client.search(
minLiveDateRequest,
object : ActionListener<SearchResponse> {
Expand All @@ -256,6 +257,7 @@ class ResponseInterceptor(
// If intersection found on rollup index, remove overlap
if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) {
// Start at 0, end at live data
logger.info("ronsax sending request to find rollup endtime for index: $shardRequestIndex")
ronnaksaxena marked this conversation as resolved.
Show resolved Hide resolved
val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField)
return Pair(0L, endTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RollupInterceptor(
}
// Returns Pair<containsRollup: Boolean, rollupJob: RollupJob>
ronnaksaxena marked this conversation as resolved.
Show resolved Hide resolved
@Suppress("SpreadOperator")
private fun originalSearchContainsRollup(request: ShardSearchRequest): Pair<Boolean, Rollup?> {
private fun originalSearchContainsRollup(request: ShardSearchRequest): Pair<Boolean, Rollup?> { // Throwing an error on data streams
val indices = request.indices().map { it.toString() }.toTypedArray()
val allIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
Expand Down Expand Up @@ -169,7 +169,7 @@ class RollupInterceptor(
}
// Wraps all existing aggs in bucket aggregation
// Notifies the response interceptor that was rewritten since agg name is interceptor_interval_data
// Edge case if User selected this as the aggregation name :/
// Edge case if User selected interceptor_interval_data as the aggregation name :/
val intervalAgg = AggregationBuilders.dateHistogram("interceptor_interval_data")
.field(dateSourceField)
.calendarInterval(DateHistogramInterval(rollupInterval))
Expand All @@ -190,14 +190,16 @@ class RollupInterceptor(
return object : TransportRequestHandler<T> {
override fun messageReceived(request: T, channel: TransportChannel, task: Task) {
if (searchEnabled && request is ShardSearchRequest) {
val (containsRollup, rollupJob) = originalSearchContainsRollup(request)
val isDataStream = (request.indices().any { IndexUtils.isDataStream(it, clusterService.state()) })
val shardRequestIndex = request.shardId().indexName
val isRollupIndex = isRollupIndex(shardRequestIndex, clusterService.state())
// isRollupIndex throws an exception if the request is on a data stream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the exception found out here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a null ptr exception when checking is the shardId indexName for a data stream is a rollup index

val isRollupIndex = if (isDataStream) false else isRollupIndex(shardRequestIndex, clusterService.state())
val (containsRollup, rollupJob) = if (isDataStream) Pair(false, null) else originalSearchContainsRollup(request)
// Only modifies rollup searches and avoids internal client calls
if (containsRollup || isRollupIndex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need || isRollupIndex here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this check is needed. The base case fails unless I check for both but maybe the logic could be cleaned up to have less if checks

val (concreteRollupIndicesArray, concreteLiveIndicesArray) = getConcreteIndices(request)
/* Avoid infinite interceptor loop:
if there is an internal client call made in the reponse interceptor there is only 1 index.
if there is an internal client call made in the response interceptor there is only 1 index.
Therefore, conditions are not met for api to combine rollup and live data
*/
val isMultiSearch = (concreteRollupIndicesArray.isNotEmpty() && concreteLiveIndicesArray.isNotEmpty())
Expand Down
Loading
Loading