
Bulk processor concurrent requests #41451

Merged

Conversation


@jakelandis commented Apr 23, 2019

org.elasticsearch.action.bulk.BulkProcessor is a threadsafe class that
provides simple semantics for sending bulk requests. Once a bulk reaches
its pre-defined size in bytes or documents, or the flush interval elapses,
it will execute sending the bulk. One configurable option is the number of
concurrent outstanding bulk requests. That concurrency is implemented in
org.elasticsearch.action.bulk.BulkRequestHandler via a semaphore. However,
the only code that currently calls into it is blocked by synchronized
methods. This makes it impossible for the BulkProcessor to behave concurrently,
despite supporting a configurable number of concurrent requests.

This change removes the synchronized method in favor of an explicit
lock around the non-thread-safe parts of the method. The call into
org.elasticsearch.action.bulk.BulkRequestHandler is no longer blocking, which
allows org.elasticsearch.action.bulk.BulkRequestHandler to handle its own concurrency.


Currently the only production code that uses this class is Watcher,
which defaults to 0 concurrent requests. This change allows Watcher to
optionally increase the concurrency.

I have run this test a few thousand times and cannot get it to error. Also, if I remove the lock locally, the test fails pretty quickly.

This is tangentially related to #41418 since it is the same synchronized method that can cause issues. However, without that fix this change only moves where the blocking happens (from the synchronized block to the semaphore block).

Adding HLRC and Watcher since, outside of tests, they are the largest consumers.

  `org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that
  allows for simple semantics to deal with sending bulk requests. Once a
  bulk reaches its pre-defined size in bytes or documents, or the flush
  interval elapses, it will execute sending the bulk. One configurable option
  is the number of concurrent outstanding bulk requests. That concurrency is
  implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a
  semaphore. However, the only code that currently calls into this code is
  blocked by `synchronized` methods. This results in the inability of the
  BulkProcessor to behave concurrently despite supporting a configurable
  number of concurrent requests.

  This change removes the `synchronized` method in favor of an explicit
  lock around the non-thread-safe parts of the method. The call into
  `org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which
  allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle its own concurrency.

  Note - only the primary add method has been updated since this is the hot path.
  Currently only Watcher uses this class and the concurrency is set to 0 by default.
  This change allows Watcher to optionally increase the concurrency.
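
To illustrate the option this change makes effective, here is a minimal sketch of building a BulkProcessor with concurrent requests enabled. It assumes the 7.x high-level REST client's `client.bulkAsync`; the empty listener bodies and the threshold values are placeholders, not part of this PR:

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;

    static BulkProcessor buildProcessor(RestHighLevelClient client) {
        return BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                new BulkProcessor.Listener() {
                    @Override public void beforeBulk(long executionId, BulkRequest request) {}
                    @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
                    @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
                })
            .setBulkActions(1000)                               // send after 1000 documents...
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or 5 MB of payload
            .setConcurrentRequests(2)                           // up to 2 in-flight bulks; 0 means fully synchronous
            .build();
    }

Before this fix, the synchronized add path meant that even with `setConcurrentRequests(2)` only one bulk was ever in flight; with it, both permits can actually be used.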
@elasticmachine (Collaborator)

Pinging @elastic/es-core-features

@jakelandis added the WIP label Apr 23, 2019
@jakelandis (Contributor, Author)

Hold off reviewing...I think I need to wrap the other instance swapping inside the same lock.

@jakelandis (Contributor, Author)

@elasticmachine run elasticsearch-ci/bwc
@elasticmachine run elasticsearch-ci/default-distro

@jakelandis removed the WIP label Apr 24, 2019
@jakelandis (Contributor, Author)

@martijnvg @hub-cap - this is ready for review. I have re-run the tests a few hundred more times with the code as-is and can't get it to fail. To see this change in action, you need to add the following println in BulkRequestHandler and run the concurrency tests.

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            System.out.println("permits:" + semaphore.availablePermits() + " of " + concurrentRequests
                + " queue length: " + semaphore.getQueueLength());

You will notice that before this change, at most 1 permit was ever acquired (due to the upstream synchronized method). With this change it can consume all the permits and add to the queue.

@hub-cap (Contributor) left a comment


LGTM, thank you for the detailed commit msg and note about the testing. I had one question but I will defer to those who are more well versed than I in the art of locks in Java!

@jaymode (Member) left a comment


I left some comments about lock usage and the testing

    for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
        // alternate between ways to add to the bulk processor
        futures.add(executorService.submit(() -> {
            if (i.get() % 2 == 0) {
@jaymode (Member)


I am not sure this is the best way to test this. Ideally you want all of your threads ready to go prior to starting work. Usually we use a cyclic barrier or countdown latch so that we know all threads are ready prior to execution of the concurrent tasks. An example use of a cyclic barrier would be:

    public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws BrokenBarrierException, InterruptedException {
        int numberOfThreads = randomIntBetween(2, 32);
        final Cache<String, String> cache = CacheBuilder.<String, String>builder().build();
        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final String key = randomAlphaOfLengthBetween(2, 32);
        for (int i = 0; i < numberOfThreads; i++) {
            Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                    // numberOfEntries is a constant defined elsewhere in the test class
                    for (int j = 0; j < numberOfEntries; j++) {
                        try {
                            String value = cache.computeIfAbsent(key, k -> {
                                throw new RuntimeException("failed to load");
                            });
                            fail("expected exception but got: " + value);
                        } catch (ExecutionException e) {
                            assertNotNull(e.getCause());
                            assertThat(e.getCause(), instanceOf(RuntimeException.class));
                            assertEquals(e.getCause().getMessage(), "failed to load");
                        }
                    }
                    barrier.await();
                } catch (BrokenBarrierException | InterruptedException e) {
                    throw new AssertionError(e);
                }
            });
            thread.start();
        }
        // wait for all threads to be ready
        barrier.await();
        // wait for all threads to finish
        barrier.await();
    }

@jakelandis (Contributor, Author)


@jaymode - thanks for the pointer. I added a countdown latch to ensure that all the tasks are submitted to the executor before doing any work. Let me know if this missed the mark. - a1e456c
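
For reference, the start-gate pattern described above looks roughly like this. This is a minimal sketch using plain JDK executors, not the PR's actual test code; the task body and the thread/task counts are placeholders:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class StartGateSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            CountDownLatch startGate = new CountDownLatch(1);
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                futures.add(executor.submit(() -> {
                    try {
                        startGate.await(); // block until every task has been submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    // ... e.g. an add to the bulk processor would go here ...
                }));
            }
            startGate.countDown(); // open the gate; all tasks run concurrently
            for (Future<?> f : futures) {
                f.get(); // surfaces any task failure
            }
            executor.shutdown();
        }
    }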

@jakelandis (Contributor, Author)

@jaymode @hub-cap - thanks for your reviews! I believe all comments have been addressed.

@jaymode (Member) left a comment


I left some more comments and suggestions. Mainly concerning the handling of InterruptedException, keeping track of all exceptions, and tightening up the start gate latch so we know all items have been added and the executor has all of its threads constructed and has started running the expected number of concurrent tasks.
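
Two of those review points, sketched generically (this is an illustration of the general patterns, not the PR's actual test code): restore the interrupt flag when catching InterruptedException, and collect every worker's failure in a concurrent queue so no exception is silently dropped:

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;

    ConcurrentLinkedQueue<Exception> failures = new ConcurrentLinkedQueue<>();
    CountDownLatch startGate = new CountDownLatch(1);

    Runnable task = () -> {
        try {
            startGate.await();                      // may throw InterruptedException
            // ... the concurrent work under test ...
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // restore the interrupt status
            failures.add(e);
        } catch (Exception e) {
            failures.add(e);                        // keep track of all exceptions, not just the last
        }
    };
    // After all tasks have finished, assert that `failures` is empty.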


@jakelandis merged commit bd1dc98 into elastic:master May 23, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request May 23, 2019
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that
allows for simple semantics to deal with sending bulk requests. Once a
bulk reaches its pre-defined size in bytes or documents, or the flush
interval elapses, it will execute sending the bulk. One configurable option
is the number of concurrent outstanding bulk requests. That concurrency is
implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a
semaphore. However, the only code that currently calls into this code is
blocked by `synchronized` methods. This results in the inability of the
BulkProcessor to behave concurrently despite supporting a configurable
number of concurrent requests.

This change removes the `synchronized` method in favor of an explicit
lock around the non-thread-safe parts of the method. The call into
`org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which
allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle its own concurrency.
jakelandis added a commit that referenced this pull request May 23, 2019
gurkankaymak pushed a commit to gurkankaymak/elasticsearch that referenced this pull request May 27, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Oct 4, 2019
7.3.0 fixes this issue by changing the locking strategy in elastic#41451.
However, that change is not part of 6.x, and the change here is
a minimal workaround to prevent the potential for deadlock.

With this change, failed bulk requests that go through the BulkProcessor
for Watcher are no longer retried. Specifically, this removes the retry
logic when adding Watcher history and triggered watches after a bulk
request has failed.

Related elastic#47599
jakelandis added a commit that referenced this pull request Oct 7, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Oct 30, 2019
Currently the BulkProcessor class uses a single scheduler to schedule
flushes and retries. Functionally these are very different concerns, but
sharing a scheduler can result in a deadlock. Specifically, the single
shared scheduler can kick off a flush task, which only finishes when the
bulk that is being flushed finishes. If (for whatever reason) any items in
that bulk fail, it will (by default) schedule a retry. However, that retry
will never run, since the flush task is consuming the one and only thread
available from the shared scheduler.

Since the BulkProcessor is mostly client-based code, the client can
provide their own scheduler. As-is, the scheduler would require at
minimum 2 worker threads to avoid the potential deadlock. Since the
number of threads is a configuration option in the scheduler, the code
cannot enforce this 2-worker rule until runtime. For this reason, this
commit splits the single task scheduler into 2 schedulers. This eliminates
the potential for the flush task to block the retry task and removes this
deadlock scenario.

This commit also deprecates the Java APIs that presume a single scheduler,
and updates any internal code to no longer use those APIs.

Fixes elastic#47599

Note - elastic#41451 fixed the general case where a bulk that fails and is
retried can result in a deadlock. This fix should address that case as well
as the case when a bulk failure *from the flush* needs to be retried.
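
The shape of that fix, sketched with plain JDK executors rather than Elasticsearch's own scheduler abstraction (illustrative only; the intervals and task bodies are placeholders): giving flushes and retries their own single-threaded schedulers means a blocking flush can no longer starve a retry.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // One scheduler per concern: a flush task that blocks while its bulk is
    // in flight can no longer occupy the only thread a retry task needs.
    ScheduledExecutorService flushScheduler = Executors.newSingleThreadScheduledExecutor();
    ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();

    flushScheduler.scheduleWithFixedDelay(() -> {
        // flush the pending bulk; may block until the bulk completes,
        // and a failed item may schedule a retry on retryScheduler
    }, 5, 5, TimeUnit.SECONDS);

    retryScheduler.schedule(() -> {
        // retry a failed bulk; runs even while a flush is blocking above
    }, 1, TimeUnit.SECONDS);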
jakelandis added a commit that referenced this pull request Oct 31, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Nov 11, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Nov 11, 2019
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Nov 11, 2019
jakelandis added a commit that referenced this pull request Nov 11, 2019
jakelandis added a commit that referenced this pull request Nov 11, 2019
jakelandis added a commit that referenced this pull request Nov 12, 2019
* Prevent deadlock by using separate schedulers (#48697)


bruckner commented Jul 3, 2020

Is there any chance this fix will be backported to 6.8? It seems on par with the later #48697, which was backported.
