Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Table Setup race condition #390

Merged
merged 2 commits into from
Jul 12, 2022
Merged

Fix Table Setup race condition #390

merged 2 commits into from
Jul 12, 2022

Conversation

kskitek
Copy link
Contributor

@kskitek kskitek commented Jul 11, 2022

We have identified a race condition when setting up goka tables. The investigation begun with an error That topic/partition is already being consumed.
The error was checked on versions 1.1.2, 1.1.5 and 1.1.6.

  • This error is returned from sarama when a consumer tried to consume topic partition before previous partitionConsumer was removed (sarama, consumer.addChild and consumer.removeChild).
  • goka calls sarama.Consumer.ConsumePartition two times for each table
    • with offset 0 - PartitionTable.SetupAndRecover
    • with last offset - PartitionTable.CatchupForever
  • After SetupAndRecover and before CatchupForever there should be no consumers of a partition. The partition is released by calling sarama.PartitionConsumer.AsyncClose in defer of PartitionTable.load.
    • AsyncClose itself is asynchronous but in the defer, PartitionTable actually synchronizes using drainConsumer.
    • The problem is that drainConsumer waits for messages and errors channels for max 1 second - we don't have any guarantees that the partition consumer is actually closed and removed after 1 second

Reproduction:

  • patch sarama to force invalid order of execution
diff --git a/consumer.go b/consumer.go
index 46bdade..5860caa 100644
--- a/consumer.go
+++ b/consumer.go
@@ -538,7 +539,10 @@ func (child *partitionConsumer) AsyncClose() {
 	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
 	// also just close itself)
 	child.closeOnce.Do(func() {
-		close(child.dying)
+		go func() {
+			<-time.After(time.Second * 5)
+			close(child.dying)
+		}()
 	})
 }
  • run tests
    • origin of this error is in real life nobl9 application - this is not just theoretical error

Test results:

TestRebalance
=== CONT  TestRebalance
    processor_test.go:367: 
        	Error Trace:	processor_test.go:367
        	Error:      	Received unexpected error:
        	           	10 errors occurred:
        	           		* Error closing consumer group: 5 errors occurred:
        	           		* Error creating partition consumer for topic goka-systemtest-rebalance-1657547130-table, partition 5, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
        	           		* Error creating partition consumer for topic goka-systemtest-rebalance-1657547130-table, partition 4, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
        	           		* Error creating partition consumer for topic goka-systemtest-rebalance-1657547130-table, partition 19, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
        	           		* Error creating partition consumer for topic goka-systemtest-rebalance-1657547130-table, partition 7, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
        	           		* Error creating partition consumer for topic goka-systemtest-rebalance-1657547130-table, partition 6, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)

    ...

I think TestView_Reconnect fails only due to too low timeout in tests

=== RUN   TestView_Reconnect
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-3] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/3: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-9] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/9: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-7] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/7: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-0] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/0: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-7] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-0] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-5] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/5: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-5] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-4] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/4: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-4] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-2] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/2: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-2] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-3] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-9] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-8] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/8: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-8] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-1] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/1: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-6] Error while starting up: 1 error occurred:
	* kafka: error while consuming goka_systemtest_view_reconnect_test-1657547119/6: unexpected EOF

2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-1] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:19 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-6] Will retry in 0 seconds (retried 1 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:5 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-5] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 5, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-5] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:2 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-2] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 2, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-2] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:3 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-3] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 3, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-3] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:9 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-9] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 9, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-9] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:4 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-4] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 4, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-4] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:6 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-6] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 6, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-6] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:1 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-1] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 1, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-1] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:8 = 6
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-8] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 8, offset 5: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-8] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:7 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-7] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 7, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-7] Will retry in 10 seconds (retried 2 times so far)
2022/07/11 15:45:20 ## ErrDoesNotCompute: goka_systemtest_view_reconnect_test-1657547119:0 = 0
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-0] Error while starting up: Error creating partition consumer for topic goka_systemtest_view_reconnect_test-1657547119, partition 0, offset -3: kafka: invalid configuration (That topic/partition is already being consumed)
2022/07/11 15:45:20 [View goka_systemtest_view_reconnect_test-1657547119 > PartTable-0] Will retry in 10 seconds (retried 2 times so far)
    utils_test.go:29: waiting for view-running timed out
--- FAIL: TestView_Reconnect (11.45s)
TestRecoverAhead

    processor_test.go:297: 
        	Error Trace:	processor_test.go:297
        	Error:      	Received unexpected error:
        	           	2 errors occurred:
        	           		* Error closing consumer group: 1 error occurred:
        	           		* Error creating partition consumer for topic goka-systemtest-recoverahead-1657547109-join, partition 0, offset 0: kafka: invalid configuration (That topic/partition is already being consumed)
        	           	
        	           	
        	           		* error consuming from group consumer: 1 error occurred:
        	           		* Error creating partition consumer for topic goka-systemtest-recoverahead-1657547109-join, partition 0, offset 0: kafka: invalid configuration (That topic/partition is already being consumed)
        	           	
        	           	
        	           	
        	Test:       	TestRecoverAhead
--- FAIL: TestRebalanceSharePartitions (10.47s)
2022/07/11 15:45:41 [Processor goka-systemtest-slow-callback-fail-1657547130] Error running/stopping partition processor 0: 1 error occurred:
	* error processing message (partition=0): panic in callback: asdf
TestProcessorSlowStuck.func1
	github.com/lovoo/goka/systemtest/processor_test.go:578

Solution

My proposal to fix the issue it to just drain the errors/messages channels until actually closed without timeout.

We are testing this change in our application and we haven't found any issues yet.
After this change tests are passing with just one change - increasing the timeout.

I might not fully understand what else might be affected

Copy link
Contributor

@frairon frairon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a million @kskitek for the fix, we have seen this issue for some time, but haven't found the time to actually investigate. The reason for the timeout on draining the consumer was merely defensive programming, so not the best idea in this case. The solution looks legit to me.
Thanks a lot again!

@mmreza79 mmreza79 merged commit 0f272ce into lovoo:master Jul 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants