
FEATURE: Add ability to export Kafka records #488

Merged
lewijacn merged 8 commits into opensearch-project:main from allow-kafka-export on Feb 13, 2024

Conversation

lewijacn
Collaborator

@lewijacn commented Jan 18, 2024

Description

Introduces a kafkaExport.sh script on the migration console for exporting all initially detected records for a given topic, across its partitions, to a gzip archive file. The script uses the packaged Kafka console scripts to detect all partitions for the topic and the current number of records in each partition, then feeds this information to the KafkaPrinter, which has been updated to accept per-partition limits and to complete once all limits have been reached.
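For illustration only (the PR's script shells out to the packaged Kafka console tools rather than doing this): a minimal Java sketch of how the initial per-partition record counts could be gathered with the consumer API. The broker address, topic, and class name are placeholders.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class PartitionLimitProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        String topic = "logging-traffic-topic"; // placeholder topic
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Discover every partition for the topic, then ask the broker for the earliest
            // and latest offsets; the difference is the record count available to export.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(topic, p.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            // Print "topic:partition:num_records" entries, the shape the PR's
            // --partition-limit option expects.
            partitions.forEach(tp -> System.out.println(
                    tp.topic() + ":" + tp.partition() + ":" + (end.get(tp) - begin.get(tp))));
        }
    }
}
```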


Also included is the ability to export this archive to S3, mainly for the AWS use case but also applicable to a Docker setup with the proper AWS credentials. To accommodate this, a migration-artifacts S3 bucket is now created in the CDK deployment and linked to the migration console so that the permissions needed for exports are available. This S3 bucket should also be useful for future Replayer log/tuple exports and other use cases.
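For illustration only (the script performs the upload from the shell): a minimal sketch of an equivalent upload with the AWS SDK for Java v2, assuming placeholder bucket, key, and file names.

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.file.Path;
import java.nio.file.Paths;

public class ArchiveUploader {
    public static void main(String[] args) {
        Path archive = Paths.get("kafka_export_1705600000.proto.tar.gz"); // placeholder file name
        // Credentials come from the default provider chain (task role in ECS,
        // environment variables or ~/.aws/credentials in the Docker case).
        try (S3Client s3 = S3Client.create()) {
            s3.putObject(PutObjectRequest.builder()
                            .bucket("migration-artifacts-example")          // placeholder bucket
                            .key("kafka-exports/" + archive.getFileName())  // placeholder key prefix
                            .build(),
                    RequestBody.fromFile(archive));
        }
    }
}
```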


Minor additional changes:

  • Added the ability to specify a docker build context other than the directory of the Dockerfile for CDK ECS services. This is not currently used by any of the ECS services, but was used during testing with a build context at the root of the project.
  • Upgraded OSB to 1.2, which resolves a conflict between awscli and osb previously encountered on the migration console.

Issues Resolved

https://opensearch.atlassian.net/browse/MIGRATIONS-1458
https://opensearch.atlassian.net/browse/MIGRATIONS-1459

Testing

Local/Cloud testing and unit testing for KafkaPrinter

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


codecov bot commented Jan 18, 2024

Codecov Report

Attention: 54 lines in your changes are missing coverage. Please review.

Comparison is base (ffbb46e) 76.06% compared to head (9a2b755) 75.49%.

Files Patch % Lines
...org/opensearch/migrations/replay/KafkaPrinter.java 34.93% 52 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #488      +/-   ##
============================================
- Coverage     76.06%   75.49%   -0.57%     
- Complexity     1356     1360       +4     
============================================
  Files           158      158              
  Lines          6011     6085      +74     
  Branches        509      530      +21     
============================================
+ Hits           4572     4594      +22     
- Misses         1088     1138      +50     
- Partials        351      353       +2     
Flag Coverage Δ
unittests 75.49% <34.93%> (-0.57%) ⬇️

Flags with carried forward coverage won't be shown.


@lewijacn marked this pull request as ready for review January 19, 2024 21:47
Collaborator

@gregschohn left a comment

There are a number of changes that I'd like to see to this PR, but the risk and tech-debt being introduced in the grand scheme of the codebase is minor, so I'll defer to your judgment about what should or needs to be addressed now and what can wait.

Since we don't know what the long-term needs are for using kafka topics exported in the format supported here, I'm fine with a more incremental approach. Do we have any tests on the replayer to confirm that it works (well) with files of this format? We DO have tests that integrate kafka and the replayer, but I cannot recall any for delimited stdio streams.

Comment on lines +29 to +33
if (projectName == "migrationConsole") {
    def destDir = "src/main/docker/${projectName}/build/jars"
    CommonUtils.copyArtifact(project, "trafficReplayer", projectName, destDir)
    dependsOn "copyArtifact_${projectName}"
}
Collaborator

This is fine for now, but we should look into a way to do this through extension or dependency injection rather than special casing.

-pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0 awscurl tqdm
-# TODO upon the next release of opensearch-benchmark the awscli package should be installed by pip3, with the expected boto3 version upgrade resolving the current conflicts between opensearch-benchmark and awscli
-RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && unzip awscliv2.zip && ./aws/install && rm -rf aws awscliv2.zip
+pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli
Collaborator

THANK YOU! Now this will work on arm cpus too!

project.task("copyArtifact_${destProjectName}", type: Copy) {
    dependsOn ":${artifactProjectName}:build"
    dependsOn ":${artifactProjectName}:jar"
    if (destProjectName == "trafficCaptureProxyServerTest") {
Collaborator

This doesn't seem like it should be a special case. We should just kill the test project altogether.

Collaborator Author

Should we just remove this if block, or are you saying the entire trafficCaptureProxyServerTest module should be removed? Not sure what our future plans are for it or if we use it today

# Add Traffic Replayer jars for running KafkaPrinter from this container
COPY build/jars /root/jars
RUN printf "#!/bin/sh\njava -cp `echo /root/jars/*.jar | tr \ :` \"\$@\" " > /root/runJavaWithClasspath.sh
RUN chmod +x /root/runJavaWithClasspath.sh
Collaborator

for later (and again from code predating these changes), we should use the gradle-provided wrappers instead of making our own.

Collaborator Author

Agreed 👍

group="export_$epoch_ts"

set -o xtrace
./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "$broker_endpoints" --kafka-traffic-topic "$topic" --kafka-traffic-group-id "$group" $(echo "$msk_auth_settings") --timeout-seconds "$timeout_seconds" --partition-limit "$comma_sep_partition_offsets" >> "$file_name"
Collaborator

why do you run 'echo' rather than just use the msk_auth_settings variable directly?

Collaborator

Why do you need a partition limit? I can understand if we have 100 partitions across 10TB that you don't want to try to pull all of that in a single process to a single file, but there could be a default with a warning for those doing quick and dirty tests.

Collaborator Author

For the first question: I had to use echo to handle some wonkiness around passing an argument name without a value, e.g. --kafka-traffic-enable-msk-auth.

Second question: Maybe "partition limit" is a bit confusing here. It is more or less just a means to tell our KafkaPrinter when to stop; otherwise it would continue to listen indefinitely. The user cannot currently specify these limits through the script; by default we try to capture as many records as are detected at the start of the script.
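A rough sketch of what those limits do (hypothetical names, not the actual KafkaPrinter code): each partition carries a remaining-record counter, and polling stops once every counter hits zero.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class PartitionLimitLoop {
    /** Poll until every partition has produced the number of records it held at startup. */
    static void consumeUpToLimits(KafkaConsumer<String, byte[]> consumer,
                                  Map<TopicPartition, Long> initialLimits,
                                  java.util.function.Consumer<ConsumerRecord<String, byte[]>> sink) {
        Map<TopicPartition, Long> remaining = new HashMap<>(initialLimits);
        // Real code would also honor a timeout, as the script's --timeout-seconds does.
        while (remaining.values().stream().anyMatch(left -> left > 0)) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, byte[]> record : records) {
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                long left = remaining.getOrDefault(tp, 0L);
                if (left > 0) {
                    sink.accept(record);            // hand the record to the output writer
                    remaining.put(tp, left - 1);    // one fewer record owed for this partition
                }
            }
        }
    }
}
```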

Comment on lines +59 to +61
comma_sep_partition_offsets=$(echo $partition_offsets | sed 's/ /,/g')
echo "Collected offsets from current Kafka topic: "
echo $comma_sep_partition_offsets
Collaborator

This can only be one line, right?

Collaborator Author

Yes this becomes a line of topic:partition:num_records topic:partition:num_records ...

./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "$broker_endpoints" --kafka-traffic-topic "$topic" --kafka-traffic-group-id "$group" $(echo "$msk_auth_settings") --timeout-seconds "$timeout_seconds" --partition-limit "$comma_sep_partition_offsets" >> "$file_name"
set +o xtrace

tar -czvf "$archive_name" "$file_name" && rm "$file_name"
Collaborator

I presume that you're keeping open the option of having multiple files in one tar.gz file? It doesn't seem unreasonable to require clients to deal with that.

Collaborator

Actually, I think that you'll want to be doing this for multiple partitions. If we have a topic with 2 (or 200) partitions, we don't want to fold all of those into a single file, losing the partition assignments. We can always glue them back together later. While it's true that we can repartition as we'd like, maybe using the original scheme - or maybe not - it feels like it would be more scalable and easier to manage if we didn't do merges of partitions within this script.

I'm fine with this staying as-is for now, but we'll need to start thinking about how we're partitioning as we horizontally scale replayers.

epoch_ts=$(date +%s)
file_name="kafka_export_$epoch_ts.proto"
archive_name="$file_name.tar.gz"
group="export_$epoch_ts"
Collaborator

I'd like to see a bit more uniqueness. Can you use a random value in here, or the hostname and pid as well?

Collaborator

Can you make "export" a bit less generic? I know that this is a simple utility, but somebody could mistake this group for something else. You could make this something like exportFromMigrationConsole_.

Collaborator Author

Have made this more unique now

 var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT);
-binaryReceiver.accept(StreamSupport.stream(records.spliterator(), false)
-        .map(ConsumerRecord::value));
+binaryReceiver.accept(StreamSupport.stream(records.spliterator(), false));
Collaborator

do you want to flatten records from across different partitions into just one stream? See my comments about preserving partition boundaries.

Collaborator Author

Have added a default that preserves partition boundaries and creates separate files.
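Roughly what that change amounts to (a sketch with hypothetical names, not the merged code): each partition gets its own CodedOutputStream, so records from different partitions never share a file. The separate files can always be glued back together later, as noted above.

```java
import com.google.protobuf.CodedOutputStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

final class PerPartitionWriter {
    private final Map<Integer, CodedOutputStream> streamsByPartition = new HashMap<>();
    private final String outputDirectory;
    private final String topic;

    PerPartitionWriter(String outputDirectory, String topic) {
        this.outputDirectory = outputDirectory;
        this.topic = topic;
    }

    /** Write each record's value to the file belonging to its source partition. */
    void accept(Stream<ConsumerRecord<String, byte[]>> records) {
        records.forEach(record -> {
            CodedOutputStream out = streamsByPartition.computeIfAbsent(record.partition(), p -> {
                try {
                    // One file per partition, e.g. <dir>/<topic>_<partition>.proto
                    return CodedOutputStream.newInstance(
                            new FileOutputStream(outputDirectory + "/" + topic + "_" + p + ".proto"));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            try {
                byte[] value = record.value();
                out.writeUInt32NoTag(value.length);  // length-delimited framing
                out.writeRawBytes(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Flush the buffered protobuf writers so all bytes reach their files before archiving. */
    void flushAll() throws IOException {
        for (CodedOutputStream out : streamsByPartition.values()) {
            out.flush();
        }
    }
}
```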

private static final Logger log = LoggerFactory.getLogger(KafkaPrinter.class);
public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1);

static class Partition {
Collaborator

There's already a tuple class for topics and partitions in org.apache.kafka.common (TopicPartition)

Collaborator Author

Thanks, I was unaware; have adjusted to use it now 👍

@lewijacn requested a review from AndreKurait as a code owner February 9, 2024 20:15
Collaborator

@gregschohn left a comment

There are some things that would be nice to change, but they aren't critical, especially given the support nature of this code.

Comment on lines +187 to +188
OutputStream os = params.outputDirectoryPath == null ? System.out : new FileOutputStream(String.format("%s%s_%s_%s.proto", baseOutputPath, params.kafkaTrafficTopic, "all", uuid));
partitionOutputStreams.put(0, CodedOutputStream.newInstance(os));
Collaborator

This block can roll into the next block - excise and make the first if statement include captureRecords.isEmpty() ||...

 set +o xtrace

-tar -czvf "$archive_name" "$file_name" && rm "$file_name"
+cd $dir_name
+tar -czvf "$archive_name" *.proto && rm *.proto
Collaborator

I'm fine doing this later, but this whole script would be much faster and easier to manage if you wrote compressed streams first, then tarred them together. As it is, you'll have a huge number of bytes going to disk (which might be remote) and back. Since the java program uses multiple files, you'd have to manage it within the java program (with a GZipInputStream).
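A sketch of that suggestion (hypothetical names, not part of the merged change): wrapping each partition's FileOutputStream in a GZIPOutputStream before handing it to CodedOutputStream means only compressed bytes ever reach disk, and the tar step no longer rereads and recompresses raw protobuf.

```java
import com.google.protobuf.CodedOutputStream;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

final class CompressedPartitionStreams {
    /** Open a gzip-compressed, length-delimited protobuf stream for one partition. */
    static CodedOutputStream openForPartition(String dir, String topic, int partition) throws IOException {
        // Data is compressed as it is written; the export script would then only need
        // to tar the already-compressed .proto.gz files together.
        GZIPOutputStream gzip = new GZIPOutputStream(
                new FileOutputStream(dir + "/" + topic + "_" + partition + ".proto.gz"));
        // Caller must flush the CodedOutputStream and close the gzip stream so the
        // gzip trailer gets written.
        return CodedOutputStream.newInstance(gzip);
    }
}
```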

-static java.util.function.Consumer<Stream<ConsumerRecord<String, byte[]>>> getDelimitedProtoBufOutputter(OutputStream outputStream, Map<Partition, PartitionTracker> capturedRecords) {
-    CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(outputStream);
+static java.util.function.Consumer<Stream<ConsumerRecord<String, byte[]>>> getDelimitedProtoBufOutputter(Map<TopicPartition, PartitionTracker> capturedRecords,
+        Map<Integer, CodedOutputStream> partitionOutputStreams, boolean separatePartitionOutputs) {
Collaborator

I should have asked before, but why are you using a CodedOutputStream (full disclosure, I wrote it with a CodedOutputStream) here instead of writeDelimitedTo? That would let you pass in a compressed output stream and you wouldn't need to worry about the uncompressed buffering problem.

Collaborator Author

I like the idea of using this writeDelimitedTo function, but it seems to be on the Message interface, which means we would have to take the byte[], have our TrafficStream protobuf object parse it, and only then call writeDelimitedTo. Given those requirements, I wasn't sure there would be much benefit.

Collaborator

Ahhh - good point. The current code is probably more efficient, so there isn't a compelling reason to make the change now.
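To make the trade-off concrete (illustrative only; the TrafficStream import path is an assumption and the method names are placeholders): writeDelimitedTo requires parsing each byte[] back into a Message, whereas CodedOutputStream can frame the raw bytes directly.

```java
import com.google.protobuf.CodedOutputStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream; // assumed import path

import java.io.IOException;
import java.io.OutputStream;

final class DelimitedWriteComparison {
    // Option discussed in review: parse the record value back into the TrafficStream
    // message just so Message.writeDelimitedTo can emit the length prefix for us.
    static void withWriteDelimitedTo(byte[] recordValue, OutputStream out) throws IOException {
        TrafficStream.parseFrom(recordValue).writeDelimitedTo(out);
    }

    // The CodedOutputStream route the PR keeps: write the length prefix and the raw
    // bytes directly, skipping the parse/re-serialize round trip.
    static void withCodedOutputStream(byte[] recordValue, CodedOutputStream out) throws IOException {
        out.writeUInt32NoTag(recordValue.length);
        out.writeRawBytes(recordValue);
    }
}
```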

@lewijacn merged commit 8c6fa53 into opensearch-project:main Feb 13, 2024
5 of 7 checks passed
gregschohn pushed a commit to gregschohn/opensearch-migrations that referenced this pull request Feb 20, 2024
* MIGRATIONS-1459: Add ability to export Kafka records

Signed-off-by: Tanner Lewis <[email protected]>
@lewijacn deleted the allow-kafka-export branch March 29, 2024 17:19