Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka printer improvements #555

Merged

Conversation

AndreKurait
Copy link
Member

Description

Updates the KafkaPrinter with the ability to take in a commit offset to start printing from per partition.
Updates the KafkaExport script to be able to provide the commit offsets and record limits per partition.
Updates the KafkaExport script to directly compress the kafkaPrinter output script instead of first writing the uncompressed output to disc.

Note: Unit tests are currently lacking for the kafkaPrinter. Will focus on usability enhancements first then add testing once we know what we need.

  • Category: Enhancement
  • Why these changes are required? Improved usability of the KafkaExport script.
  • What is the old behavior before changes and new behavior after changes? KafkaExport script now can process more data with less disk space and has increased functionality with commit offsets and partition limits

Issues Resolved

https://opensearch.atlassian.net/browse/MIGRATIONS-1642

Is this a backport? If so, please add backport PR # and/or commits #

Testing

Ran the script with various configurations in an ECS container and verified correct publishing through s3

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • [ x] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@AndreKurait AndreKurait marked this pull request as ready for review April 5, 2024 19:55
else
partition_limits_with_topic=""
fi

partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings"))
comma_sep_partition_offsets=$(echo $partition_offsets | sed 's/ /,/g')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, we're now ignoring these partition offsets in favor of partition_offsets_with_topic.
First of all, is it correct that we're now ignoring them? If so, let's remove this code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These have always just been a print statement which allows a user to understand what the offset on the replayer may be, i've updated the variable names to make that more clear

echo " --s3-bucket-name Option to specify a given S3 bucket to store archive on".
echo " --s3-bucket-name Option to specify a given S3 bucket to store archive on."
echo " --partition-offsets Option to specify partition offsets in the format 'partition_id:offset,partition_id:offset'."
echo " --partition-limits Option to specify partition limits in the format 'partition_id:num_records,partition_id:num_records'."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on how partition offsets and partition limits work together? If I provide both for a given partition_id, does that mean it will print messages from offset->offset+num_records? What if I only provide one of them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I've updated the description to hopefully make that more clear.

When partition-offsets is not set for a partition, it defaults to the beginning of the partition in kafka

Copy link

codecov bot commented Apr 9, 2024

Codecov Report

Attention: Patch coverage is 0% with 32 lines in your changes are missing coverage. Please review.

Project coverage is 76.29%. Comparing base (44434ae) to head (e330325).
Report is 3 commits behind head on main.

Files Patch % Lines
...org/opensearch/migrations/replay/KafkaPrinter.java 0.00% 32 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #555      +/-   ##
============================================
- Coverage     76.54%   76.29%   -0.25%     
- Complexity     1408     1409       +1     
============================================
  Files           155      155              
  Lines          6033     6063      +30     
  Branches        543      548       +5     
============================================
+ Hits           4618     4626       +8     
- Misses         1049     1070      +21     
- Partials        366      367       +1     
Flag Coverage Δ
unittests 76.29% <0.00%> (-0.25%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@AndreKurait AndreKurait force-pushed the KafkaPrinterCommitOffset branch from 734953f to 16a6128 Compare April 9, 2024 15:03
@@ -142,7 +146,6 @@ public static void main(String[] args) throws FileNotFoundException {
Properties properties = new Properties();
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to explicitly seek to the beginning on the first time a partition has been assigned to handle case if a groupId is reused

@AndreKurait AndreKurait merged commit 8c13003 into opensearch-project:main Apr 9, 2024
5 of 7 checks passed
@AndreKurait AndreKurait deleted the KafkaPrinterCommitOffset branch April 9, 2024 21:00
@@ -25,7 +27,9 @@ usage() {
echo "Options:"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naïve question, how do we know this change works / isn't broken by a future change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants