Fix more Kafka committing errors #471

Merged
gregschohn merged 7 commits into opensearch-project:main from FixKafkaResume on Dec 11, 2023

Conversation

gregschohn
Collaborator

@gregschohn gregschohn commented Dec 8, 2023

Bugfixes include:

  • Run the Kafka client's touch() and readNextTrafficStreamChunk() activities (everything except close, as per this issue) on a dedicated thread rather than through CompletableFuture.supplyAsync(), which could change the thread actually being used.
  • Put synchronized blocks around the OffsetLifecycleTracker's add, remove, and toString() calls, since those are called from both the Kafka worker threads and the BlockingTrafficSource's (the TrafficReplayer's main) thread.
  • TrackingKafkaConsumer.resume now calls resume() instead of pause(). This was a major issue that left the consumer paused until restart.
  • touch() calculations were off, causing us to fall out of the consumer group while records were in flight. Now, the test for whether touch() should be called is whether a record that has been returned by getNextBatchOfRecords has yet to be fully committed to Kafka.
  • Consumer generations are now bound on a partition-by-partition basis so that we can more selectively punt commits for no-longer-tracked partitions and preserve them for still-tracked partitions.
  • ParsedHttpMessagesAsDicts threw an exception when calling array() on a ByteBuf that was a CompositeByteBuf. That code now fills the byte array with a slower getBytes() call.
  • Commits were being called multiple times when there were multiple requests within a TrafficStream. This PR includes a test for that, but a merged PR includes the fix (to use trafficStreamKeysBeingHeld ONLY for holds and not for stream identification too).
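
To make the locking and in-flight bookkeeping above concrete, here is a minimal sketch of a synchronized offset tracker. It is not the project's OffsetLifecycleTracker: the class name, method signatures, and the -1 sentinel are hypothetical, and the commit rule (the committed offset is the oldest offset still in flight, or one past the highest offset seen once nothing is in flight) is an assumption about how such a tracker could work.

```java
import java.util.PriorityQueue;

// Hypothetical sketch; names and return conventions are assumptions,
// not the real OffsetLifecycleTracker API.
class OffsetTrackerSketch {
    // Offsets handed out to the caller but not yet finished, smallest first.
    private final PriorityQueue<Long> inFlight = new PriorityQueue<>();
    private long highestOffsetSeen = -1;

    // Called from the Kafka thread when a record is returned to the caller.
    synchronized void add(long offset) {
        highestOffsetSeen = Math.max(highestOffsetSeen, offset);
        inFlight.add(offset);
    }

    // Called from the replayer thread when a record has been fully handled.
    // Returns the offset that may now be committed, or -1 if an older
    // record is still in flight and nothing new can be committed yet.
    synchronized long remove(long offset) {
        Long head = inFlight.peek();
        if (head == null) {
            throw new IllegalStateException("no offsets are in flight");
        }
        boolean wasOldest = head == offset;
        if (!inFlight.remove(Long.valueOf(offset))) {
            throw new IllegalArgumentException("offset " + offset + " is not tracked");
        }
        if (!wasOldest) {
            return -1;
        }
        Long newHead = inFlight.peek();
        return newHead != null ? newHead : highestOffsetSeen + 1;
    }

    // While this is false, some returned record is not yet committable,
    // which is the kind of condition a keep-alive check could test.
    synchronized boolean isEmpty() {
        return inFlight.isEmpty();
    }

    @Override
    public synchronized String toString() {
        return "OffsetTrackerSketch{inFlight=" + inFlight + "}";
    }
}
```

Because every method takes the same monitor, a peek followed by a remove cannot interleave with an add from another thread.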

Refactorings include:

  • Splitting the gzip file input traffic stream away from the upgrades that were done for legacy streams. That makes the new superclass useful for reconstructing error situations that may have been found in the wild.

Description

  • Category Bug fixes
  • Why are these changes required? Every time touch() was called on the Kafka traffic source, the client would stay paused indefinitely. Other problems were also present that would have undermined the guarantee of continuous 'at-least-once' delivery.
  • What is the old behavior before changes and new behavior after changes? Before, the Replayer would hang until it was manually restarted. Now, resume() actually resumes the consumer.

Issues Resolved

Continuation of https://opensearch.atlassian.net/browse/MIGRATIONS-1379

Is this a backport? If so, please add backport PR # and/or commits #

Testing

Only gradle.

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Pushing this with the plan to add more unit tests shortly

Signed-off-by: Greg Schohn <[email protected]>
@gregschohn gregschohn marked this pull request as ready for review December 8, 2023 16:03

codecov bot commented Dec 8, 2023

Codecov Report

Attention: 13 lines in your changes are missing coverage. Please review.

Comparison is base (9fa9ff0) 72.88% compared to head (ae2f026) 73.59%.
Report is 1 commits behind head on main.

❗ Current head ae2f026 differs from pull request most recent head 2dfecaa. Consider uploading reports for the commit 2dfecaa to get more accurate results

Files                                                   Patch %   Lines
...igrations/replay/kafka/OffsetLifecycleTracker.java   73.07%    3 Missing and 4 partials ⚠️
...h/migrations/replay/ParsedHttpMessagesAsDicts.java   33.33%    3 Missing and 1 partial ⚠️
...migrations/replay/kafka/TrackingKafkaConsumer.java   95.45%    0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #471      +/-   ##
============================================
+ Coverage     72.88%   73.59%   +0.71%     
- Complexity     1165     1182      +17     
============================================
  Files           124      124              
  Lines          4846     4890      +44     
  Branches        436      439       +3     
============================================
+ Hits           3532     3599      +67     
+ Misses         1021      998      -23     
  Partials        293      293              
Flag Coverage Δ
unittests 73.59% <85.55%> (+0.71%) ⬆️


Uncovering a lot of obscure race conditions along the way.

Signed-off-by: Greg Schohn <[email protected]>
@gregschohn gregschohn marked this pull request as draft December 8, 2023 23:44
… moving all the documents over...

BUT - I'm still getting NPEs that indicate that the OffsetLifecycleTracker's pQueue is empty when I'm trying to pull a value from it.

Signed-off-by: Greg Schohn <[email protected]>
@gregschohn gregschohn changed the title Whoops - resume() was implemented as another call to pause(). Resume() was implemented as another call to pause(). Dec 10, 2023
…code, & the introduction of a failing test for double-commits.

Signed-off-by: Greg Schohn <[email protected]>
One change through the merge (deviating from both of the prior commits) - trafficStreamKeysBeingHeld becomes lazily allocated.  Some patterns may have long runs of transactions within single traffic streams and therefore won't be holding any keys at all.

Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java
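
The lazy allocation mentioned in the commit message above could look roughly like the following. This is only a sketch: the class and method names are hypothetical and are not taken from RequestResponsePacketPair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a lazily allocated list of held keys.
class HeldKeysSketch<K> {
    // Stays null until the first key is held, so transactions that never
    // hold a key do not pay for an empty list.
    private List<K> keysBeingHeld;

    void holdKey(K key) {
        if (keysBeingHeld == null) {
            keysBeingHeld = new ArrayList<>();
        }
        keysBeingHeld.add(key);
    }

    // Callers always get a list, never null.
    List<K> getHeldKeys() {
        return keysBeingHeld == null
            ? Collections.<K>emptyList()
            : Collections.unmodifiableList(keysBeingHeld);
    }

    boolean hasAllocated() {
        return keysBeingHeld != null;
    }
}
```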
@gregschohn gregschohn changed the title Resume() was implemented as another call to pause(). Fix more Kafka committing errors Dec 11, 2023
@gregschohn gregschohn marked this pull request as ready for review December 11, 2023 14:32
@gregschohn gregschohn changed the base branch from main to capture-and-replay-v0.1.0 December 11, 2023 17:34
@gregschohn gregschohn changed the base branch from capture-and-replay-v0.1.0 to main December 11, 2023 17:35
Comment on lines 184 to 187
metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA)
        .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId())
        .setAttribute(MetricsAttributeKey.TOPIC_NAME, trackingKafkaConsumer.topic)
        .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit();
Collaborator

Don't we already do this in the next four lines?

Collaborator Author

Shoot - yes we do. Bad merge. I've removed one of them.

@@ -68,6 +71,7 @@ public class KafkaTrafficCaptureSource implements ISimpleTrafficCaptureSource {


final TrackingKafkaConsumer trackingKafkaConsumer;
private final ExecutorService kafkaExecutor;
Collaborator

Remind me again what prompted the need for a dedicated kafkaExecutor?

Collaborator Author

Kafka consumers aren't thread-safe. The polling and touch calls are already built with an async pattern anyway, so callers might already be coming in from any number of threads (for commit/poll, not touch). Given that the Kafka thread does blocking I/O, that thread is going to be pretty busy. Since the call stack will be remarkably similar for all Kafka interactions, having some thread affinity seemed like a good idea: 1) it makes it easier to know where the Kafka thread is (we should name that thread), 2) it's probably a tiny performance boost, and 3) it might create deadlock (just as the other way could have), but we shouldn't ever get a race condition. Plus, everything is more deterministic.
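
As a rough illustration of the thread-affinity idea (not the code in this PR), all work against a non-thread-safe client can be funneled through a single named thread. The class name, the submit method, and the thread name below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: every interaction with a non-thread-safe client
// runs on one dedicated, named thread.
class SingleThreadClientGate implements AutoCloseable {
    private final ExecutorService executor;

    SingleThreadClientGate(String threadName) {
        // Naming the thread makes it easy to find in a thread dump.
        executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Passing the executor explicitly pins the work to the dedicated thread;
    // the one-argument supplyAsync would use the common pool instead.
    <T> CompletableFuture<T> submit(Supplier<T> work) {
        return CompletableFuture.supplyAsync(work, executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

Callers on any thread can then write something like `gate.submit(() -> client.poll(...))` and compose on the returned future, while the client itself is only ever touched from the one thread. Tasks run strictly one at a time, so a task that blocks waiting on another submitted task would deadlock, which matches the caveat in the comment above.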

@gregschohn gregschohn merged commit 4ee54ef into opensearch-project:main Dec 11, 2023
@gregschohn gregschohn deleted the FixKafkaResume branch December 12, 2023 03:13