From 3fc6e40fc919a09017112605fea85368f5658f78 Mon Sep 17 00:00:00 2001 From: Viliam Krizan Date: Mon, 25 Jul 2022 10:58:15 +0200 Subject: [PATCH] feat: support Camel Kafka props (#96) * feat: support camel props for Kafka SASL * feat: Camel Kafka brokers prop EVNT-611 --- .../configsource/ClowderConfigSource.java | 25 +++++++++-- .../configsource/ConfigSourceTest.java | 45 ++++++++++++++++--- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/redhat/cloud/common/clowder/configsource/ClowderConfigSource.java b/src/main/java/com/redhat/cloud/common/clowder/configsource/ClowderConfigSource.java index 8401ae7..f15be14 100644 --- a/src/main/java/com/redhat/cloud/common/clowder/configsource/ClowderConfigSource.java +++ b/src/main/java/com/redhat/cloud/common/clowder/configsource/ClowderConfigSource.java @@ -25,13 +25,22 @@ public class ClowderConfigSource implements ConfigSource { public static final String CLOWDER_CONFIG_SOURCE = "ClowderConfigSource"; - // Kafka SASL config keys. + // Kafka config keys. + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_SASL_JAAS_CONFIG_KEY = "kafka.sasl.jaas.config"; public static final String KAFKA_SASL_MECHANISM_KEY = "kafka.sasl.mechanism"; public static final String KAFKA_SECURITY_PROTOCOL_KEY = "kafka.security.protocol"; public static final String KAFKA_SSL_TRUSTSTORE_LOCATION_KEY = "kafka.ssl.truststore.location"; public static final String KAFKA_SSL_TRUSTSTORE_TYPE_KEY = "kafka.ssl.truststore.type"; + // Camel Kafka config keys. + public static final String CAMEL_KAFKA_BROKERS = "camel.component.kafka.brokers"; + public static final String CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY = "camel.component.kafka.sasl-jaas-config"; + public static final String CAMEL_KAFKA_SASL_MECHANISM_KEY = "camel.component.kafka.sasl-mechanism"; + public static final String CAMEL_KAFKA_SECURITY_PROTOCOL_KEY = "camel.component.kafka.security-protocol"; + public static final String CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY = "camel.component.kafka.ssl-truststore-location"; + public static final String CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY = "camel.component.kafka.ssl-truststore-type"; + // Kafka SASL config values. public static final String KAFKA_SSL_TRUSTSTORE_TYPE_VALUE = "PEM"; @@ -43,7 +52,12 @@ public class ClowderConfigSource implements ConfigSource { KAFKA_SASL_MECHANISM_KEY, KAFKA_SECURITY_PROTOCOL_KEY, KAFKA_SSL_TRUSTSTORE_LOCATION_KEY, - KAFKA_SSL_TRUSTSTORE_TYPE_KEY + KAFKA_SSL_TRUSTSTORE_TYPE_KEY, + CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY, + CAMEL_KAFKA_SASL_MECHANISM_KEY, + CAMEL_KAFKA_SECURITY_PROTOCOL_KEY, + CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY, + CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY ); Logger log = Logger.getLogger(getClass().getName()); @@ -122,7 +136,7 @@ public String getValue(String configKey) { if (configKey.equals("quarkus.http.port")) { return String.valueOf(root.webPort); } - if (configKey.equals("kafka.bootstrap.servers")) { + if (configKey.equals(KAFKA_BOOTSTRAP_SERVERS) || configKey.equals(CAMEL_KAFKA_BROKERS)) { if (root.kafka == null) { throw new IllegalStateException("Kafka base object not present, can't set Kafka values"); } @@ -161,6 +175,7 @@ public String getValue(String configKey) { if (saslBroker.isPresent()) { switch (configKey) { case KAFKA_SASL_JAAS_CONFIG_KEY: + case CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY: String username = saslBroker.get().sasl.username; String password = saslBroker.get().sasl.password; switch (saslBroker.get().sasl.saslMechanism) { @@ -170,12 +185,16 @@ public String getValue(String configKey) { return "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; } case KAFKA_SASL_MECHANISM_KEY: + case CAMEL_KAFKA_SASL_MECHANISM_KEY: return saslBroker.get().sasl.saslMechanism; case KAFKA_SECURITY_PROTOCOL_KEY: + case CAMEL_KAFKA_SECURITY_PROTOCOL_KEY: return saslBroker.get().sasl.securityProtocol; case KAFKA_SSL_TRUSTSTORE_LOCATION_KEY: + case CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY: return createTempKafkaCertFile(saslBroker.get().cacert); case KAFKA_SSL_TRUSTSTORE_TYPE_KEY: + case CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY: return KAFKA_SSL_TRUSTSTORE_TYPE_VALUE; default: throw new IllegalStateException("Unexpected Kafka SASL config key: " + configKey); diff --git a/src/test/java/com/redhat/cloud/common/clowder/configsource/ConfigSourceTest.java b/src/test/java/com/redhat/cloud/common/clowder/configsource/ConfigSourceTest.java index 329050f..6d9291f 100644 --- a/src/test/java/com/redhat/cloud/common/clowder/configsource/ConfigSourceTest.java +++ b/src/test/java/com/redhat/cloud/common/clowder/configsource/ConfigSourceTest.java @@ -20,6 +20,11 @@ import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_LOCATION_KEY; import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_TYPE_KEY; import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_TYPE_VALUE; +import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY; +import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SASL_MECHANISM_KEY; +import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SECURITY_PROTOCOL_KEY; +import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY; +import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -64,15 +69,15 @@ void testWebPort() { @Test void testKafkaBootstrap() { - String boostrap = ccs.getValue("kafka.bootstrap.servers"); - assertEquals("ephemeral-host.svc:29092", boostrap); + assertEquals("ephemeral-host.svc:29092", ccs.getValue("kafka.bootstrap.servers")); + assertEquals("ephemeral-host.svc:29092", ccs.getValue("camel.component.kafka.brokers")); } @Test void testKafkaBootstrapServers() { ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig2.json", APP_PROPS_MAP); - String boostrap = ccs2.getValue("kafka.bootstrap.servers"); - assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", boostrap); + assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", ccs2.getValue("kafka.bootstrap.servers")); + assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", ccs2.getValue("camel.component.kafka.brokers")); } @Test @@ -240,28 +245,50 @@ void testKafkaNoAuthtype() { assertNull(ccs.getValue(KAFKA_SECURITY_PROTOCOL_KEY)); assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); + + assertNull(ccs.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); } @Test void testKafkaSaslPlainAuthtype() { ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig_kafka_sasl_plain_authtype.json", APP_PROPS_MAP); - assertEquals("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"john\" password=\"doe\";", ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY)); + String expJasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"john\" password=\"doe\";"; + assertEquals(expJasConfig, ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY)); assertEquals("PLAIN", ccs2.getValue(KAFKA_SASL_MECHANISM_KEY)); assertEquals("SASL_SSL", ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY)); assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); + + assertEquals(expJasConfig, ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY)); + assertEquals("PLAIN", ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY)); + assertEquals("SASL_SSL", ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); } @Test void testKafkaSaslScramAuthtype() throws IOException { ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig_kafka_sasl_scram_authtype.json", APP_PROPS_MAP); - assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"john\" password=\"doe\";", ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY)); + String expJasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"john\" password=\"doe\";"; + assertEquals(expJasConfig, ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY)); assertEquals("SCRAM-SHA-512", ccs2.getValue(KAFKA_SASL_MECHANISM_KEY)); assertEquals("SASL_SSL", ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY)); String truststoreLocation = ccs2.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY); String cert = Files.readString(Path.of(truststoreLocation), UTF_8); assertEquals(EXPECTED_CERT, cert); assertEquals(KAFKA_SSL_TRUSTSTORE_TYPE_VALUE, ccs2.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); + + assertEquals(expJasConfig, ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY)); + assertEquals("SCRAM-SHA-512", ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY)); + assertEquals("SASL_SSL", ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY)); + String camelTruststoreLocation = ccs2.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY); + String camelCert = Files.readString(Path.of(camelTruststoreLocation), UTF_8); + assertEquals(EXPECTED_CERT, camelCert); + assertEquals(KAFKA_SSL_TRUSTSTORE_TYPE_VALUE, ccs2.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); } @Test @@ -272,5 +299,11 @@ void testKafkaMtlsAuthtype() { assertNull(ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY)); assertNull(ccs2.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); assertNull(ccs2.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); + + assertNull(ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY)); + assertNull(ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY)); + assertNull(ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY)); + assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY)); } }