diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index f586677d12afd..52d87e4cd44f9 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -116,7 +116,7 @@ public abstract class AbstractAsyncBulkByScrollAction listener) { this.task = task; @@ -158,7 +158,7 @@ public BiFunction, ScrollableHitSource.Hit, RequestWrapper> // The default script applier executes a no-op return (request, searchHit) -> request; } - + /** * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle * metadata or scripting. That will be handled by copyMetadata and @@ -237,7 +237,7 @@ public void start() { } try { startTime.set(System.nanoTime()); - scrollSource.start(response -> onScrollResponse(timeValueNanos(System.nanoTime()), 0, response)); + scrollSource.start(response -> onScrollResponse(System.nanoTime(), 0, response)); } catch (Exception e) { finishHim(e); } @@ -245,11 +245,11 @@ public void start() { /** * Process a scroll response. - * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay. + * @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay. * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay. * @param response the scroll response to process */ - void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) { + void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.Response response) { logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size()); if (task.isCancelled()) { logger.debug("[{}]: finishing early because the task was cancelled", task.getId()); @@ -276,7 +276,7 @@ protected void doRun() throws Exception { * It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time * waiting on the scroll doesn't count against this batch in the throttle. */ - prepareBulkRequest(timeValueNanos(System.nanoTime()), response); + prepareBulkRequest(System.nanoTime(), response); } @Override @@ -285,7 +285,7 @@ public void onFailure(Exception e) { } }; prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable); - worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable); + worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable); } /** @@ -293,7 +293,7 @@ public void onFailure(Exception e) { * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the * thread may be blocked by the user script. */ - void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) { + void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.Response response) { logger.debug("[{}]: preparing bulk request", task.getId()); if (task.isCancelled()) { logger.debug("[{}]: finishing early because the task was cancelled", task.getId()); @@ -318,18 +318,18 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon /* * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation. */ - startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), 0); + startNextScroll(thisBatchStartTimeNS, System.nanoTime(), 0); return; } request.timeout(mainRequest.getTimeout()); request.waitForActiveShards(mainRequest.getWaitForActiveShards()); - sendBulkRequest(thisBatchStartTime, request); + sendBulkRequest(thisBatchStartTimeNS, request); } /** * Send a bulk request, handling retries. */ - void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) { + void sendBulkRequest(long thisBatchStartTimeNS, BulkRequest request) { if (logger.isDebugEnabled()) { logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(), new ByteSizeValue(request.estimatedSizeInBytes())); @@ -342,7 +342,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) { bulkRetry.withBackoff(client::bulk, request, new ActionListener() { @Override public void onResponse(BulkResponse response) { - onBulkResponse(thisBatchStartTime, response); + onBulkResponse(thisBatchStartTimeNS, response); } @Override @@ -355,7 +355,7 @@ public void onFailure(Exception e) { /** * Processes bulk responses, accounting for failures. */ - void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) { + void onBulkResponse(long thisBatchStartTimeNS, BulkResponse response) { try { List failures = new ArrayList(); Set destinationIndicesThisBatch = new HashSet<>(); @@ -403,7 +403,7 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) { return; } - startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), response.getItems().length); + startNextScroll(thisBatchStartTimeNS, System.nanoTime(), response.getItems().length); } catch (Exception t) { finishHim(t); } @@ -415,15 +415,15 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) { * @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied * when the scroll returns */ - void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) { + void startNextScroll(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) { if (task.isCancelled()) { logger.debug("[{}]: finishing early because the task was cancelled", task.getId()); finishHim(null); return; } - TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTime, now, lastBatchSize); + TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTimeNS, nowNS, lastBatchSize); scrollSource.startNextScroll(extraKeepAlive, response -> { - onScrollResponse(lastBatchStartTime, lastBatchSize, response); + onScrollResponse(lastBatchStartTimeNS, lastBatchSize, response); }); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 11326e4236b97..402a77512e811 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -107,7 +107,6 @@ import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; @@ -208,7 +207,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1); DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.setScroll(scrollId()); - TimeValue now = timeValueNanos(System.nanoTime()); + long now = System.nanoTime(); action.startNextScroll(now, now, 0); assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get())); if (listener.isDone()) { @@ -223,7 +222,7 @@ public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() t client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100); DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.setScroll(scrollId()); - TimeValue now = timeValueNanos(System.nanoTime()); + long now = System.nanoTime(); action.startNextScroll(now, now, 0); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get())); assertBusy(() -> assertTrue(listener.isDone())); @@ -239,7 +238,7 @@ public void testScrollResponseSetsTotal() { long total = randomIntBetween(0, Integer.MAX_VALUE); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response); assertEquals(total, testTask.getStatus().getTotal()); } @@ -252,7 +251,7 @@ public void testScrollResponseBatchingBehavior() throws Exception { Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); + simulateScrollResponse(action, System.nanoTime(), 0, response); // Use assert busy because the update happens on another thread final int expectedBatches = batches; @@ -305,7 +304,7 @@ public void testBulkResponseSetsLotsOfStatus() { new IndexResponse(shardId, "type", "id" + i, seqNo, primaryTerm, randomInt(), createdResponse); responses[i] = new BulkItemResponse(i, opType, response); } - new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); + new DummyAsyncBulkByScrollAction().onBulkResponse(System.nanoTime(), new BulkResponse(responses, 0)); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); assertEquals(updated, testTask.getStatus().getUpdated()); assertEquals(created, testTask.getStatus().getCreated()); @@ -335,7 +334,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n } }); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class)); assertThat(e.getCause(), hasToString(containsString("test"))); @@ -353,7 +352,7 @@ public void testShardFailuresAbortRequest() throws Exception { SearchFailure shardFailure = new SearchFailure(new RuntimeException("test")); ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), contains(shardFailure)); @@ -367,7 +366,7 @@ public void testShardFailuresAbortRequest() throws Exception { */ public void testSearchTimeoutsAbortRequest() throws Exception { ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), empty()); @@ -384,7 +383,7 @@ public void testBulkFailuresAbortRequest() throws Exception { DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong()); - action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse); + action.onBulkResponse(System.nanoTime(), bulkResponse); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), contains(failure)); assertThat(response.getSearchFailures(), empty()); @@ -404,7 +403,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper buildRequest(Hit doc ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); hit.setSource(new BytesArray("{}"), XContentType.JSON); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); + simulateScrollResponse(action, System.nanoTime(), 0, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); assertThat(e.getCause(), instanceOf(RuntimeException.class)); assertThat(e.getCause().getMessage(), equalTo("surprise")); @@ -456,8 +455,8 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n // Set throttle to 1 request per second to make the math simpler worker.rethrottle(1f); // Make the last batch look nearly instant but have 100 documents - TimeValue lastBatchStartTime = timeValueNanos(System.nanoTime()); - TimeValue now = timeValueNanos(lastBatchStartTime.nanos() + 1); + long lastBatchStartTime = System.nanoTime(); + long now = lastBatchStartTime + 1; action.startNextScroll(lastBatchStartTime, now, 100); // So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish) @@ -503,7 +502,7 @@ private void bulkRetryTestCase(boolean failWithRejection) throws Exception { CountDownLatch successLatch = new CountDownLatch(1); DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() { @Override - void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) { + void startNextScroll(long lastBatchStartTime, long now, int lastBatchSize) { successLatch.countDown(); } }; @@ -511,7 +510,7 @@ void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchS for (int i = 0; i < size + 1; i++) { request.add(new IndexRequest("index", "type", "id" + i)); } - action.sendBulkRequest(timeValueNanos(System.nanoTime()), request); + action.sendBulkRequest(System.nanoTime(), request); if (failWithRejection) { BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), hasSize(1)); @@ -577,22 +576,22 @@ public void testCancelBeforeInitialSearch() throws Exception { } public void testCancelBeforeScrollResponse() throws Exception { - cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1, + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1, new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null))); } public void testCancelBeforeSendBulkRequest() throws Exception { cancelTaskCase((DummyAsyncBulkByScrollAction action) -> - action.sendBulkRequest(timeValueNanos(System.nanoTime()), new BulkRequest())); + action.sendBulkRequest(System.nanoTime(), new BulkRequest())); } public void testCancelBeforeOnBulkResponse() throws Exception { cancelTaskCase((DummyAsyncBulkByScrollAction action) -> - action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0))); + action.onBulkResponse(System.nanoTime(), new BulkResponse(new BulkItemResponse[0], 0))); } public void testCancelBeforeStartNextScroll() throws Exception { - TimeValue now = timeValueNanos(System.nanoTime()); + long now = System.nanoTime(); cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(now, now, 0)); } @@ -641,7 +640,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task worker.rethrottle(1); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response); + simulateScrollResponse(action, System.nanoTime(), 1000, response); // Now that we've got our cancel we'll just verify that it all came through all right assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled()); @@ -670,7 +669,7 @@ private void cancelTaskCase(Consumer testMe) throw /** * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method. */ - private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize, + private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize, ScrollableHitSource.Response response) { action.setScroll(scrollId()); action.onScrollResponse(lastBatchTime, lastBatchSize, response); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java index ae2a6a552cba4..5d50cf7791419 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java @@ -182,11 +182,11 @@ TimeValue throttledUntil() { * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be * rescheduled over and over again. */ - public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize, + public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize, AbstractRunnable prepareBulkRequestRunnable) { // Synchronize so we are less likely to schedule the same request twice. synchronized (delayedPrepareBulkRequestReference) { - TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize); + TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize); logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay); try { delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), @@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt } } - public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) { - long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize); + public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) { + long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize); long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime())); return timeValueNanos(waitTime); } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index a76fced9772f5..770fc65621055 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.closeTo; @@ -152,7 +151,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n } }; try { - workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, + workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay, new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -225,7 +224,7 @@ public boolean isCancelled() { }; try { // Have the task use the thread pool to delay a task that does nothing - workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { + workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() { @Override protected void doRun() throws Exception { }