-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a race condition between ShardConsumer shutdown and initialization #1319
Merged
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
651bf4d
Fix a race condition between ShardConsumer shutdown and initialization
akidambisrinivasan bde5ae9
Let healthchecks happen after initialization is complete
akidambisrinivasan 940f93b
Let healthchecks happen after initialization is complete
akidambisrinivasan da1ff05
Address review comments
akidambisrinivasan 78e3b27
Add more logs to the test
akidambisrinivasan 6a8e248
Address review comments
akidambisrinivasan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,9 @@ | |
import static org.mockito.Mockito.doThrow; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.never; | ||
import static org.mockito.Mockito.reset; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.timeout; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.verifyNoMoreInteractions; | ||
|
@@ -45,6 +47,7 @@ | |
import java.util.Optional; | ||
import java.util.concurrent.BrokenBarrierException; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.CyclicBarrier; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
|
@@ -53,16 +56,20 @@ | |
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Function; | ||
|
||
import org.awaitility.Awaitility; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Ignore; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.junit.rules.TestName; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.ArgumentCaptor; | ||
import org.mockito.Mock; | ||
import org.mockito.MockitoAnnotations; | ||
import org.mockito.invocation.InvocationOnMock; | ||
import org.mockito.runners.MockitoJUnitRunner; | ||
import org.reactivestreams.Subscriber; | ||
|
@@ -148,6 +155,7 @@ public class ShardConsumerTest { | |
|
||
@Before | ||
public void before() { | ||
MockitoAnnotations.initMocks(this); | ||
shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); | ||
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-" + testName.getMethodName() + "-%04d") | ||
.setDaemon(true).build(); | ||
|
@@ -848,6 +856,114 @@ public void testLongRunningTasks() throws Exception { | |
verifyNoMoreInteractions(taskExecutionListener); | ||
} | ||
|
||
// Regression test for the race between ShardConsumer shutdown and initialization. | ||
// Outline: | ||
// 1. Run the BLOCK_ON_PARENT_SHARDS and INITIALIZE tasks captured from the mocked executor. | ||
// 2. On a separate thread, call handleInput with a shard-end input whose mocked process | ||
// task counts down a latch and then sleeps for 10 seconds. | ||
// 3. On the test thread, call executeLifecycle and verify that shutdownTransition was | ||
// never invoked on the mocked ConsumerState. | ||
@Test | ||
public void testEmptyShardProcessingRaceCondition() throws Exception { | ||
RecordsPublisher mockPublisher = mock(RecordsPublisher.class); | ||
ExecutorService mockExecutor = mock(ExecutorService.class); | ||
ConsumerState mockState = mock(ConsumerState.class); | ||
ShardConsumer consumer = new ShardConsumer(mockPublisher, mockExecutor, shardInfo, Optional.of(1L), | ||
shardConsumerArgument, mockState, Function.identity(), 1, taskExecutionListener, 0); | ||
|
||
when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS); | ||
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS); | ||
ConsumerTask mockTask = mock(ConsumerTask.class); | ||
when(mockState.createTask(any(), any(), any())).thenReturn(mockTask); | ||
// Simulate successful BlockedOnParent task execution | ||
// and successful Initialize task execution | ||
when(mockTask.call()).thenReturn(new TaskResult(false)); | ||
|
||
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to initiate async" + | ||
" processing of blocked on parent task"); | ||
consumer.executeLifecycle(); | ||
// The executor is a mock, so the submitted Runnable is captured and run inline here | ||
ArgumentCaptor<Runnable> taskToExecute = ArgumentCaptor.forClass(Runnable.class); | ||
verify(mockExecutor, timeout(100)).execute(taskToExecute.capture()); | ||
taskToExecute.getValue().run(); | ||
log.info("RecordProcessor Thread: Simulated successful execution of Blocked on parent task"); | ||
// Clear the recorded execute() call so the next verify only matches the initialize task | ||
reset(mockExecutor); | ||
|
||
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to InitializingState" + | ||
" and initiate async processing of initialize task"); | ||
Comment on lines
+883
to
+884
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need logs in unit tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes its a complex unit test, the logs help for someone to follow along. |
||
when(mockState.successTransition()).thenReturn(mockState); | ||
when(mockState.state()).thenReturn(ShardConsumerState.INITIALIZING); | ||
when(mockState.taskType()).thenReturn(TaskType.INITIALIZE); | ||
consumer.executeLifecycle(); | ||
verify(mockExecutor, timeout(100)).execute(taskToExecute.capture()); | ||
log.info("RecordProcessor Thread: Simulated successful execution of Initialize task"); | ||
taskToExecute.getValue().run(); | ||
|
||
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to ProcessingState" + | ||
" and mark initialization future as complete"); | ||
when(mockState.state()).thenReturn(ShardConsumerState.PROCESSING); | ||
consumer.executeLifecycle(); | ||
|
||
// Simulate the race where | ||
// scheduler invokes executeLifecycle which performs Publisher.subscribe(subscriber) | ||
// on recordProcessor thread | ||
// but before scheduler thread finishes initialization, handleInput is invoked | ||
// on record processor thread. | ||
|
||
// Since ShardConsumer creates its own instance of subscriber that cannot be mocked | ||
// this test sequence will appear a little odd. | ||
// In order to control the order in which execution occurs, let's first invoke | ||
// handleInput, although this will never happen, since there isn't a way | ||
// to control the precise timing of the thread execution, this is the best way | ||
CountDownLatch processTaskLatch = new CountDownLatch(1); | ||
// NOTE: this thread is started but never joined within the test method | ||
new Thread(() -> { | ||
reset(mockState); | ||
when(mockState.taskType()).thenReturn(TaskType.PROCESS); | ||
ConsumerTask mockProcessTask = mock(ConsumerTask.class); | ||
when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask); | ||
when(mockProcessTask.call()).then(input -> { | ||
// first we want to wait for subscribe to be called, | ||
// but we cannot control the timing, so wait for 10 seconds | ||
// to let the main thread invoke executeLifecycle which | ||
// will perform subscribe | ||
processTaskLatch.countDown(); | ||
log.info("Record Processor Thread: Holding shardConsumer lock, waiting for 10 seconds to" + | ||
" let subscribe be called by scheduler thread"); | ||
Thread.sleep(10 * 1000); | ||
log.info("RecordProcessor Thread: Done waiting"); | ||
// then return shard end result | ||
log.info("RecordProcessor Thread: Simulating execution of ProcessTask and returning shard-end result"); | ||
return new TaskResult(true); | ||
}); | ||
Subscription mockSubscription = mock(Subscription.class); | ||
consumer.handleInput(ProcessRecordsInput.builder().isAtShardEnd(true).build(), mockSubscription); | ||
}).start(); | ||
|
||
// Block until the mocked process task has started running on the other thread | ||
processTaskLatch.await(); | ||
|
||
// invoke executeLifecycle, which should invoke subscribe | ||
// meanwhile if scheduler tries to acquire the ShardConsumer lock it will | ||
// be blocked during initialization processing because handleInput was | ||
// already invoked and will be holding the lock. Thereby creating the | ||
// race condition we want. | ||
reset(mockState); | ||
// Flags record which transitions were requested; state() reports SHUTTING_DOWN | ||
// only after both a success transition and a shutdown transition have been seen | ||
AtomicBoolean successTransitionCalled = new AtomicBoolean(false); | ||
when(mockState.successTransition()).then(input -> { | ||
successTransitionCalled.set(true); | ||
return mockState; | ||
}); | ||
AtomicBoolean shutdownTransitionCalled = new AtomicBoolean(false); | ||
when(mockState.shutdownTransition(any())).then(input -> { | ||
shutdownTransitionCalled.set(true); | ||
return mockState; | ||
}); | ||
when(mockState.state()).then(input -> { | ||
if (successTransitionCalled.get() && shutdownTransitionCalled.get()) { | ||
return ShardConsumerState.SHUTTING_DOWN; | ||
} | ||
return ShardConsumerState.PROCESSING; | ||
}); | ||
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to invoke subscribe and" + | ||
" complete initialization"); | ||
consumer.executeLifecycle(); | ||
// initialization should be done by now, make sure shard consumer did not | ||
// perform shutdown processing yet. | ||
log.info("Verifying scheduler did not perform shutdown transition during initialization"); | ||
verify(mockState, times(0)).shutdownTransition(any()); | ||
} | ||
|
||
// Convenience overload: delegates to mockSuccessfulShutdown(taskCallBarrier, null), | ||
// i.e. the two-argument variant with no second argument supplied. | ||
private void mockSuccessfulShutdown(CyclicBarrier taskCallBarrier) { | ||
mockSuccessfulShutdown(taskCallBarrier, null); | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make all variables final?