Skip to content

Commit

Permalink
Merge pull request #1411 from DependencyTrack/rename-topic-prefix-config
Browse files Browse the repository at this point in the history
Rename kafka topic prefix for apache kafka compatibility
  • Loading branch information
nscuro authored Jul 25, 2024
2 parents 8fb0a45 + 2fe6fa2 commit ae1f7ab
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public enum KafkaTopic {

public String getName() {
SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class);
var prefixConfig = config.getConfigValue("kafka.topic.prefix");
var prefixConfig = config.getConfigValue("dt.kafka.topic.prefix");
if (prefixConfig.getValue() != null)
return prefixConfig.getValue() + name;
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
return Map.of(
"kafka.topic.prefix", "customPrefix."
"dt.kafka.topic.prefix", "customPrefix."
);
}

@Test
void testKafkaTopicConfigWithPrefix() {
System.setProperty("kafka.topic.prefix", "customPrefix.");
System.setProperty("dt.kafka.topic.prefix", "customPrefix.");
assertEquals("customPrefix.dtrack.vulnerability.mirror.command", KafkaTopic.VULNERABILITY_MIRROR_COMMAND.getName());
}
}
Expand Down
4 changes: 2 additions & 2 deletions docs/reference/configuration/api-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -3264,7 +3264,7 @@ Defines the socket / read timeout in seconds for outbound HTTP connections.

---

### kafka.topic.prefix
### dt.kafka.topic.prefix



Expand All @@ -3284,7 +3284,7 @@ Defines the socket / read timeout in seconds for outbound HTTP connections.
</tr>
<tr>
<th style="text-align: right">ENV</th>
<td style="border-width: 0"><code>KAFKA_TOPIC_PREFIX</code></td>
<td style="border-width: 0"><code>DT_KAFKA_TOPIC_PREFIX</code></td>
</tr>
</tbody>
</table>
Expand Down
6 changes: 3 additions & 3 deletions docs/reference/configuration/mirror-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ Defines the maximum size of a Kafka producer request in bytes. <br/><br/> Some

---

### kafka.topic.prefix
### dt.kafka.topic.prefix

Defines an optional prefix to assume for all Kafka topics the application consumes from, or produces to. The prefix will also be prepended to the application's consumer group ID.

Expand All @@ -402,7 +402,7 @@ Defines an optional prefix to assume for all Kafka topics the application consu
</tr>
<tr>
<th style="text-align: right">ENV</th>
<td style="border-width: 0"><code>KAFKA_TOPIC_PREFIX</code></td>
<td style="border-width: 0"><code>DT_KAFKA_TOPIC_PREFIX</code></td>
</tr>
</tbody>
</table>
Expand All @@ -426,7 +426,7 @@ Defines the ID to uniquely identify this application in the Kafka cluster. <br/
</tr>
<tr>
<th style="text-align: right">Default</th>
<td style="border-width: 0"><code>${kafka.topic.prefix}hyades-mirror-service</code></td>
<td style="border-width: 0"><code>${dt.kafka.topic.prefix}hyades-mirror-service</code></td>
</tr>
<tr>
<th style="text-align: right">ENV</th>
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/configuration/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ environment variable can be used.

| Environment Variable | Description | Default | Required |
|:---------------------|:-----------------------|:--------|:--------:|
| `KAFKA_TOPIC_PREFIX` | Prefix for topic names | - ||
| `DT_KAFKA_TOPIC_PREFIX` | Prefix for topic names | - ||

### Notification Publisher

Expand Down
6 changes: 3 additions & 3 deletions docs/reference/configuration/repo-meta-analyzer.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ Comma-separated list of brokers to use for establishing the initial connection t

---

### kafka.topic.prefix
### dt.kafka.topic.prefix

Defines an optional prefix to assume for all Kafka topics the application consumes from, or produces to. The prefix will also be prepended to the application's consumer group ID.

Expand All @@ -582,7 +582,7 @@ Defines an optional prefix to assume for all Kafka topics the application consu
</tr>
<tr>
<th style="text-align: right">ENV</th>
<td style="border-width: 0"><code>KAFKA_TOPIC_PREFIX</code></td>
<td style="border-width: 0"><code>DT_KAFKA_TOPIC_PREFIX</code></td>
</tr>
</tbody>
</table>
Expand All @@ -606,7 +606,7 @@ Defines the ID to uniquely identify this application in the Kafka cluster. <br/
</tr>
<tr>
<th style="text-align: right">Default</th>
<td style="border-width: 0"><code>${kafka.topic.prefix}hyades-repository-meta-analyzer</code></td>
<td style="border-width: 0"><code>${dt.kafka.topic.prefix}hyades-repository-meta-analyzer</code></td>
</tr>
<tr>
<th style="text-align: right">ENV</th>
Expand Down
14 changes: 7 additions & 7 deletions mirror-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,24 @@ kafka.max.request.size=2097152
# @category: Kafka
# @example: acme-
# @type: string
kafka.topic.prefix=
dt.kafka.topic.prefix=

# Defines the ID to uniquely identify this application in the Kafka cluster.
# <br/><br/>
# Refer to <https://kafka.apache.org/documentation/#streamsconfigs_application.id> for details.
#
# @category: Kafka
# @type: string
quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-mirror-service
quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-mirror-service

# @category: Kafka
# @hidden
quarkus.kafka-streams.topics=\
${kafka.topic.prefix}dtrack.vulnerability,\
${kafka.topic.prefix}dtrack.vulnerability.digest,\
${kafka.topic.prefix}dtrack.vulnerability.mirror.command,\
${kafka.topic.prefix}dtrack.vulnerability.mirror.state,\
${kafka.topic.prefix}dtrack.epss
${dt.kafka.topic.prefix}dtrack.vulnerability,\
${dt.kafka.topic.prefix}dtrack.vulnerability.digest,\
${dt.kafka.topic.prefix}dtrack.vulnerability.mirror.command,\
${dt.kafka.topic.prefix}dtrack.vulnerability.mirror.state,\
${dt.kafka.topic.prefix}dtrack.epss

# @category: Kafka
# @hidden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static ParallelStreamProcessor<String, Notification> createParallelConsu
.createEosStreamProcessor(parallelConsumerOptions);

final Optional<String> optionalPrefix = ConfigProvider.getConfig()
.getOptionalValue("kafka.topic.prefix", String.class)
.getOptionalValue("dt.kafka.topic.prefix", String.class)
.map(Pattern::quote);
final var topicPattern = Pattern.compile(optionalPrefix.orElse("") + "dtrack\\.notification\\..+");
parallelConsumer.subscribe(topicPattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ quarkus.native.additional-build-args=\
kafka.consumer.group.id=${quarkus.application.name}
quarkus.kafka.snappy.enabled=true

kafka.topic.prefix=
dt.kafka.topic.prefix=

# Quarkus' ClassLoader black magic doesn't play well with
# native libraries like the one required by Snappy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ quarkus.log.category."org.apache.kafka".level=WARN
# @category: Kafka
# @example: acme-
# @type: string
kafka.topic.prefix=
dt.kafka.topic.prefix=

# Defines the ID to uniquely identify this application in the Kafka cluster.
# <br/><br/>
# Refer to <https://kafka.apache.org/documentation/#streamsconfigs_application.id> for details.
#
# @category: Kafka
# @type: string
quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-repository-meta-analyzer
quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-repository-meta-analyzer

# @category: Kafka
# @hidden
quarkus.kafka-streams.topics=\
${kafka.topic.prefix}dtrack.repo-meta-analysis.component,\
${kafka.topic.prefix}dtrack.repo-meta-analysis.result
${dt.kafka.topic.prefix}dtrack.repo-meta-analysis.component,\
${dt.kafka.topic.prefix}dtrack.repo-meta-analysis.result

# Defines the interval in milliseconds at which consumer offsets are committed to the Kafka brokers.
# The Kafka default of `30s` has been modified to `5s`.
Expand Down
52 changes: 26 additions & 26 deletions scripts/create-topics.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,48 +39,48 @@ rpk cluster health --watch --exit-when-healthy \
--api-urls "$(echo "$REDPANDA_BROKERS" | sed -E 's/:[[:digit:]]+/:9644/g')"

notification_topics=(
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.analyzer"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.bom"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.configuration"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.datasource-mirroring"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.file-system"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.integration"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerability"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerable-dependency"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.policy-violation"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-audit-change"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-created"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-vuln-analysis-complete"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.repository"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.vex"
"${KAFKA_TOPIC_PREFIX:-}dtrack.notification.user"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.analyzer"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.bom"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.configuration"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.datasource-mirroring"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.file-system"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.integration"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerability"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.new-vulnerable-dependency"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.policy-violation"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-audit-change"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-created"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.project-vuln-analysis-complete"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.repository"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.vex"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.notification.user"
)
for topic_name in "${notification_topics[@]}"; do
create_topic "$topic_name" "${NOTIFICATION_TOPICS_PARTITIONS:-1}" "retention.ms=${NOTIFICATION_TOPICS_RETENTION_MS:-43200000}"
done

repo_meta_analysis_topics=(
"${KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.component"
"${KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.result"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.component"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.repo-meta-analysis.result"
)
for topic_name in "${repo_meta_analysis_topics[@]}"; do
create_topic "$topic_name" "${REPO_META_ANALYSIS_TOPICS_PARTITIONS:-3}" "retention.ms=${REPO_META_ANALYSIS_TOPICS_RETENTION_MS:-43200000}"
done

vuln_analysis_topics=(
"${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.component"
"${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.scanner.result"
"${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result"
"${KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result.processed"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.component"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.scanner.result"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result"
"${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vuln-analysis.result.processed"
)
for topic_name in "${vuln_analysis_topics[@]}"; do
create_topic "$topic_name" "${VULN_ANALYSIS_TOPICS_PARTITIONS:-3}" "retention.ms=${VULN_ANALYSIS_TOPICS_RETENTION_MS:-43200000}"
done

create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.command" "1" "retention.ms=${VULN_MIRROR_TOPICS_RETENTION_MS:-43200000}"
create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.state" "1" "cleanup.policy=compact segment.bytes=67108864 max.compaction.lag.ms=1"
create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.digest" "1" "cleanup.policy=compact segment.bytes=134217728"
create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact"
create_topic "${KAFKA_TOPIC_PREFIX:-}dtrack.epss" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact"
create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.command" "1" "retention.ms=${VULN_MIRROR_TOPICS_RETENTION_MS:-43200000}"
create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.mirror.state" "1" "cleanup.policy=compact segment.bytes=67108864 max.compaction.lag.ms=1"
create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability.digest" "1" "cleanup.policy=compact segment.bytes=134217728"
create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.vulnerability" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact"
create_topic "${DT_KAFKA_TOPIC_PREFIX:-}dtrack.epss" "${VULN_MIRROR_TOPICS_PARTITIONS:-3}" "cleanup.policy=compact"

echo "All topics created successfully"
10 changes: 5 additions & 5 deletions vulnerability-analyzer/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ quarkus.log.category."org.apache.kafka".level=WARN
## Kafka
#
%dev.kafka.bootstrap.servers=localhost:9092
kafka.topic.prefix=
quarkus.kafka-streams.application-id=${kafka.topic.prefix}hyades-vulnerability-analyzer
dt.kafka.topic.prefix=
quarkus.kafka-streams.application-id=${dt.kafka.topic.prefix}hyades-vulnerability-analyzer
quarkus.kafka-streams.application-server=localhost:8092
quarkus.kafka-streams.topics=\
${kafka.topic.prefix}dtrack.vuln-analysis.component,\
${kafka.topic.prefix}dtrack.vuln-analysis.scanner.result,\
${kafka.topic.prefix}dtrack.vuln-analysis.result
${dt.kafka.topic.prefix}dtrack.vuln-analysis.component,\
${dt.kafka.topic.prefix}dtrack.vuln-analysis.scanner.result,\
${dt.kafka.topic.prefix}dtrack.vuln-analysis.result
quarkus.kafka.snappy.enabled=true
kafka.retry-attempts=2
kafka-streams.topology.optimization=all
Expand Down

0 comments on commit ae1f7ab

Please sign in to comment.