Skip to content

Commit

Permalink
Merge pull request #33834 from ozangunalp/kafka_serde_discovery_fix_c…
Browse files Browse the repository at this point in the history
…hannel_name

Kafka serde discovery: handle channel names containing dot
  • Loading branch information
cescoffier authored Jun 6, 2023
2 parents 8cc5e03 + 81f3014 commit f7cfbd3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import static io.quarkus.smallrye.reactivemessaging.kafka.deployment.SmallRyeReactiveMessagingKafkaProcessor.getChannelPropertyKey;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -57,7 +59,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> channelsManagedB

String channelType = incoming ? "incoming" : "outgoing";
return isKafkaConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> {
String connectorKey = "mp.messaging." + channelType + "." + channelName + ".connector";
String connectorKey = getChannelPropertyKey(channelName, "connector", incoming);
String connector = getConfig()
.getOptionalValue(connectorKey, String.class)
.orElse("ignored");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ private static List<String> getChannelProperties(String keySuffix, Config config
return values;
}

static String channelPropertyFormat = "mp.messaging.%s.%s.%s";

static String getChannelPropertyKey(String channelName, String propertyName, boolean incoming) {
return String.format(channelPropertyFormat, incoming ? "incoming" : "outgoing",
channelName.contains(".") ? "\"" + channelName + "\"" : channelName, propertyName);
}

@BuildStep
public void checkpointRedis(BuildProducer<AdditionalBeanBuildItem> additionalBean,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
Expand Down Expand Up @@ -173,7 +180,7 @@ public void defaultChannelConfiguration(
if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
continue;
}
String key = "mp.messaging.incoming." + channelName + ".graceful-shutdown";
String key = getChannelPropertyKey(channelName, "graceful-shutdown", true);
discoveryState.ifNotYetConfigured(key, () -> {
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
});
Expand Down Expand Up @@ -215,12 +222,11 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type outgoingType = getOutgoingTypeFromMethod(method);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer);
getChannelPropertyKey(channelName, "key.serializer", false), keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer);
getChannelPropertyKey(channelName, "value.serializer", false), valueSerializer);

handleAdditionalProperties("mp.messaging.outgoing." + channelName + ".", discovery,
config, keySerializer, valueSerializer);
handleAdditionalProperties(channelName, false, discovery, config, keySerializer, valueSerializer);
}, generatedClass, reflection, alreadyGeneratedSerializers);
}

Expand All @@ -246,30 +252,29 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type outgoingType = getOutgoingTypeFromChannelInjectionPoint(injectionPointType);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer);
getChannelPropertyKey(channelName, "key.serializer", false), keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer);
getChannelPropertyKey(channelName, "value.serializer", false), valueSerializer);

handleAdditionalProperties("mp.messaging.outgoing." + channelName + ".", discovery,
config, keySerializer, valueSerializer);
handleAdditionalProperties(channelName, false, discovery, config, keySerializer, valueSerializer);
}, generatedClass, reflection, alreadyGeneratedSerializers);
}
}

private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, String channelName, Type injectionPointType) {
if (injectionPointType != null && isKafkaEmitter(injectionPointType)) {
String transactionalIdKey = getChannelPropertyKey(channelName, "transactional.id", false);
String enableIdempotenceKey = getChannelPropertyKey(channelName, "enable.idempotence", false);
String acksKey = getChannelPropertyKey(channelName, "acks", false);
LOGGER.infof("Transactional producer detected for channel '%s', setting following default config values: "
+ "'mp.messaging.outgoing.%s.transactional.id=${quarkus.application.name}-${channelName}', "
+ "'mp.messaging.outgoing.%s.enable.idempotence=true', "
+ "'mp.messaging.outgoing.%s.acks=all'", channelName, channelName, channelName, channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".transactional.id",
+ "'" + transactionalIdKey + "=${quarkus.application.name}-${channelName}', "
+ "'" + enableIdempotenceKey + "=true', "
+ "'" + acksKey + "=all'", channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, transactionalIdKey,
"${quarkus.application.name}-" + channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".enable.idempotence", "true");
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".acks", "all");
produceRuntimeConfigurationDefaultBuildItem(discovery, config, enableIdempotenceKey, "true");
produceRuntimeConfigurationDefaultBuildItem(discovery, config, acksKey, "all");
}
}

Expand All @@ -283,16 +288,15 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
alreadyGeneratedDeserializers);

produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer);
getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer);
getChannelPropertyKey(channelName, "value.deserializer", true), valueDeserializer);
if (Boolean.TRUE.equals(isBatchType)) {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".batch", "true");
getChannelPropertyKey(channelName, "batch", true), "true");
}

handleAdditionalProperties("mp.messaging.incoming." + channelName + ".", discovery,
config, keyDeserializer, valueDeserializer);
handleAdditionalProperties(channelName, true, discovery, config, keyDeserializer, valueDeserializer);
});
}

Expand All @@ -308,15 +312,16 @@ private Type getInjectionPointType(AnnotationInstance annotation) {
}
}

private void handleAdditionalProperties(String configPropertyBase, DefaultSerdeDiscoveryState discovery,
private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, Result... results) {
for (Result result : results) {
if (result == null) {
continue;
}

result.additionalProperties.forEach((key, value) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config, configPropertyBase + key, value);
String configKey = getChannelPropertyKey(channelName, key, incoming);
produceRuntimeConfigurationDefaultBuildItem(discovery, config, configKey, value);
});
}
}
Expand Down Expand Up @@ -946,22 +951,16 @@ private void processAnnotationsForReflectiveClassPayload(IndexView index, Config
}

private boolean isSerdeJson(IndexView index, Config config, String channelName, boolean serializer, boolean isKey) {
ConfigValue configValue = config.getConfigValue(getConfigName(channelName, serializer, isKey));
String configKey = getChannelPropertyKey(channelName, (isKey ? "key" : "value") + "." +
(serializer ? "serializer" : "deserializer"), !serializer);
ConfigValue configValue = config.getConfigValue(configKey);
if (configValue.getValue() != null) {
DotName serdeName = DotName.createSimple(configValue.getValue());
return serializer ? isSubclassOfJsonSerializer(index, serdeName) : isSubclassOfJsonDeserializer(index, serdeName);
}
return false;
}

String getConfigName(String channelName, boolean serializer, boolean isKey) {
return "mp.messaging." +
(serializer ? "outgoing" : "incoming") + "." +
channelName + "." +
(isKey ? "key" : "value") + "." +
(serializer ? "serializer" : "deserializer");
}

private boolean isSubclassOfJsonSerializer(IndexView index, DotName serializerName) {
return isSubclassOf(index, DotNames.OBJECT_MAPPER_SERIALIZER, serializerName) ||
isSubclassOf(index, DotNames.JSONB_SERIALIZER, serializerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2727,5 +2727,32 @@ void method2(JsonObject msg) {

}

@Test
void channelNameContainingDot() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.\"new.channel\".key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"),
tuple("mp.messaging.incoming.\"new.channel\".value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
tuple("mp.messaging.outgoing.\"new.channel.out\".key.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
tuple("mp.messaging.outgoing.\"new.channel.out\".value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
tuple("mp.messaging.outgoing.\"new.channel.out\".transactional.id", "${quarkus.application.name}-new.channel.out"),
tuple("mp.messaging.outgoing.\"new.channel.out\".enable.idempotence", "true"),
tuple("mp.messaging.outgoing.\"new.channel.out\".acks", "all"),
};
doTest(expectations, ChannelContainingDot.class);
}


private static class ChannelContainingDot {

@Incoming("new.channel")
void method1(KafkaRecord<Integer, String> msg) {

}

@Channel("new.channel.out")
KafkaTransactions<ProducerRecord<Long, JsonObject>> transactions;

}


}

0 comments on commit f7cfbd3

Please sign in to comment.