From 2fe6fa2e2db6dad0b693a96082135345d861f17e Mon Sep 17 00:00:00 2001 From: Sahiba Mittal Date: Wed, 24 Jul 2024 14:53:19 +0100 Subject: [PATCH] rename kafka topic prefix for apache kafka compatibility --- .../dependencytrack/common/KafkaTopic.java | 2 +- .../common/KafkaTopicTest.java | 4 +- docs/reference/configuration/api-server.md | 4 +- .../reference/configuration/mirror-service.md | 6 +-- docs/reference/configuration/overview.md | 2 +- .../configuration/repo-meta-analyzer.md | 6 +-- .../src/main/resources/application.properties | 14 ++--- .../config/ParallelConsumerConfiguration.java | 2 +- .../src/main/resources/application.properties | 2 +- .../src/main/resources/application.properties | 8 +-- scripts/create-topics.sh | 52 +++++++++---------- .../src/main/resources/application.properties | 10 ++-- 12 files changed, 56 insertions(+), 56 deletions(-) diff --git a/commons/src/main/java/org/dependencytrack/common/KafkaTopic.java b/commons/src/main/java/org/dependencytrack/common/KafkaTopic.java index 4417acd43..8267dff02 100644 --- a/commons/src/main/java/org/dependencytrack/common/KafkaTopic.java +++ b/commons/src/main/java/org/dependencytrack/common/KafkaTopic.java @@ -55,7 +55,7 @@ public enum KafkaTopic { public String getName() { SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class); - var prefixConfig = config.getConfigValue("kafka.topic.prefix"); + var prefixConfig = config.getConfigValue("dt.kafka.topic.prefix"); if (prefixConfig.getValue() != null) return prefixConfig.getValue() + name; return name; diff --git a/commons/src/test/java/org/dependencytrack/common/KafkaTopicTest.java b/commons/src/test/java/org/dependencytrack/common/KafkaTopicTest.java index 75d92b07c..164b8d075 100644 --- a/commons/src/test/java/org/dependencytrack/common/KafkaTopicTest.java +++ b/commons/src/test/java/org/dependencytrack/common/KafkaTopicTest.java @@ -34,13 +34,13 @@ static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { return Map.of( - "kafka.topic.prefix", "customPrefix." + "dt.kafka.topic.prefix", "customPrefix." ); } @Test void testKafkaTopicConfigWithPrefix() { - System.setProperty("kafka.topic.prefix", "customPrefix."); + System.setProperty("dt.kafka.topic.prefix", "customPrefix."); assertEquals("customPrefix.dtrack.vulnerability.mirror.command", KafkaTopic.VULNERABILITY_MIRROR_COMMAND.getName()); } } diff --git a/docs/reference/configuration/api-server.md b/docs/reference/configuration/api-server.md index 2d4746170..d22a540c2 100644 --- a/docs/reference/configuration/api-server.md +++ b/docs/reference/configuration/api-server.md @@ -3264,7 +3264,7 @@ Defines the socket / read timeout in seconds for outbound HTTP connections. --- -### kafka.topic.prefix +### dt.kafka.topic.prefix @@ -3284,7 +3284,7 @@ Defines the socket / read timeout in seconds for outbound HTTP connections. ENV - KAFKA_TOPIC_PREFIX + DT_KAFKA_TOPIC_PREFIX diff --git a/docs/reference/configuration/mirror-service.md b/docs/reference/configuration/mirror-service.md index 5f1b53f03..27ecb5bcf 100644 --- a/docs/reference/configuration/mirror-service.md +++ b/docs/reference/configuration/mirror-service.md @@ -378,7 +378,7 @@ Defines the maximum size of a Kafka producer request in bytes.

Some --- -### kafka.topic.prefix +### dt.kafka.topic.prefix Defines an optional prefix to assume for all Kafka topics the application consumes from, or produces to. The prefix will also be prepended to the application's consumer group ID. @@ -402,7 +402,7 @@ Defines an optional prefix to assume for all Kafka topics the application consu ENV - KAFKA_TOPIC_PREFIX + DT_KAFKA_TOPIC_PREFIX @@ -426,7 +426,7 @@ Defines the ID to uniquely identify this application in the Kafka cluster.
Default - ${kafka.topic.prefix}hyades-mirror-service + ${dt.kafka.topic.prefix}hyades-mirror-service ENV diff --git a/docs/reference/configuration/overview.md b/docs/reference/configuration/overview.md index 1f6d500e8..d4dc89551 100644 --- a/docs/reference/configuration/overview.md +++ b/docs/reference/configuration/overview.md @@ -13,7 +13,7 @@ environment variable can be used. | Environment Variable | Description | Default | Required | |:---------------------|:-----------------------|:--------|:--------:| -| `KAFKA_TOPIC_PREFIX` | Prefix for topic names | - | ❌ | +| `DT_KAFKA_TOPIC_PREFIX` | Prefix for topic names | - | ❌ | ### Notification Publisher diff --git a/docs/reference/configuration/repo-meta-analyzer.md b/docs/reference/configuration/repo-meta-analyzer.md index cde5376e2..09f353f8c 100644 --- a/docs/reference/configuration/repo-meta-analyzer.md +++ b/docs/reference/configuration/repo-meta-analyzer.md @@ -558,7 +558,7 @@ Comma-separated list of brokers to use for establishing the initial connection t --- -### kafka.topic.prefix +### dt.kafka.topic.prefix Defines an optional prefix to assume for all Kafka topics the application consumes from, or produces to. The prefix will also be prepended to the application's consumer group ID. @@ -582,7 +582,7 @@ Defines an optional prefix to assume for all Kafka topics the application consu ENV - KAFKA_TOPIC_PREFIX + DT_KAFKA_TOPIC_PREFIX @@ -606,7 +606,7 @@ Defines the ID to uniquely identify this application in the Kafka cluster.
Default - ${kafka.topic.prefix}hyades-repository-meta-analyzer + ${dt.kafka.topic.prefix}hyades-repository-meta-analyzer ENV diff --git a/mirror-service/src/main/resources/application.properties b/mirror-service/src/main/resources/application.properties index a1c8539a0..5ad5e0d0c 100644 --- a/mirror-service/src/main/resources/application.properties +++ b/mirror-service/src/main/resources/application.properties @@ -92,7 +92,7 @@ kafka.max.request.size=2097152 # @category: Kafka # @example: acme- # @type: string -kafka.topic.prefix= +dt.kafka.topic.prefix= # Defines the ID to uniquely identify this application in the Kafka cluster. #

@@ -100,16 +100,16 @@ kafka.topic.prefix= # # @category: Kafka # @type: string -quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-mirror-service +quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-mirror-service # @category: Kafka # @hidden quarkus.kafka-streams.topics=\ - ${kafka.topic.prefix}dtrack.vulnerability,\ - ${kafka.topic.prefix}dtrack.vulnerability.digest,\ - ${kafka.topic.prefix}dtrack.vulnerability.mirror.command,\ - ${kafka.topic.prefix}dtrack.vulnerability.mirror.state,\ - ${kafka.topic.prefix}dtrack.epss + ${dt.kafka.topic.prefix}dtrack.vulnerability,\ + ${dt.kafka.topic.prefix}dtrack.vulnerability.digest,\ + ${dt.kafka.topic.prefix}dtrack.vulnerability.mirror.command,\ + ${dt.kafka.topic.prefix}dtrack.vulnerability.mirror.state,\ + ${dt.kafka.topic.prefix}dtrack.epss # @category: Kafka # @hidden diff --git a/notification-publisher/src/main/java/org/dependencytrack/notification/config/ParallelConsumerConfiguration.java b/notification-publisher/src/main/java/org/dependencytrack/notification/config/ParallelConsumerConfiguration.java index 7df467421..9fc8de54a 100644 --- a/notification-publisher/src/main/java/org/dependencytrack/notification/config/ParallelConsumerConfiguration.java +++ b/notification-publisher/src/main/java/org/dependencytrack/notification/config/ParallelConsumerConfiguration.java @@ -105,7 +105,7 @@ private static ParallelStreamProcessor createParallelConsu .createEosStreamProcessor(parallelConsumerOptions); final Optional optionalPrefix = ConfigProvider.getConfig() - .getOptionalValue("kafka.topic.prefix", String.class) + .getOptionalValue("dt.kafka.topic.prefix", String.class) .map(Pattern::quote); final var topicPattern = Pattern.compile(optionalPrefix.orElse("") + "dtrack\\.notification\\..+"); parallelConsumer.subscribe(topicPattern); diff --git a/notification-publisher/src/main/resources/application.properties b/notification-publisher/src/main/resources/application.properties index ed44dbac8..2ce75e695 100644 --- a/notification-publisher/src/main/resources/application.properties +++ b/notification-publisher/src/main/resources/application.properties @@ -24,7 +24,7 @@ quarkus.native.additional-build-args=\ kafka.consumer.group.id=${quarkus.application.name} quarkus.kafka.snappy.enabled=true -kafka.topic.prefix= +dt.kafka.topic.prefix= # Quarkus' ClassLoader black magic doesn't play well with # native libraries like the one required by Snappy. diff --git a/repository-meta-analyzer/src/main/resources/application.properties b/repository-meta-analyzer/src/main/resources/application.properties index 80562fa01..4d252fa4f 100644 --- a/repository-meta-analyzer/src/main/resources/application.properties +++ b/repository-meta-analyzer/src/main/resources/application.properties @@ -39,7 +39,7 @@ quarkus.log.category."org.apache.kafka".level=WARN # @category: Kafka # @example: acme- # @type: string -kafka.topic.prefix= +dt.kafka.topic.prefix= # Defines the ID to uniquely identify this application in the Kafka cluster. #

@@ -47,13 +47,13 @@ kafka.topic.prefix= # # @category: Kafka # @type: string -quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-repository-meta-analyzer +quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-repository-meta-analyzer # @category: Kafka # @hidden quarkus.kafka-streams.topics=\ - ${kafka.topic.prefix}dtrack.repo-meta-analysis.component,\ - ${kafka.topic.prefix}dtrack.repo-meta-analysis.result + ${dt.kafka.topic.prefix}dtrack.repo-meta-analysis.component,\ + ${dt.kafka.topic.prefix}dtrack.repo-meta-analysis.result # Defines the interval in milliseconds at which consumer offsets are committed to the Kafka brokers. # The Kafka default of `30s` has been modified to `5s`. diff --git a/scripts/create-topics.sh b/scripts/create-topics.sh index 5d71e2495..721ff6c4f 100644 --- a/scripts/create-topics.sh +++ b/scripts/create-topics.sh @@ -39,48 +39,48 @@ rpk cluster health --watch --exit-when-healthy \ --api-urls "$(echo "$REDPANDA_BROKERS" | sed -E 's/:[[:digit:]]+/:9644/g')" notification_topics=( - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.analyzer" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.bom" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.configuration" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.datasource-mirroring" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.file-system" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.integration" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerability" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerable-dependency" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.policy-violation" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-audit-change" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-created" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-vuln-analysis-complete" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.repository" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.vex" - "${KAFKA_TOPIC_PREFIX:-}dtrack.notification.user" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.analyzer" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.bom" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.configuration" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.datasource-mirroring" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.file-system" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.integration" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerability" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerable-dependency" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.policy-violation" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-audit-change" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-created" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-vuln-analysis-complete" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.repository" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.vex" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.user" ) for topic_name in "${notification_topics[@]}"; do create_topic "$topic_name" "${NOTIFICATION_TOPICS_PARTITIONS:-1}" "retention.ms=${NOTIFICATION_TOPICS_RETENTION_MS:-43200000}" done repo_meta_analysis_topics=( - "${KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.component" - "${KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.result" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.component" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.result" ) for topic_name in "${repo_meta_analysis_topics[@]}"; do create_topic "$topic_name" "${REPO_META_ANALYSIS_TOPICS_PARTITIONS:-3}" "retention.ms=${REPO_META_ANALYSIS_TOPICS_RETENTION_MS:-43200000}" done vuln_analysis_topics=( - "${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.component" - "${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.scanner.result" - "${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result" - "${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result.processed" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.component" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.scanner.result" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result" + "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result.processed" ) for topic_name in "${vuln_analysis_topics[@]}"; do create_topic "$topic_name" "${VULN_ANALYSIS_TOPICS_PARTITIONS:-3}" "retention.ms=${VULN_ANALYSIS_TOPICS_RETENTION_MS:-43200000}" done -create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.command" "1" "retention.ms=${VULN_MIRROR_TOPICS_RETENTION_MS:-43200000}" -create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.state" "1" "cleanup.policy=compact segment.bytes=67108864 max.compaction.lag.ms=1" -create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.digest" "1" "cleanup.policy=compact segment.bytes=134217728" -create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact" -create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.epss" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact" +create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.command" "1" "retention.ms=${VULN_MIRROR_TOPICS_RETENTION_MS:-43200000}" +create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.state" "1" "cleanup.policy=compact segment.bytes=67108864 max.compaction.lag.ms=1" +create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.digest" "1" "cleanup.policy=compact segment.bytes=134217728" +create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact" +create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.epss" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact" echo "All topics created successfully" \ No newline at end of file diff --git a/vulnerability-analyzer/src/main/resources/application.properties b/vulnerability-analyzer/src/main/resources/application.properties index 38ac55eda..83b7436e5 100644 --- a/vulnerability-analyzer/src/main/resources/application.properties +++ b/vulnerability-analyzer/src/main/resources/application.properties @@ -13,13 +13,13 @@ quarkus.log.category."org.apache.kafka".level=WARN ## Kafka # %dev.kafka.bootstrap.servers=localhost:9092 -kafka.topic.prefix= -quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-vulnerability-analyzer +dt.kafka.topic.prefix= +quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-vulnerability-analyzer quarkus.kafka-streams.application-server=localhost:8092 quarkus.kafka-streams.topics=\ - ${kafka.topic.prefix}dtrack.vuln-analysis.component,\ - ${kafka.topic.prefix}dtrack.vuln-analysis.scanner.result,\ - ${kafka.topic.prefix}dtrack.vuln-analysis.result + ${dt.kafka.topic.prefix}dtrack.vuln-analysis.component,\ + ${dt.kafka.topic.prefix}dtrack.vuln-analysis.scanner.result,\ + ${dt.kafka.topic.prefix}dtrack.vuln-analysis.result quarkus.kafka.snappy.enabled=true kafka.retry-attempts=2 kafka-streams.topology.optimization=all