diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh
index a788de9631..70a0a50326 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh
@@ -4,6 +4,8 @@ broker_endpoints="${MIGRATION_KAFKA_BROKER_ENDPOINTS}"
 msk_auth_settings=""
 kafka_command_settings=""
 s3_bucket_name=""
+partition_offsets=""
+partition_limits=""
 if [ -n "$ECS_AGENT_URI" ]; then
     msk_auth_settings="--kafka-traffic-enable-msk-auth"
     kafka_command_settings="--command-config aws/msk-iam-auth.properties"
@@ -25,7 +27,9 @@ usage() {
     echo "Options:"
     echo "  --timeout-seconds      Timeout for how long process will try to collect the Kafka records. Default is 60 seconds."
     echo "  --enable-s3            Option to store created archive on S3."
-    echo "  --s3-bucket-name       Option to specify a given S3 bucket to store archive on".
+    echo "  --s3-bucket-name       Option to specify a given S3 bucket to store archive on."
+    echo "  --partition-offsets    Option to specify partition offsets in the format 'partition_id:offset,partition_id:offset'. Behavior defaults to using first offset in partition."
+    echo "  --partition-limits     Option to specify number of records to print per partition in the format 'partition_id:num_records,partition_id:num_records'."
     echo ""
     exit 1
 }
@@ -47,6 +51,16 @@ while [[ $# -gt 0 ]]; do
         shift
         shift
         ;;
+    --partition-offsets)
+        partition_offsets="$2"
+        shift
+        shift
+        ;;
+    --partition-limits)
+        partition_limits="$2"
+        shift
+        shift
+        ;;
     -h|--h|--help)
         usage
         ;;
@@ -60,26 +74,49 @@ while [[ $# -gt 0 ]]; do
     esac
 done
 
-partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings"))
-comma_sep_partition_offsets=$(echo $partition_offsets | sed 's/ /,/g')
-echo "Collected offsets from current Kafka topic: "
-echo $comma_sep_partition_offsets
+if [ -n "$partition_offsets" ]; then
+    # Prepend the topic name to each partition offset
+    partition_offsets_with_topic=$(echo "$partition_offsets" | awk -v topic="$topic" 'BEGIN{RS=",";ORS=","}{print topic ":" $0}' | sed 's/,$//')
+else
+    partition_offsets_with_topic=""
+fi
+
+if [ -n "$partition_limits" ]; then
+    # Prepend the topic name to each partition limit
+    partition_limits_with_topic=$(echo "$partition_limits" | awk -v topic="$topic" 'BEGIN{RS=",";ORS=","}{print topic ":" $0}' | sed 's/,$//')
+else
+    partition_limits_with_topic=""
+fi
+
+# Printing existing offsets in topic
+all_consumers_partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings"))
+comma_sep_all_consumers_partition_offsets="${all_consumers_partition_offsets// /,}"
+echo "Existing offsets from current Kafka topic across all consumer groups: "
+echo "$comma_sep_all_consumers_partition_offsets"
 
 epoch_ts=$(date +%s)
 dir_name="kafka_export_$epoch_ts"
 mkdir -p $dir_name
-archive_name="kafka_export_from_migration_console_$epoch_ts.tar.gz"
+archive_name="kafka_export_from_migration_console_$epoch_ts.proto.gz"
 group="exportFromMigrationConsole_$(hostname -s)_$$_$epoch_ts"
 echo "Group name: $group"
 
+# Construct the command dynamically
+runJavaCmd="./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers \"$broker_endpoints\" --kafka-traffic-topic \"$topic\" --kafka-traffic-group-id \"$group\" $msk_auth_settings --timeout-seconds \"$timeout_seconds\""
+
+if [ -n "$partition_offsets_with_topic" ]; then
+    runJavaCmd+=" --partition-offsets \"$partition_offsets_with_topic\""
+fi
+
+if [ -n "$partition_limits_with_topic" ]; then
+    runJavaCmd+=" --partition-limits \"$partition_limits_with_topic\""
+fi
+
+# Execute the command
 set -o xtrace
-./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "$broker_endpoints" --kafka-traffic-topic "$topic" --kafka-traffic-group-id "$group" $(echo "$msk_auth_settings") --timeout-seconds "$timeout_seconds" --partition-limits "$comma_sep_partition_offsets" --output-directory "./$dir_name"
+eval "$runJavaCmd" | gzip -c -9 > "$dir_name/$archive_name"
 set +o xtrace
 
-cd $dir_name
-tar -czvf "$archive_name" *.proto && rm *.proto
-cd ..
-
 # Remove created consumer group
 ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server "$broker_endpoints" --timeout 100000 --delete --group "$group" $(echo "$kafka_command_settings")
 
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java
index 848e363c05..044020bafc 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java
@@ -8,6 +8,7 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -21,7 +22,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -106,6 +106,10 @@ static class Parameters {
             arity=0,
             description = "Creates a single output file with output from all partitions combined. Requires '--output-directory' to be specified.")
         boolean combinePartitionOutput;
+        @Parameter(required = false,
+            names = {"--partition-offsets"},
+            description = "Partition offsets to start consuming from. Defaults to first offset in partition. Format: 'topic_name:partition_id:offset,topic_name:partition_id:offset'")
+        List<String> partitionOffsets = new ArrayList<>();
 
     }
 
@@ -142,7 +146,6 @@ public static void main(String[] args) throws FileNotFoundException {
         Properties properties = new Properties();
         properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         if (params.kafkaTrafficPropertyFile != null) {
             try (InputStream input = new FileInputStream(params.kafkaTrafficPropertyFile)) {
@@ -177,6 +180,19 @@ public static void main(String[] args) throws FileNotFoundException {
             }
         }
 
+        Map<TopicPartition, Long> startingOffsets = new HashMap<>();
+        if (!params.partitionOffsets.isEmpty()) {
+            for (String partitionOffset : params.partitionOffsets) {
+                String[] elements = partitionOffset.split(":");
+                if (elements.length != 3) {
+                    throw new ParameterException("Partition offset provided does not match the expected format: topic_name:partition_id:offset, actual value: " + partitionOffset);
+                }
+                TopicPartition partition = new TopicPartition(elements[0], Integer.parseInt(elements[1]));
+                long offset = Long.parseLong(elements[2]);
+                startingOffsets.put(partition, offset);
+            }
+        }
+
         String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath;
         baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath;
         String uuid = UUID.randomUUID().toString();
@@ -203,7 +219,42 @@ public static void main(String[] args) throws FileNotFoundException {
         }
 
         try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties)) {
-            consumer.subscribe(Collections.singleton(topic));
+            consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
+                private final Set<TopicPartition> partitionsAssignedAtSomeTime = new HashSet<>();
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    log.info("Partitions Assigned: {}", partitions);
+
+                    // Seek partitions assigned for the first time to the beginning
+                    var partitionsAssignedFirstTime = new HashSet<>(partitions);
+                    partitionsAssignedFirstTime.removeAll(partitionsAssignedAtSomeTime);
+                    consumer.seekToBeginning(partitionsAssignedFirstTime);
+                    partitionsAssignedAtSomeTime.addAll(partitionsAssignedFirstTime);
+
+                    // Seek partitions to provided offset if current reader is earlier
+                    partitions.forEach(partition -> {
+                        Long offset = startingOffsets.get(partition);
+                        var currentOffset = consumer.position(partition);
+                        if (offset == null) {
+                            log.info("Did not find specified startingOffset for partition {}", partition);
+                        }
+                        else if (currentOffset < offset) {
+                            consumer.seek(partition, offset);
+                            log.info("Found a specified startingOffset for partition {} that is greater than " +
+                                "current offset {}. Seeking to {}", partition, currentOffset, offset);
+                        } else {
+                            log.info("Not changing fetch offsets because current offset is {} and startingOffset is {} " +
+                                "for partition {}",
+                                currentOffset, offset, partition);
+
+                        }
+                    });
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                }
+            });
             pipeRecordsToProtoBufDelimited(consumer, getDelimitedProtoBufOutputter(capturedRecords, partitionOutputStreams, separatePartitionOutputs),
                 params.timeoutSeconds, capturedRecords);
         } catch (WakeupException e) {
diff --git a/jenkins/release.jenkinsFile b/jenkins/release.jenkinsFile
index 55bb165ca8..1dea5fc4ee 100644
--- a/jenkins/release.jenkinsFile
+++ b/jenkins/release.jenkinsFile
@@ -50,7 +50,6 @@ pipeline {
                     )
                     String assetUrl = null
                     def parsedJson = readJSON text: assets
-                    def assetName = arguments.downloadReleaseAssetName ?: 'artifacts.tar.gz'
                     parsedJson.each { item ->
                         if(item.name == 'artifacts.tar.gz') {
                             echo "Downloading artifacts from ${item.url}"
@@ -80,7 +79,8 @@ pipeline {
 
                     publishToMaven(
                         signingArtifactsPath: "$WORKSPACE/repository/",
-                        mavenArtifactsPath: "$WORKSPACE/repository/"
+                        mavenArtifactsPath: "$WORKSPACE/repository/",
+                        autoPublish: true
                     )
                 }
             }
@@ -103,4 +103,4 @@ pipeline {
         }
     }
 }
-
\ No newline at end of file
+
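
With these changes, the export script can resume from specific offsets and cap how many records it prints per partition. A minimal usage sketch follows; the partition ids, offsets, record counts, and timeout below are illustrative values, not part of the change:

```bash
# Resume the export from offset 1000 on partition 0 and offset 2500 on partition 1,
# capture at most 5000 records from each partition, and allow two minutes of polling.
# The script prepends the topic name to each pair before passing them to KafkaPrinter,
# which expects the 'topic_name:partition_id:offset' form.
./kafkaExport.sh \
    --partition-offsets "0:1000,1:2500" \
    --partition-limits "0:5000,1:5000" \
    --timeout-seconds 120
```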