From 4661adaea2f8c9b9a3d7f177fc2f24b08206720a Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Tue, 22 Oct 2024 21:44:29 -0700
Subject: [PATCH] Fix race condition in PageListener
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR:
- Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed.
- Incremented `pagesInFlight` before processing each page and decremented it after processing is complete.
- Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task.
- Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization.

These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received.

Testing done:
1. Reproduced the race condition by starting two detectors with imputation. The race causes RCF to throw an out-of-order illegal argument exception. Also verified that the change fixes the problem.
2. Added an IT for the above scenario.
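
For illustration, the counting scheme described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the code in `ResultProcessor`: the class `PageTracker` and its method names are hypothetical, and only the three counter names are taken from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; not part of ResultProcessor.
class PageTracker {
    // Pages whose processing has started but not yet finished.
    private final AtomicInteger pagesInFlight = new AtomicInteger(0);
    // Requests sent out while processing pages.
    private final AtomicInteger sentOutPages = new AtomicInteger(0);
    // Responses received for those requests.
    private final AtomicInteger receivedPages = new AtomicInteger(0);

    // Call before any work on a page begins.
    void pageStarted() {
        pagesInFlight.incrementAndGet();
    }

    // Call once all work on a page is done.
    void pageFinished() {
        pagesInFlight.decrementAndGet();
    }

    // Call each time a request is sent out.
    void requestSent() {
        sentOutPages.incrementAndGet();
    }

    // Call each time a response comes back.
    void responseReceived() {
        receivedPages.incrementAndGet();
    }

    // True only when no page is being processed and every request
    // that was sent has been answered.
    boolean readyForImpute() {
        return pagesInFlight.get() == 0
            && sentOutPages.get() == receivedPages.get();
    }
}
```

Because a page is counted as in flight before its requests are sent, a check that runs while `sentOutPages` is still lagging sees `pagesInFlight > 0` and does not schedule `imputeHC` early.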

Signed-off-by: Kaituo Li
---
 .../org/opensearch/timeseries/transport/ResultProcessor.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/main/java/org/opensearch/timeseries/transport/ResultProcessor.java b/src/main/java/org/opensearch/timeseries/transport/ResultProcessor.java
index 2b7ebca65..8eb5d5d88 100644
--- a/src/main/java/org/opensearch/timeseries/transport/ResultProcessor.java
+++ b/src/main/java/org/opensearch/timeseries/transport/ResultProcessor.java
@@ -239,6 +239,9 @@ public void onResponse(CompositeRetriever.Page entityFeatures) {
                     scheduleImputeHCTask();
                 }
 
+                // Increment pagesInFlight to track the processing of this page
+                pagesInFlight.incrementAndGet();
+
                 if (entityFeatures != null && false == entityFeatures.isEmpty()) {
                     LOG
                         .info(