Commit 9274ea2

Merge branch 'main' into MavenTagRemoveSnapshot
AndreKurait authored Apr 9, 2024
2 parents 5f065ee + 8c13003 commit 9274ea2
Showing 3 changed files with 105 additions and 17 deletions.
@@ -4,6 +4,8 @@ broker_endpoints="${MIGRATION_KAFKA_BROKER_ENDPOINTS}"
msk_auth_settings=""
kafka_command_settings=""
s3_bucket_name=""
partition_offsets=""
partition_limits=""
if [ -n "$ECS_AGENT_URI" ]; then
msk_auth_settings="--kafka-traffic-enable-msk-auth"
kafka_command_settings="--command-config aws/msk-iam-auth.properties"
@@ -25,7 +27,9 @@ usage() {
echo "Options:"
echo " --timeout-seconds Timeout for how long process will try to collect the Kafka records. Default is 60 seconds."
echo " --enable-s3 Option to store created archive on S3."
echo " --s3-bucket-name Option to specify a given S3 bucket to store archive on".
echo " --s3-bucket-name Option to specify a given S3 bucket to store archive on."
echo " --partition-offsets Option to specify partition offsets in the format 'partition_id:offset,partition_id:offset'. Behavior defaults to using first offset in partition."
echo " --partition-limits Option to specify number of records to print per partition in the format 'partition_id:num_records,partition_id:num_records'."
echo ""
exit 1
}
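For reference, a hypothetical invocation exercising the two new options might look like the sketch below (the script name, partition ids, offsets, and limits are assumed values, not taken from this change):

# Start reading partitions 0 and 1 at offsets 100 and 250, stop after
# 500 records from each partition, and collect for up to 120 seconds.
./kafkaExport.sh --partition-offsets "0:100,1:250" --partition-limits "0:500,1:500" --timeout-seconds 120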
@@ -47,6 +51,16 @@ while [[ $# -gt 0 ]]; do
shift
shift
;;
--partition-offsets)
partition_offsets="$2"
shift
shift
;;
--partition-limits)
partition_limits="$2"
shift
shift
;;
-h|--h|--help)
usage
;;
@@ -60,26 +74,49 @@ while [[ $# -gt 0 ]]; do
esac
done

partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings"))
comma_sep_partition_offsets=$(echo $partition_offsets | sed 's/ /,/g')
echo "Collected offsets from current Kafka topic: "
echo $comma_sep_partition_offsets
if [ -n "$partition_offsets" ]; then
# Prepend the topic name to each partition offset
partition_offsets_with_topic=$(echo "$partition_offsets" | awk -v topic="$topic" 'BEGIN{RS=",";ORS=","}{print topic ":" $0}' | sed 's/,$//')
else
partition_offsets_with_topic=""
fi

if [ -n "$partition_limits" ]; then
# Prepend the topic name to each partition limit
partition_limits_with_topic=$(echo "$partition_limits" | awk -v topic="$topic" 'BEGIN{RS=",";ORS=","}{print topic ":" $0}' | sed 's/,$//')
else
partition_limits_with_topic=""
fi
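For context, the awk/sed pipeline above simply prefixes each comma-separated entry with the topic name; a standalone sketch, with a hypothetical topic name and values:

# Turns "0:100,1:250" into "my-topic:0:100,my-topic:1:250"
printf '0:100,1:250' | awk -v topic="my-topic" 'BEGIN{RS=",";ORS=","}{print topic ":" $0}' | sed 's/,$//'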

# Printing existing offsets in topic
all_consumers_partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings"))
comma_sep_all_consumers_partition_offsets="${all_consumers_partition_offsets// /,}"
echo "Existing offsets from current Kafka topic across all consumer groups: "
echo "$comma_sep_all_consumers_partition_offsets"

epoch_ts=$(date +%s)
dir_name="kafka_export_$epoch_ts"
mkdir -p $dir_name
archive_name="kafka_export_from_migration_console_$epoch_ts.tar.gz"
archive_name="kafka_export_from_migration_console_$epoch_ts.proto.gz"
group="exportFromMigrationConsole_$(hostname -s)_$$_$epoch_ts"
echo "Group name: $group"

# Construct the command dynamically
runJavaCmd="./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers \"$broker_endpoints\" --kafka-traffic-topic \"$topic\" --kafka-traffic-group-id \"$group\" $msk_auth_settings --timeout-seconds \"$timeout_seconds\""

if [ -n "$partition_offsets_with_topic" ]; then
runJavaCmd+=" --partition-offsets \"$partition_offsets_with_topic\""
fi

if [ -n "$partition_limits_with_topic" ]; then
runJavaCmd+=" --partition-limits \"$partition_limits_with_topic\""
fi

# Execute the command
set -o xtrace
./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "$broker_endpoints" --kafka-traffic-topic "$topic" --kafka-traffic-group-id "$group" $(echo "$msk_auth_settings") --timeout-seconds "$timeout_seconds" --partition-limits "$comma_sep_partition_offsets" --output-directory "./$dir_name"
eval "$runJavaCmd" | gzip -c -9 > "$dir_name/$archive_name"
set +o xtrace
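With both options supplied, the expanded command traced here would resemble the following sketch (broker endpoint, topic, group name, timestamps, and per-partition values are all hypothetical):

./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "broker-1:9092" --kafka-traffic-topic "my-topic" --kafka-traffic-group-id "exportFromMigrationConsole_host_42_1712620800" --timeout-seconds "60" --partition-offsets "my-topic:0:100,my-topic:1:250" --partition-limits "my-topic:0:500,my-topic:1:500" | gzip -c -9 > "./kafka_export_1712620800/kafka_export_from_migration_console_1712620800.proto.gz"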

cd $dir_name
tar -czvf "$archive_name" *.proto && rm *.proto
cd ..

# Remove created consumer group
./kafka/bin/kafka-consumer-groups.sh --bootstrap-server "$broker_endpoints" --timeout 100000 --delete --group "$group" $(echo "$kafka_command_settings")

@@ -8,6 +8,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
@@ -21,7 +22,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -106,6 +106,10 @@ static class Parameters {
arity=0,
description = "Creates a single output file with output from all partitions combined. Requires '--output-directory' to be specified.")
boolean combinePartitionOutput;
@Parameter(required = false,
names = {"--partition-offsets"},
description = "Partition offsets to start consuming from. Defaults to first offset in partition. Format: 'topic_name:partition_id:offset,topic_name:partition_id:offset'")
List<String> partitionOffsets = new ArrayList<>();

}

@@ -142,7 +146,6 @@ public static void main(String[] args) throws FileNotFoundException {
Properties properties = new Properties();
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

if (params.kafkaTrafficPropertyFile != null) {
try (InputStream input = new FileInputStream(params.kafkaTrafficPropertyFile)) {
@@ -177,6 +180,19 @@ public static void main(String[] args) throws FileNotFoundException {
}
}

Map<TopicPartition, Long> startingOffsets = new HashMap<>();
if (!params.partitionOffsets.isEmpty()) {
for (String partitionOffset : params.partitionOffsets) {
String[] elements = partitionOffset.split(":");
if (elements.length != 3) {
throw new ParameterException("Partition offset provided does not match the expected format: topic_name:partition_id:offset, actual value: " + partitionOffset);
}
TopicPartition partition = new TopicPartition(elements[0], Integer.parseInt(elements[1]));
long offset = Long.parseLong(elements[2]);
startingOffsets.put(partition, offset);
}
}

String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath;
baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath;
String uuid = UUID.randomUUID().toString();
@@ -203,7 +219,42 @@ public static void main(String[] args) throws FileNotFoundException {
}

try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
private final Set<TopicPartition> partitionsAssignedAtSomeTime = new HashSet<>();
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Partitions Assigned: {}", partitions);

// Seek partitions assigned for the first time to the beginning
var partitionsAssignedFirstTime = new HashSet<>(partitions);
partitionsAssignedFirstTime.retainAll(partitionsAssignedAtSomeTime);
consumer.seekToBeginning(partitionsAssignedFirstTime);
partitionsAssignedAtSomeTime.addAll(partitionsAssignedFirstTime);

// Seek partitions to provided offset if current reader is earlier
partitions.forEach(partition -> {
Long offset = startingOffsets.get(partition);
var currentOffset = consumer.position(partition);
if (offset == null) {
log.info("Did not find specified startingOffset for partition {}", partition);
}
else if (currentOffset < offset) {
consumer.seek(partition, offset);
log.info("Found a specified startingOffset for partition {} that is greater than "
+ "current offset {}. Seeking to {}", partition, currentOffset, offset);
} else {
log.info("Not changing fetch offsets because current offset is {} and startingOffset is {} "
+ "for partition {}",
currentOffset, offset, partition);

}
});
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
});
pipeRecordsToProtoBufDelimited(consumer, getDelimitedProtoBufOutputter(capturedRecords, partitionOutputStreams, separatePartitionOutputs),
params.timeoutSeconds, capturedRecords);
} catch (WakeupException e) {
6 changes: 3 additions & 3 deletions jenkins/release.jenkinsFile
@@ -50,7 +50,6 @@ pipeline {
)
String assetUrl = null
def parsedJson = readJSON text: assets
def assetName = arguments.downloadReleaseAssetName ?: 'artifacts.tar.gz'
parsedJson.each { item ->
if(item.name == 'artifacts.tar.gz') {
echo "Downloading artifacts from ${item.url}"
@@ -80,7 +79,8 @@

publishToMaven(
signingArtifactsPath: "$WORKSPACE/repository/",
mavenArtifactsPath: "$WORKSPACE/repository/"
mavenArtifactsPath: "$WORKSPACE/repository/",
autoPublish: true
)
}
}
@@ -103,4 +103,4 @@ pipeline {
}
}
}

