Automatically cancel aggregation if search task is cancelled #71021

Closed
scrappyiron opened this issue Mar 30, 2021 · 14 comments
Labels: :Analytics/Aggregations, >enhancement, Team:Analytics, team-discuss


@scrappyiron
Contributor

scrappyiron commented Mar 30, 2021

Elasticsearch version:

7.6.2

Description of the problem including expected versus actual behavior:

This is a followup to #70347

We often run search requests with long-running aggregations, and we would like to be able to terminate a search while it's in the middle of calculating the aggregations.

We would like to be able to terminate a long-running aggregation of a search request in two ways:

  1. If the client closes the connection
  2. If the client cancels the search task through the task API.
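For the second case, the cancellation goes through the task management API (`POST _tasks/<task_id>/_cancel`). The following is a minimal sketch that only builds that request with `java.net.http` and does not send it. The base URL `http://localhost:9200` is an assumption. The task ID is the parent search task that appears in the server logs later in this issue.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CancelSearchTask {
    // Builds (but does not send) a task management API request:
    //   POST /_tasks/<task_id>/_cancel
    // where <task_id> has the form "<node_id>:<task_number>".
    static HttpRequest buildCancelRequest(String baseUrl, String taskId) {
        URI uri = URI.create(baseUrl + "/_tasks/" + taskId + "/_cancel");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Assumed base URL; the task ID is the parent task from the logs in this issue.
        HttpRequest request = buildCancelRequest("http://localhost:9200", "N78L0bWWQEuSLPvitGDBxw:97420");
        System.out.println(request.method() + " " + request.uri());
        // Sending it would look like:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```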

Currently, if we close the connection or cancel the search task during the reduce phase, the search task is cancelled but the aggregation still runs to completion. We would like to change this behavior so that, if we close the connection or cancel the search task during the reduce phase, the aggregation terminates immediately and a response is sent to the user.

Steps to reproduce:

Query body:

{
   "query": {
       "bool": {
           "filter": [
               {
                   "terms": {
                       "insurance_id": [
                           111,
                           222,
                           333,
                           444,
                           555,
                           666,
                           777
                           ...(150 more)
                       ]
                   }
               },
               {
                   "range": {
                       "start_time": {
                           "lt": "2020-12-31 00:00:00",
                           "gte": "2020-12-01 00:00:00",
                           "time_zone": "+00:00"
                       }
                   }
               }
           ]
       }
   },
   "aggs": {
       "start_time": {
           "date_histogram": {
               "field": "start_time",
               "interval": "day",
               "extended_bounds": {
                   "max": "2020-12-31 00:00:00",
                   "min": "2020-12-01 00:00:00"
               },
               "min_doc_count": 0,
               "format": "yyyy-MM-dd HH:mm:ss"
           },
           "aggs": {
               "address_id": {
                   "terms": {
                       "field": "address_id",
                       "size": 100000
                   },
                   "aggs": {
                       "age": {
                           "sum": {
                               "field": "age"
                           }
                       },
                       "salary": {
                           "sum": {
                               "field": "salary"
                           }
                       },
                       "height": {
                           "sum": {
                               "field": "height"
                           }
                       },
                       "weight": {
                           "sum": {
                               "field": "weight"
                           }
                       },
                       "english_score": {
                           "sum": {
                               "field": "english_score"
                           }
                       },
                       "math_score": {
                           "sum": {
                               "field": "math_score"
                           }
                       }
                   }
               }
           }
       }
   },
   "size": 0
}

Request params:

allow_partial_search_results=false
timeout=1000ms
request_cache=false

Provide logs (if relevant):

If I let the request run to completion without cancelling the task, the response is

{
  "took" : 29998,
  "timed_out" : false,
  "_shards" : {
    "total" : 240,
    "successful" : 240,
    "skipped" : 0,
    "failed" : 0
  },

If I cancel the search task during the reduce phase, I observe the following logs on the server

[2021-03-30T08:02:48,681][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Received ban for the parent [N78L0bWWQEuSLPvitGDBxw:97420] on the node [N78L0bWWQEuSLPvitGDBxw], reason: [by user request]
[2021-03-30T08:02:51,297][DEBUG][o.e.m.j.JvmGcMonitorService] [10.23.161.209(1)] [gc][55597] overhead, spent [109ms] collecting in the last [1s]
[2021-03-30T08:03:09,824][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [3gtuxzK2SY-zuO-uwXsA9A]
[2021-03-30T08:03:09,825][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [CTb_1tG5RUSSAUp1Kzlxuw]
[2021-03-30T08:03:09,825][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [PzB_ypGOQUyrMdkamCutwQ]
[2021-03-30T08:03:09,825][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [7Xwwlm1TTLWgqO39lAL4dw]
[2021-03-30T08:03:09,825][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [7X8uHu2SQry3-VVfHGIK4Q]
[2021-03-30T08:03:09,825][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [1G4dEDf8SIevYmBy-OTKgQ]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [88BlhmEjSE6uEWAp3YxeCg]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [Zvbxvvu2Ri6spT5c9cMK6A]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [zzcx7SAbRmqBAn0q8QfO6w]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [kSsfGkAqSC-oQhfO0XbHoQ]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [qRGABA5GQayYidPQuW1Uow]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [znWZ2uptRTCE0022ixlI9A]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [MTwuKSigTdi8Jer6IiAxJA]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [U7Ad3MOuQKe_Jk0d3sTKcQ]
[2021-03-30T08:03:09,826][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [3ruMY6IJQs6yKQtPGLF7kw]
[2021-03-30T08:03:09,827][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [zJoGmY-ERlScwT_eupyPIA]
[2021-03-30T08:03:09,827][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [GgAjEzZ7RFKsXpxNIVXt7Q]
[2021-03-30T08:03:09,827][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:97420] to the node [N78L0bWWQEuSLPvitGDBxw]
[2021-03-30T08:03:09,827][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] [10.23.161.209(1)] Removing ban for the parent [N78L0bWWQEuSLPvitGDBxw:97420] on the node [N78L0bWWQEuSLPvitGDBxw]

and the response is

{
  "took" : 28974,
  "timed_out" : false,
  "_shards" : {
    "total" : 240,
    "successful" : 240,
    "skipped" : 0,
    "failed" : 0
  },

So even though I cancelled the search task during the reduce phase, the long-running aggregation still ran to completion (took: 28974, all 240 shards successful) instead of terminating.

Proposed change:

  1. For search requests, pass the SearchTask into InternalAggregation.java's ReduceContext.
  2. Raise a TaskCancelledException in InternalAggregation.java's consumeBucketsAndMaybeBreak() if the SearchTask is cancelled.

Because the check would run every time the reduce consumes buckets, the reduce should terminate the entire search request as soon as it next consumes buckets after the task has been cancelled. And since closing the client connection cancels the task (#43332), this change would also allow closing the client connection to terminate the aggregation.
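The two proposed steps can be sketched with simplified stand-ins. Everything in this sketch is hypothetical: `ReduceContext`, `TaskCancelledException`, and `consumeBucketsAndMaybeBreak` only mirror the names used in this issue. The cancellation check is a `BooleanSupplier` rather than a real `SearchTask`, and `IllegalStateException` stands in for the `search.max_buckets` limit.

```java
import java.util.function.BooleanSupplier;

public class ReduceCancellationSketch {
    // Hypothetical stand-in for Elasticsearch's TaskCancelledException.
    static class TaskCancelledException extends RuntimeException {
        TaskCancelledException(String msg) { super(msg); }
    }

    // Hypothetical, simplified stand-in for InternalAggregation.ReduceContext:
    // it carries a per-request cancellation check instead of the SearchTask itself.
    static class ReduceContext {
        private final BooleanSupplier isCancelled;
        private final int maxBuckets;
        private int consumedBuckets = 0;

        ReduceContext(BooleanSupplier isCancelled, int maxBuckets) {
            this.isCancelled = isCancelled;
            this.maxBuckets = maxBuckets;
        }

        // Called each time the reduce produces buckets: check cancellation first,
        // then enforce the bucket limit (the "maybe break" part).
        void consumeBucketsAndMaybeBreak(int size) {
            if (isCancelled.getAsBoolean()) {
                throw new TaskCancelledException("search task was cancelled during reduce");
            }
            consumedBuckets += size;
            if (consumedBuckets > maxBuckets) {
                throw new IllegalStateException("too many buckets: " + consumedBuckets);
            }
        }

        int consumedBuckets() { return consumedBuckets; }
    }

    public static void main(String[] args) {
        boolean[] cancelled = {false};
        ReduceContext ctx = new ReduceContext(() -> cancelled[0], 10_000);
        try {
            for (int bucket = 0; bucket < 1_000; bucket++) {
                if (bucket == 100) {
                    cancelled[0] = true; // simulate a cancel arriving mid-reduce
                }
                ctx.consumeBucketsAndMaybeBreak(1);
            }
        } catch (TaskCancelledException e) {
            System.out.println("stopped after " + ctx.consumedBuckets() + " buckets"); // stopped after 100 buckets
        }
    }
}
```

Because the check runs on every call, the reduce stops at the next bucket consumption after cancellation rather than at the end of the whole reduce.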

@scrappyiron added the >enhancement and needs:triage labels Mar 30, 2021
@romseygeek added the :Analytics/Aggregations and :Search/Search labels and removed the needs:triage label Mar 30, 2021
@elasticmachine added the Team:Analytics and Team:Search labels Mar 30, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@elasticmachine
Collaborator

Pinging @elastic/es-search (Team:Search)

@nik9000 added the team-discuss label and removed the :Search/Search and Team:Search labels Apr 1, 2021
@imotov self-assigned this Apr 7, 2021
@imotov
Contributor

imotov commented Apr 8, 2021

@danielwhsu would you mind running hot_threads after the task is cancelled but before the reduce phase ends, and posting the results here?

@scrappyiron
Contributor Author

@imotov Sure, here is the hot_threads API output captured after I cancelled the task using the task API and before the final reduce finished:

daniel.hsu@n227-072-133:~$ curl "http://10.23.161.209:9200/_nodes/hot_threads"
::: {10.23.197.142(1)}{GgAjEzZ7RFKsXpxNIVXt7Q}{eDbGmntFQJ606Pe-UOywww}{10.23.197.142}{10.23.197.142:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.185.198(1)}{kSsfGkAqSC-oQhfO0XbHoQ}{CQ5HviSHRdO3ES3cYzoQlQ}{10.23.185.198}{10.23.185.198:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.199.12(1)}{3ruMY6IJQs6yKQtPGLF7kw}{kt_JhoCSS1KXP5oXZ9Q5bw}{10.23.199.12}{10.23.199.12:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.176.167(1)}{88BlhmEjSE6uEWAp3YxeCg}{YbmaAWJYTkaDGk1IQY1CjQ}{10.23.176.167}{10.23.176.167:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.194.84(1)}{CTb_1tG5RUSSAUp1Kzlxuw}{oYMBmDCXRDy_GClo9dvOkw}{10.23.194.84}{10.23.194.84:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.197.158(1)}{3gtuxzK2SY-zuO-uwXsA9A}{4i55KiidTei7NCebmpFpCQ}{10.23.197.158}{10.23.197.158:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.197.157(1)}{qRGABA5GQayYidPQuW1Uow}{8X4oQuUDRmygH2ebfC6eRQ}{10.23.197.157}{10.23.197.157:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.222.165(1)}{zzcx7SAbRmqBAn0q8QfO6w}{WoTvXv6ZSJ2U5mFRZcUagA}{10.23.222.165}{10.23.222.165:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.194.227(1)}{7X8uHu2SQry3-VVfHGIK4Q}{dt30EBEJQR-y3w4ZS9xoqQ}{10.23.194.227}{10.23.194.227:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.161.209(1)}{N78L0bWWQEuSLPvitGDBxw}{SKDTH8CWTCGm9WVT_Wzk4w}{10.23.161.209}{10.23.161.209:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
   65.7% (328.7ms out of 500ms) cpu usage by thread 'elasticsearch[10.23.161.209(1)][search][T#33]'
     8/10 snapshots sharing following 22 elements
       app//org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.reduceBucket(InternalTerms.java:313)
       app//org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.reduce(InternalTerms.java:261)
       app//org.elasticsearch.search.aggregations.bucket.terms.LongTerms.reduce(LongTerms.java:153)
       app//org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:168)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduceBucket(InternalDateHistogram.java:384)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduceBuckets(InternalDateHistogram.java:334)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduce(InternalDateHistogram.java:452)
       app//org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:168)
       app//org.elasticsearch.search.aggregations.InternalAggregations.topLevelReduce(InternalAggregations.java:116)
       app//org.elasticsearch.action.search.SearchPhaseController.reducedQueryPhase(SearchPhaseController.java:490)
       app//org.elasticsearch.action.search.SearchPhaseController.reducedQueryPhase(SearchPhaseController.java:404)
       app//org.elasticsearch.action.search.SearchPhaseController$1.reduce(SearchPhaseController.java:725)
       app//org.elasticsearch.action.search.FetchSearchPhase.innerRun(FetchSearchPhase.java:103)
       app//org.elasticsearch.action.search.FetchSearchPhase.access$000(FetchSearchPhase.java:44)
       app//org.elasticsearch.action.search.FetchSearchPhase$1.doRun(FetchSearchPhase.java:88)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       app//org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:692)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       [email protected]/java.lang.Thread.run(Thread.java:830)
     2/10 snapshots sharing following 22 elements
       app//org.apache.lucene.util.PriorityQueue.pop(PriorityQueue.java:175)
       app//org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.reduce(InternalTerms.java:281)
       app//org.elasticsearch.search.aggregations.bucket.terms.LongTerms.reduce(LongTerms.java:153)
       app//org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:168)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduceBucket(InternalDateHistogram.java:384)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduceBuckets(InternalDateHistogram.java:334)
       app//org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram.reduce(InternalDateHistogram.java:452)
       app//org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:168)
       app//org.elasticsearch.search.aggregations.InternalAggregations.topLevelReduce(InternalAggregations.java:116)
       app//org.elasticsearch.action.search.SearchPhaseController.reducedQueryPhase(SearchPhaseController.java:490)
       app//org.elasticsearch.action.search.SearchPhaseController.reducedQueryPhase(SearchPhaseController.java:404)
       app//org.elasticsearch.action.search.SearchPhaseController$1.reduce(SearchPhaseController.java:725)
       app//org.elasticsearch.action.search.FetchSearchPhase.innerRun(FetchSearchPhase.java:103)
       app//org.elasticsearch.action.search.FetchSearchPhase.access$000(FetchSearchPhase.java:44)
       app//org.elasticsearch.action.search.FetchSearchPhase$1.doRun(FetchSearchPhase.java:88)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       app//org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:692)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       [email protected]/java.lang.Thread.run(Thread.java:830)

::: {10.150.120.135(1)}{7Xwwlm1TTLWgqO39lAL4dw}{eQhA4ANtSYm9O0znmOWawg}{10.150.120.135}{10.150.120.135:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.222.87(1)}{zJoGmY-ERlScwT_eupyPIA}{9q2abU-HQmOMehpO5wUMYA}{10.23.222.87}{10.23.222.87:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.214.140(1)}{1G4dEDf8SIevYmBy-OTKgQ}{OzQp-MSvR56BJKXD0AKYsg}{10.23.214.140}{10.23.214.140:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.150.120.135(2)}{PzB_ypGOQUyrMdkamCutwQ}{wFIeThS_RG69uQhRlwjSvw}{10.150.120.135}{10.150.120.135:9301}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.201.24(1)}{MTwuKSigTdi8Jer6IiAxJA}{O3OOM1bURDSg-Sk0KVPVEQ}{10.23.201.24}{10.23.201.24:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.186.88(1)}{znWZ2uptRTCE0022ixlI9A}{HRsTbbyMT8CAWMLpJIyfQw}{10.23.186.88}{10.23.186.88:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.187.135(1)}{Zvbxvvu2Ri6spT5c9cMK6A}{hACmDMZtTpuK2aD1Pxz0LA}{10.23.187.135}{10.23.187.135:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.548Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {10.23.217.48(1)}{U7Ad3MOuQKe_Jk0d3sTKcQ}{KvCpvtSGSCOMBiBN9WXcJw}{10.23.217.48}{10.23.217.48:9300}{dim}{xpack.installed=true}
   Hot threads at 2021-04-12T21:59:51.549Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

@scrappyiron
Contributor Author

scrappyiron commented Apr 12, 2021

@imotov I have already made a change in my company's fork of the Elasticsearch repo, and in our testing it lets us stop searches during the reduce phase when the task is cancelled. If it would help I could create a pull request with the changes that we have been using, what do you think?

@imotov
Contributor

imotov commented Apr 13, 2021

> If it would help I could create a pull request with the changes that we have been using, what do you think?

If it has tests in it, it will most likely speed things up.

@scrappyiron
Contributor Author

@imotov I've created a PR (#71714) that includes a unit test checking that an exception is raised when the task has been cancelled.

A few weeks ago I tried adding an integration test in my company's fork of Elasticsearch that cancels the SearchTask of a running aggregation query, but it was flaky because it was difficult to time the cancel request so that it arrived during the reduce phase. Please let me know whether the unit test is sufficient, or where you'd like to see more testing.

@imotov
Contributor

imotov commented Apr 15, 2021

@danielwhsu yeah, an integration test is a bit tricky here: you basically need to block in the reduce phase before cancelling and unblock after the cancellation has propagated to all shards. So I suspect the simplest way to achieve this reliably would be something like ScriptedBlockPlugin called in the reduce_script of a scripted metric aggregation: block there, cancel the task, ensure the cancellation has propagated, and then unblock.
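The block, cancel, and unblock sequence can be illustrated with plain `java.util.concurrent` primitives. This is a minimal sketch, not ScriptedBlockPlugin or the Elasticsearch test framework. The latch names are hypothetical, and a single `AtomicBoolean` stands in for propagating the cancellation to all shards.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockCancelUnblock {
    static String runScenario(boolean cancelWhileBlocked) throws Exception {
        CountDownLatch reachedReduce = new CountDownLatch(1); // plays the role of "await for block"
        CountDownLatch unblock = new CountDownLatch(1);       // plays the role of "disable blocks"
        AtomicBoolean cancelled = new AtomicBoolean(false);   // stand-in for the task's cancelled flag

        // Stand-in for the reduce: the blocking hook runs first, then the cancellation check.
        Callable<String> reduce = () -> {
            reachedReduce.countDown();
            unblock.await();
            if (cancelled.get()) {
                throw new IllegalStateException("cancelled");
            }
            return "completed";
        };

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> search = pool.submit(reduce);
            // 1. Wait until the reduce is actually blocked (fail instead of hanging if it never gets there).
            if (!reachedReduce.await(10, TimeUnit.SECONDS)) {
                throw new AssertionError("reduce never reached the block");
            }
            // 2. Cancel while blocked, then 3. unblock.
            if (cancelWhileBlocked) {
                cancelled.set(true);
            }
            unblock.countDown();
            try {
                return search.get(10, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getMessage();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runScenario(true));  // failed: cancelled
        System.out.println(runScenario(false)); // completed
    }
}
```

Waiting on the first latch with a timeout turns "the block was never reached" into an explicit failure rather than a hang, which is the same condition a zero-hit `awaitForBlock` reports.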

@scrappyiron
Contributor Author

@imotov Got it, I'll look into using the script for the integration test. Before I write more tests, though, could we discuss whether the approach in the pull request looks reasonable?

@imotov
Contributor

imotov commented Apr 15, 2021

> I think it might be useful if we discuss whether the approach in the pull request looks reasonable

I think the cancellation check itself is reasonable, but the current way of plumbing the search task to it has a race condition. SearchPhaseController is a singleton shared by all searches, so storing the search task on it lets concurrent requests overwrite each other's task, which will lead to all sorts of weird issues on busy systems. We need to find some other way of making the current search task available in the reduce phase.
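To illustrate why a singleton is the wrong place for per-request state, here is a deliberately simplified, deterministic model. The class and method names are hypothetical and not Elasticsearch's. The race is shown as one fixed interleaving rather than with real threads.

```java
public class SingletonTaskRace {
    // Hypothetical stand-in for a search task.
    static final class Task {
        final String id;
        final boolean cancelled;
        Task(String id, boolean cancelled) { this.id = id; this.cancelled = cancelled; }
    }

    // Problematic pattern: per-request state stored on a shared, singleton controller.
    static final class SharedController {
        private Task currentTask; // overwritten by whichever request set it last
        void setTask(Task task) { this.currentTask = task; }
        boolean reduceShouldStop() { return currentTask.cancelled; }
    }

    // Alternative: the task travels with the request into the reduce call.
    static boolean reduceShouldStop(Task taskForThisRequest) {
        return taskForThisRequest.cancelled;
    }

    public static void main(String[] args) {
        SharedController controller = new SharedController(); // one instance for every request
        Task a = new Task("A", true);   // request A has been cancelled
        Task b = new Task("B", false);  // request B is still running

        // One possible interleaving on a busy node: B overwrites the field before A reduces.
        controller.setTask(a);
        controller.setTask(b);
        System.out.println("A stops, shared field: " + controller.reduceShouldStop()); // false (reads B's task)
        System.out.println("A stops, per request:  " + reduceShouldStop(a));           // true
    }
}
```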

> Before I start writing more tests

The integration test that I suggested is very generic, so it should help regardless of the implementation we end up using. I think it is a good place to start working on this issue.

@scrappyiron
Contributor Author

Hi @imotov, I've picked up this task again and have a few questions about its integration tests.

I'm trying to get an integration test to work with the ScriptedBlockPlugin and scripted metric aggregation, but the aggregation is not triggering the ScriptedBlockPlugin for some reason.

This is the integration test I added in SearchCancellationIT.java:

    public void testCancellationDuringAggregation() throws Exception {
        List<ScriptedBlockPlugin> plugins = initBlockFactory();
        indexTestData();

        // Same blocking mock script for every phase of the scripted metric aggregation (for debugging).
        Script blockScript = new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap());

        ActionFuture<SearchResponse> searchResponse = client()
            .prepareSearch("test")
            .setQuery(matchAllQuery())
            .addAggregation(
                new ScriptedMetricAggregationBuilder("test_agg")
                    .initScript(blockScript)
                    .mapScript(blockScript)
                    .combineScript(blockScript)
                    .reduceScript(blockScript)
            )
            .execute();

        awaitForBlock(plugins);
        cancelSearch(SearchAction.NAME);
        disableBlocks(plugins);
        ensureSearchWasCancelled(searchResponse);
    }

For debugging purposes I have made every script of the scripted metric aggregation use ScriptedBlockPlugin's script, but the integration test still always fails because awaitForBlock(plugins) detects zero plugin hits.

I don't understand why there are zero plugin hits though, because my aggregation should have at the very least run the ScriptedBlockPlugin script from map_script on all of the test documents created in indexTestData(). Do you know why this might be happening?

I'm also wondering whether putting the ScriptedBlockPlugin script in reduce_script will actually let us test a potential code change. Whichever code change we use, it will include some logic to cancel the aggregation during the reduce phase. But if we replace the aggregation's reduce phase with a ScriptedBlockPlugin script, won't that code change never be triggered?

@imotov
Contributor

imotov commented May 16, 2021

@danielwhsu could you push these changes into your PR so I can take a look?

@scrappyiron
Contributor Author

@imotov sure, I've just updated #71714 with my integration test (I've removed the old cancellation logic since it's incorrect).

imotov added a commit that referenced this issue Oct 1, 2021
…71714)

This change raises a TaskCancelledException to stop the search query if it is detected that the SearchTask has been cancelled during the reduce phase.

Issue: #71021

Co-authored-by: Daniel Hsu <[email protected]>
Co-authored-by: Igor Motov <[email protected]>
@imotov
Contributor

imotov commented Oct 1, 2021

Closed by #71714

@imotov imotov closed this as completed Oct 1, 2021
imotov added a commit to imotov/elasticsearch that referenced this issue Oct 2, 2021
…lastic#71714)

This change raises a TaskCancelledException to stop the search query if it is detected that the SearchTask has been cancelled during the reduce phase.

Issue: elastic#71021

Co-authored-by: Daniel Hsu <[email protected]>
Co-authored-by: Igor Motov <[email protected]>
imotov added a commit to imotov/elasticsearch that referenced this issue Oct 2, 2021
The SearchCancellationIT#testCancellationDuringAggregation only works when
real reduce takes place and therefore needs at least 2 shards to be present.

Relates to elastic#71021
imotov added a commit that referenced this issue Oct 2, 2021
The SearchCancellationIT#testCancellationDuringAggregation only works when
real reduce takes place and therefore needs at least 2 shards to be present.

Relates to #71021
imotov added a commit to imotov/elasticsearch that referenced this issue Oct 2, 2021
The SearchCancellationIT#testCancellationDuringAggregation only works when
real reduce takes place and therefore needs at least 2 shards to be present.

Relates to elastic#71021
imotov added a commit that referenced this issue Oct 5, 2021
…led (#78583)

This change raises a TaskCancelledException to stop the search query if it is detected that the SearchTask has been cancelled during the reduce phase.

Issue: #71021

Co-authored-by: Daniel Hsu <[email protected]>
Co-authored-by: Igor Motov <[email protected]>