Skip to content

Commit

Permalink
DEVX-2277: Change topic creation to use Confluent Server v3 REST API (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
javabrett authored Jan 6, 2021
1 parent a6c443e commit 851accf
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 45 deletions.
59 changes: 16 additions & 43 deletions scripts/helper/create-topics.sh
Original file line number Diff line number Diff line change
@@ -1,47 +1,20 @@
#!/bin/bash
set -euo pipefail
IFS=$'\n\t'

# Create Kafka topic users, using appSA principal
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
--command-config /etc/kafka/secrets/appSA.config \
--topic users \
--create \
--replication-factor 2 \
--partitions 2 \
--config confluent.value.schema.validation=true
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
source ${DIR}/functions.sh

# Create Kafka topics with prefix wikipedia, using connectorSA principal
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
--command-config /etc/kafka/secrets/connectorSA_without_interceptors_ssl.config \
--topic wikipedia.parsed \
--create \
--replication-factor 2 \
--partitions 2 \
--config confluent.value.schema.validation=true
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
--command-config /etc/kafka/secrets/connectorSA_without_interceptors_ssl.config \
--topic wikipedia.parsed.count-by-domain \
--create \
--replication-factor 2 \
--partitions 2
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
--command-config /etc/kafka/secrets/connectorSA_without_interceptors_ssl.config \
--topic wikipedia.failed \
--create \
--replication-factor 2 \
--partitions 2
KAFKA_CLUSTER_ID=$(get_kafka_cluster_id_from_container)
echo "KAFKA_CLUSTER_ID: ${KAFKA_CLUSTER_ID}"

# Create Kafka topics with prefix WIKIPEDIA or EN_WIKIPEDIA, using ksqlDBUser principal
for t in WIKIPEDIABOT WIKIPEDIANOBOT EN_WIKIPEDIA_GT_1 EN_WIKIPEDIA_GT_1_COUNTS
do
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
--command-config /etc/kafka/secrets/ksqlDBUser_without_interceptors_ssl.config \
--topic "$t" \
--create \
--replication-factor 2 \
--partitions 2
done
auth="superUser:superUser"

create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} users true ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} wikipedia.parsed true ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} wikipedia.parsed.count-by-domain false ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} wikipedia.failed false ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} WIKIPEDIABOT false ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} WIKIPEDIANOBOT false ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} EN_WIKIPEDIA_GT_1 false ${auth}
create_topic kafka1:8091 ${KAFKA_CLUSTER_ID} EN_WIKIPEDIA_GT_1_COUNTS false ${auth}
39 changes: 38 additions & 1 deletion scripts/helper/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ verify_installed()
preflight_checks()
{
# Verify appropriate tools are installed on host
for cmd in jq docker-compose keytool docker openssl xargs awk; do
for cmd in curl jq docker-compose keytool docker openssl xargs awk; do
verify_installed $cmd || exit 1
done

Expand Down Expand Up @@ -257,3 +257,40 @@ END
exit 1
fi
}

create_topic() {

local DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

broker_host_port=$1
cluster_id=$2
topic_name=$3
confluent_value_schema_validation=$4
auth=$5

# note --tlsv1.2 below sets the _minimum_ allowed TLS version - expect TLS 1.3 to be negotiated here
{
IFS= read -rd '' out
IFS= read -rd '' http_code
IFS= read -rd '' status
} < <({ out=$(curl -sS -X POST \
-o /dev/stderr \
-w "%{http_code}" \
-u ${auth} \
--tlsv1.2 \
--cacert /etc/kafka/secrets/snakeoil-ca-1.crt \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--data-binary @<(jq -n --arg topic_name "${topic_name}" --arg confluent_value_schema_validation "${confluent_value_schema_validation}" -f ${DIR}/topic.jq) \
"https://${broker_host_port}/kafka/v3/clusters/${cluster_id}/topics"); } 2>&1; printf '\0%s' "$out" "$?") || true

echo "response code: " $http_code
echo $out| jq || true

if [[ $status -ne 0 || $http_code -gt 299 || -z $out || $out =~ "error_code" ]]; then
echo "ERROR: create topic failed $out"
return 1
fi

return 0
}
11 changes: 11 additions & 0 deletions scripts/helper/topic.jq
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"topic_name": $topic_name,
"partitions_count": 2,
"replication_factor": 2,
"configs": [
{
"name": "confluent.value.schema.validation",
"value": $confluent_value_schema_validation
}
]
}
2 changes: 1 addition & 1 deletion scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ docker-compose up -d schemaregistry connect control-center

echo
echo -e "Create topics in Kafka cluster:"
docker-compose exec kafka1 bash -c "/tmp/helper/create-topics.sh" || exit 1
docker-compose exec tools bash -c "/tmp/helper/create-topics.sh" || exit 1

# Verify Confluent Control Center has started
MAX_WAIT=300
Expand Down

0 comments on commit 851accf

Please sign in to comment.