Skip to content

Commit

Permalink
Support Serde detection for Instance injection of channels
Browse files Browse the repository at this point in the history
Resolves #44500
  • Loading branch information
ozangunalp committed Nov 15, 2024
1 parent 65b55fd commit 75fde6c
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

final class DotNames {
// @formatter:off
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,15 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
}

private Type getInjectionPointType(AnnotationInstance annotation) {
switch (annotation.target().kind()) {
case FIELD:
return annotation.target().asField().type();
case METHOD_PARAMETER:
MethodParameterInfo parameter = annotation.target().asMethodParameter();
return parameter.method().parameterType(parameter.position());
default:
return null;
}
return switch (annotation.target().kind()) {
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
default -> null;
};
}

private Type handleInstanceChannelInjection(Type type) {
return DotNames.INSTANCE.equals(type.name()) ? type.asParameterizedType().arguments().get(0) : type;
}

private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -111,6 +112,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in

assertThat(configs)
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
.hasSize(expectations.length)
.allSatisfy(tuple -> {
Object[] e = tuple.toArray();
String key = (String) e[0];
Expand Down Expand Up @@ -3048,5 +3050,22 @@ private static class ChannelChildSerializer {
Multi<JsonbDto> channel2;
}

@Test
void instanceInjectionPoint() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"),
};
doTest(expectations, InstanceInjectionPoint.class);
}

private static class InstanceInjectionPoint {
@Channel("channel1")
Instance<Emitter<String>> emitter1;

@Channel("channel2")
Instance<Multi<Integer>> channel2;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

final class DotNames {
// @formatter:off
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.DotName;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.MethodParameterInfo;
import org.jboss.jandex.Type;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -144,15 +143,15 @@ private static String incomingSchemaKey(String channelName) {
}

private Type getInjectionPointType(AnnotationInstance annotation) {
switch (annotation.target().kind()) {
case FIELD:
return annotation.target().asField().type();
case METHOD_PARAMETER:
MethodParameterInfo parameter = annotation.target().asMethodParameter();
return parameter.method().parameterType(parameter.position());
default:
return null;
}
return switch (annotation.target().kind()) {
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
default -> null;
};
}

private Type handleInstanceChannelInjection(Type type) {
return DotNames.INSTANCE.equals(type.name()) ? type.asParameterizedType().arguments().get(0) : type;
}

private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

Expand Down Expand Up @@ -2108,5 +2109,22 @@ Multi<GenericPayload<Long>> method4() {
}
}

@Test
void instanceInjectionPoint() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.schema", "STRING"),
tuple("mp.messaging.incoming.channel2.schema", "INT32"),
};
doTest(expectations, InstanceInjectionPoint.class);
}

private static class InstanceInjectionPoint {
@Channel("channel1")
Instance<Emitter<String>> emitter1;

@Channel("channel2")
Instance<Multi<Integer>> channel2;
}


}

0 comments on commit 75fde6c

Please sign in to comment.