diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 68775b5af5cfc..2b83e431d75db 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -82,7 +82,8 @@ public static class Builder { private final BiConsumer> consumer; private final Listener listener; - private final Scheduler scheduler; + private final Scheduler flushScheduler; + private final Scheduler retryScheduler; private final Runnable onClose; private int concurrentRequests = 1; private int bulkActions = 1000; @@ -95,10 +96,11 @@ public static class Builder { private String globalPipeline; private Builder(BiConsumer> consumer, Listener listener, - Scheduler scheduler, Runnable onClose) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { this.consumer = consumer; this.listener = listener; - this.scheduler = scheduler; + this.flushScheduler = flushScheduler; + this.retryScheduler = retryScheduler; this.onClose = onClose; } @@ -182,7 +184,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) { */ public BulkProcessor build() { return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, - bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults()); + bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults()); } private Supplier createBulkRequestWithGlobalDefaults() { @@ -192,19 +194,55 @@ private Supplier createBulkRequestWithGlobalDefaults() { } } + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @param flushScheduler The scheduler that is used to flush + * @param retryScheduler The scheduler that is used for retries + * @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers. + * @return the builder for BulkProcessor + */ + public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { + Objects.requireNonNull(client, "client"); + Objects.requireNonNull(listener, "listener"); + return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose); + } + + + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + * @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)} + * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client, + * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler, + * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly + */ + @Deprecated public static Builder builder(Client client, Listener listener) { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); - return new Builder(client::bulk, listener, client.threadPool(), () -> {}); + return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {}); } + /** + * @param consumer The consumer that is called to fulfil bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + */ public static Builder builder(BiConsumer> consumer, Listener listener) { Objects.requireNonNull(consumer, "consumer"); Objects.requireNonNull(listener, "listener"); - final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); return new Builder(consumer, listener, - buildScheduler(scheduledThreadPoolExecutor), - () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); + buildScheduler(flushScheduledThreadPoolExecutor), + buildScheduler(retryScheduledThreadPoolExecutor), + () -> + { + Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + }); } private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { @@ -222,25 +260,34 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr private BulkRequest bulkRequest; private final Supplier bulkRequestSupplier; private final BulkRequestHandler bulkRequestHandler; - private final Scheduler scheduler; private final Runnable onClose; private volatile boolean closed = false; BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier bulkRequestSupplier) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); - this.scheduler = scheduler; this.bulkRequest = bulkRequestSupplier.get(); this.bulkRequestSupplier = bulkRequestSupplier; - this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); + this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests); // Start period flushing task after everything is setup - this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); + this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler); this.onClose = onClose; } + /** + * @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry + */ + @Deprecated + BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, + int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, + Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, scheduler, onClose, bulkRequestSupplier ); + } + /** * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. */ diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java index 76a99994e04ee..fe784fe07b804 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java @@ -63,7 +63,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) @@ -87,7 +87,7 @@ public void testBulkProcessorFlush() throws Exception { int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that this bulk won't be automatically flushed .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -122,7 +122,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception { MultiGetRequestBuilder multiGetRequestBuilder; - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -204,7 +204,7 @@ public void testBulkProcessorWaitOnClose() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(); int numDocs = randomIntBetween(10, 100); - BulkProcessor processor = BulkProcessor.builder(client(), listener) + BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), @@ -251,7 +251,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index f1731083ae376..ce3835a96ec3d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec assertAcked(prepareCreate(INDEX_NAME)); ensureGreen(); - BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() { + BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // no op diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ae5be24c3410c..c592d6e910098 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -438,7 +438,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }; int bulkSize = between(1, 20); - BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener) + BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener) .setBulkActions(bulkSize) .setConcurrentRequests(4) .build(); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 7af26953ff68d..bd820fc508961 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -57,7 +58,6 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -356,7 +356,7 @@ public Collection createComponents(Client client, ClusterService cluster final InputRegistry inputRegistry = new InputRegistry(inputFactories); inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry)); - bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() { + bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 428ec96df97e0..e55ad626271cd 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -128,7 +128,8 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor. + builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 2ea364de18b4e..072854d7cd467 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -70,7 +70,7 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); historyStore = new HistoryStore(bulkProcessor); }