Skip to content

Commit

Permalink
feat: support Camel Kafka props (#96)
Browse files Browse the repository at this point in the history
* feat: support camel props for Kafka SASL
* feat: Camel Kafka brokers prop

EVNT-611
  • Loading branch information
vkrizan authored Jul 25, 2022
1 parent b7b1e14 commit 3fc6e40
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ public class ClowderConfigSource implements ConfigSource {

public static final String CLOWDER_CONFIG_SOURCE = "ClowderConfigSource";

// Kafka SASL config keys.
// Kafka config keys.
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_SASL_JAAS_CONFIG_KEY = "kafka.sasl.jaas.config";
public static final String KAFKA_SASL_MECHANISM_KEY = "kafka.sasl.mechanism";
public static final String KAFKA_SECURITY_PROTOCOL_KEY = "kafka.security.protocol";
public static final String KAFKA_SSL_TRUSTSTORE_LOCATION_KEY = "kafka.ssl.truststore.location";
public static final String KAFKA_SSL_TRUSTSTORE_TYPE_KEY = "kafka.ssl.truststore.type";

// Camel Kafka config keys.
public static final String CAMEL_KAFKA_BROKERS = "camel.component.kafka.brokers";
public static final String CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY = "camel.component.kafka.sasl-jaas-config";
public static final String CAMEL_KAFKA_SASL_MECHANISM_KEY = "camel.component.kafka.sasl-mechanism";
public static final String CAMEL_KAFKA_SECURITY_PROTOCOL_KEY = "camel.component.kafka.security-protocol";
public static final String CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY = "camel.component.kafka.ssl-truststore-location";
public static final String CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY = "camel.component.kafka.ssl-truststore-type";

// Kafka SASL config values.
public static final String KAFKA_SSL_TRUSTSTORE_TYPE_VALUE = "PEM";

Expand All @@ -43,7 +52,12 @@ public class ClowderConfigSource implements ConfigSource {
KAFKA_SASL_MECHANISM_KEY,
KAFKA_SECURITY_PROTOCOL_KEY,
KAFKA_SSL_TRUSTSTORE_LOCATION_KEY,
KAFKA_SSL_TRUSTSTORE_TYPE_KEY
KAFKA_SSL_TRUSTSTORE_TYPE_KEY,
CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY,
CAMEL_KAFKA_SASL_MECHANISM_KEY,
CAMEL_KAFKA_SECURITY_PROTOCOL_KEY,
CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY,
CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY
);

Logger log = Logger.getLogger(getClass().getName());
Expand Down Expand Up @@ -122,7 +136,7 @@ public String getValue(String configKey) {
if (configKey.equals("quarkus.http.port")) {
return String.valueOf(root.webPort);
}
if (configKey.equals("kafka.bootstrap.servers")) {
if (configKey.equals(KAFKA_BOOTSTRAP_SERVERS) || configKey.equals(CAMEL_KAFKA_BROKERS)) {
if (root.kafka == null) {
throw new IllegalStateException("Kafka base object not present, can't set Kafka values");
}
Expand Down Expand Up @@ -161,6 +175,7 @@ public String getValue(String configKey) {
if (saslBroker.isPresent()) {
switch (configKey) {
case KAFKA_SASL_JAAS_CONFIG_KEY:
case CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY:
String username = saslBroker.get().sasl.username;
String password = saslBroker.get().sasl.password;
switch (saslBroker.get().sasl.saslMechanism) {
Expand All @@ -170,12 +185,16 @@ public String getValue(String configKey) {
return "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
}
case KAFKA_SASL_MECHANISM_KEY:
case CAMEL_KAFKA_SASL_MECHANISM_KEY:
return saslBroker.get().sasl.saslMechanism;
case KAFKA_SECURITY_PROTOCOL_KEY:
case CAMEL_KAFKA_SECURITY_PROTOCOL_KEY:
return saslBroker.get().sasl.securityProtocol;
case KAFKA_SSL_TRUSTSTORE_LOCATION_KEY:
case CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY:
return createTempKafkaCertFile(saslBroker.get().cacert);
case KAFKA_SSL_TRUSTSTORE_TYPE_KEY:
case CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY:
return KAFKA_SSL_TRUSTSTORE_TYPE_VALUE;
default:
throw new IllegalStateException("Unexpected Kafka SASL config key: " + configKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_LOCATION_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_TYPE_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.KAFKA_SSL_TRUSTSTORE_TYPE_VALUE;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SASL_MECHANISM_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SECURITY_PROTOCOL_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY;
import static com.redhat.cloud.common.clowder.configsource.ClowderConfigSource.CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -64,15 +69,15 @@ void testWebPort() {

@Test
void testKafkaBootstrap() {
String boostrap = ccs.getValue("kafka.bootstrap.servers");
assertEquals("ephemeral-host.svc:29092", boostrap);
assertEquals("ephemeral-host.svc:29092", ccs.getValue("kafka.bootstrap.servers"));
assertEquals("ephemeral-host.svc:29092", ccs.getValue("camel.component.kafka.brokers"));
}

@Test
void testKafkaBootstrapServers() {
ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig2.json", APP_PROPS_MAP);
String boostrap = ccs2.getValue("kafka.bootstrap.servers");
assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", boostrap);
assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", ccs2.getValue("kafka.bootstrap.servers"));
assertEquals("ephemeral-host.svc:29092,other-host.svc:39092", ccs2.getValue("camel.component.kafka.brokers"));
}

@Test
Expand Down Expand Up @@ -240,28 +245,50 @@ void testKafkaNoAuthtype() {
assertNull(ccs.getValue(KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY));

assertNull(ccs.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY));
}

@Test
void testKafkaSaslPlainAuthtype() {
ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig_kafka_sasl_plain_authtype.json", APP_PROPS_MAP);
assertEquals("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"john\" password=\"doe\";", ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY));
String expJasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"john\" password=\"doe\";";
assertEquals(expJasConfig, ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY));
assertEquals("PLAIN", ccs2.getValue(KAFKA_SASL_MECHANISM_KEY));
assertEquals("SASL_SSL", ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY));

assertEquals(expJasConfig, ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY));
assertEquals("PLAIN", ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY));
assertEquals("SASL_SSL", ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY));
}

@Test
void testKafkaSaslScramAuthtype() throws IOException {
ClowderConfigSource ccs2 = new ClowderConfigSource("target/test-classes/cdappconfig_kafka_sasl_scram_authtype.json", APP_PROPS_MAP);
assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"john\" password=\"doe\";", ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY));
String expJasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"john\" password=\"doe\";";
assertEquals(expJasConfig, ccs2.getValue(KAFKA_SASL_JAAS_CONFIG_KEY));
assertEquals("SCRAM-SHA-512", ccs2.getValue(KAFKA_SASL_MECHANISM_KEY));
assertEquals("SASL_SSL", ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY));
String truststoreLocation = ccs2.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY);
String cert = Files.readString(Path.of(truststoreLocation), UTF_8);
assertEquals(EXPECTED_CERT, cert);
assertEquals(KAFKA_SSL_TRUSTSTORE_TYPE_VALUE, ccs2.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY));

assertEquals(expJasConfig, ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY));
assertEquals("SCRAM-SHA-512", ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY));
assertEquals("SASL_SSL", ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY));
String camelTruststoreLocation = ccs2.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY);
String camelCert = Files.readString(Path.of(camelTruststoreLocation), UTF_8);
assertEquals(EXPECTED_CERT, camelCert);
assertEquals(KAFKA_SSL_TRUSTSTORE_TYPE_VALUE, ccs2.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY));
}

@Test
Expand All @@ -272,5 +299,11 @@ void testKafkaMtlsAuthtype() {
assertNull(ccs2.getValue(KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs2.getValue(KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs2.getValue(KAFKA_SSL_TRUSTSTORE_TYPE_KEY));

assertNull(ccs2.getValue(CAMEL_KAFKA_SASL_JAAS_CONFIG_KEY));
assertNull(ccs2.getValue(CAMEL_KAFKA_SASL_MECHANISM_KEY));
assertNull(ccs2.getValue(CAMEL_KAFKA_SECURITY_PROTOCOL_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_LOCATION_KEY));
assertNull(ccs.getValue(CAMEL_KAFKA_SSL_TRUSTSTORE_TYPE_KEY));
}
}

0 comments on commit 3fc6e40

Please sign in to comment.