diff --git a/api/src/main/java/org/eclipse/microprofile/reactive/messaging/Message.java b/api/src/main/java/org/eclipse/microprofile/reactive/messaging/Message.java index ceb75fc395..3fd2ca759a 100644 --- a/api/src/main/java/org/eclipse/microprofile/reactive/messaging/Message.java +++ b/api/src/main/java/org/eclipse/microprofile/reactive/messaging/Message.java @@ -66,12 +66,12 @@ private static BiFunction> validateNa private static Function> wrapAck(Message message) { var ackM = message.getAckWithMetadata(); - return ackM != EMPTY_ACK ? ackM : validateAck(message.getAck()); + return ackM != null ? ackM : validateAck(message.getAck()); } private static BiFunction> wrapNack(Message message) { var nackM = message.getNackWithMetadata(); - return nackM != EMPTY_NACK ? nackM : validateNack(message.getNack()); + return nackM != null ? nackM : validateNack(message.getNack()); } private static Message newMessage(T payload, Metadata metadata) { @@ -502,7 +502,7 @@ default Supplier> getAck() { */ @Experimental("metadata propagation is a SmallRye-specific feature") default Function> getAckWithMetadata() { - return EMPTY_ACK; + return null; } /** @@ -517,7 +517,7 @@ default Function> getNack() { */ @Experimental("metadata propagation is a SmallRye-specific feature") default BiFunction> getNackWithMetadata() { - return EMPTY_NACK; + return null; } /** diff --git a/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomLegacyMessageAckNackWithMetadataTest.java b/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomLegacyMessageAckNackWithMetadataTest.java new file mode 100644 index 0000000000..f499a8654d --- /dev/null +++ b/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomLegacyMessageAckNackWithMetadataTest.java @@ -0,0 +1,376 @@ +package org.eclipse.microprofile.reactive.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; + +public class CustomLegacyMessageAckNackWithMetadataTest { + + private final MyMetadata myMetadata = new MyMetadata("bar"); + + @Test + public void testCreationFromPayloadAndAck() { + AtomicInteger count = new AtomicInteger(0); + Message message = new Message<>() { + @Override + public String getPayload() { + return "foo"; + } + + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public CompletionStage ack() { + count.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + }; + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getMetadata()).hasSize(0); + assertThat(message.getAck()).isNotNull(); + assertThat(message.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(message.ack().toCompletableFuture().join()).isNull(); + assertThat(message.ack(Metadata.empty()).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause"), Metadata.empty()).toCompletableFuture().join()).isNull(); + + assertThat(count).hasValue(2); + + } + + @Test + public void testCreationFromPayloadMetadataAndAck() { + AtomicInteger count = new AtomicInteger(0); + Message message = new Message<>() { + @Override + public String getPayload() { + return "foo"; + } + + @Override + public Metadata getMetadata() { + return Metadata.of(myMetadata); + } + + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public CompletionStage ack() { + count.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + }; + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getMetadata()).hasSize(1).containsExactly(myMetadata); + assertThat(message.getAck()).isNotNull(); + assertThat(message.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(message.ack().toCompletableFuture().join()).isNull(); + assertThat(message.ack(Metadata.empty()).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(count).hasValue(2); + + assertThat(Message.of("foo", null, () -> CompletableFuture.completedFuture(null)).getMetadata()) + .isEmpty(); + } + + @Test + public void testCreationFromPayloadMetadataAsIterableAndAck() { + List metadata = Arrays.asList(myMetadata, new AtomicInteger(2)); + AtomicInteger count = new AtomicInteger(0); + Message message = new Message<>() { + @Override + public String getPayload() { + return "foo"; + } + + @Override + public Metadata getMetadata() { + return Metadata.from(metadata); + } + + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public CompletionStage ack() { + count.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + }; + + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getMetadata()).hasSize(2).contains(myMetadata); + assertThat(message.getAck()).isNotNull(); + assertThat(message.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(message.ack().toCompletableFuture().join()).isNull(); + assertThat(message.ack(Metadata.from(metadata)).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause"), Metadata.from(metadata)).toCompletableFuture().join()).isNull(); + assertThat(count).hasValue(2); + + assertThatThrownBy(() -> Message.of("foo", (Iterable) null, () -> CompletableFuture.completedFuture(null))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testCreationFromPayloadMetadataAckAndNack() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getMetadata()).hasSize(1).containsExactly(myMetadata); + assertThat(message.getAck()).isNotNull(); + assertThat(message.getNack()).isNotNull(); + + assertThat(message.ack().toCompletableFuture().join()).isNull(); + assertThat(message.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(message.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(2); + assertThat(nack).hasValue(2); + } + + @Test + public void testWithPayload() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + + Message created = message.withPayload("bar"); + assertThat(created.getPayload()).isEqualTo("bar"); + assertThat(created.getMetadata()).hasSize(1).containsExactly(myMetadata); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(2); + assertThat(nack).hasValue(2); + + } + + @Test + public void testWithMetadata() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + MyMetadata mm = new MyMetadata("hello"); + Message created = message.withMetadata(Metadata.of(mm)); + assertThat(created.getPayload()).isEqualTo("foo"); + assertThat(created.getMetadata()).hasSize(1).containsExactly(mm); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(2); + assertThat(nack).hasValue(2); + + } + + @Test + public void testWithAck() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger ack2 = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + Message created = message.withAck(() -> { + ack2.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }); + assertThat(created.getPayload()).isEqualTo("foo"); + assertThat(created.getMetadata()).hasSize(1).containsExactly(myMetadata); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack2).hasValue(2); + assertThat(ack).hasValue(0); + assertThat(nack).hasValue(2); + + } + + @Test + public void testWithNack() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + AtomicInteger nack2 = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + + Message created = message.withNack(t -> { + assertThat(t).hasMessage("cause"); + nack2.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }); + assertThat(created.getPayload()).isEqualTo("foo"); + assertThat(created.getMetadata()).hasSize(1).containsExactly(myMetadata); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(2); + assertThat(nack2).hasValue(2); + assertThat(nack).hasValue(0); + } + + @Test + public void testAddMetadata() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + Message created = message.addMetadata(new AtomicInteger(2)); + assertThat(created.getPayload()).isEqualTo("foo"); + assertThat(created.getMetadata()).hasSize(2).contains(myMetadata); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(2); + assertThat(nack).hasValue(2); + } + + @Test + public void testAckAndNackNull() { + AtomicInteger ack = new AtomicInteger(0); + AtomicInteger nack = new AtomicInteger(0); + Message message = new CustomLegacyMessage<>("foo", Metadata.of(myMetadata), ack, nack); + Message created = message.withAck(null).withNack(null); + assertThat(created.getPayload()).isEqualTo("foo"); + assertThat(created.getMetadata()).hasSize(1).contains(myMetadata); + assertThat(created.getAck()).isNotNull(); + assertThat(created.getNack()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); + + assertThat(created.ack().toCompletableFuture().join()).isNull(); + assertThat(created.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause")).toCompletableFuture().join()).isNull(); + assertThat(created.nack(new Exception("cause"), Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); + assertThat(ack).hasValue(0); + assertThat(nack).hasValue(0); + } + + @Test + public void testAccessingMetadata() { + Message message = Message.of("hello", Metadata.of(myMetadata)).addMetadata(new AtomicInteger(2)); + + assertThat(message.getMetadata(MyMetadata.class)) + .hasValueSatisfying(m -> assertThat(m.getValue()).isEqualTo("bar")); + assertThat(message.getMetadata(AtomicInteger.class)).hasValueSatisfying(m -> assertThat(m.get()).isEqualTo(2)); + assertThat(message.getMetadata(String.class)).isEmpty(); + assertThatThrownBy(() -> message.getMetadata(null)).isInstanceOf(IllegalArgumentException.class); + } + + private static class MyMetadata { + private final String value; + + public MyMetadata(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } + + private static class CustomLegacyMessage implements Message { + + T payload; + Metadata metadata; + AtomicInteger ack; + AtomicInteger nack; + + public CustomLegacyMessage(T payload, Metadata metadata, AtomicInteger ack, AtomicInteger nack) { + this.payload = payload; + this.metadata = metadata; + this.ack = ack; + this.nack = nack; + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public Metadata getMetadata() { + return metadata; + } + + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public Function> getNack() { + return this::nack; + } + + @Override + public CompletionStage ack() { + ack.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage nack(Throwable reason, Metadata metadata) { + assertThat(reason).hasMessage("cause"); + nack.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + } +} diff --git a/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomMessageAckNackWithMetadataTest.java b/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomMessageAckNackWithMetadataTest.java index cfa49ebbcc..be24190575 100644 --- a/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomMessageAckNackWithMetadataTest.java +++ b/api/src/test/java/org/eclipse/microprofile/reactive/messaging/CustomMessageAckNackWithMetadataTest.java @@ -24,8 +24,8 @@ public void testCreationFromPayloadOnly() { assertThat(message.getMetadata()).isEmpty(); assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); - assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); @@ -50,8 +50,8 @@ public Metadata getMetadata() { assertThat(message.getMetadata()).hasSize(1).containsExactly(myMetadata); assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); - assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.of(myMetadata)).toCompletableFuture().join()).isNull(); @@ -94,8 +94,8 @@ public Metadata getMetadata() { assertThat(message.getMetadata()).hasSize(2); assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); - assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getAckWithMetadata()).isNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.from(metadata)).toCompletableFuture().join()).isNull(); @@ -131,7 +131,7 @@ public CompletionStage ack(Metadata metadata) { assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.empty()).toCompletableFuture().join()).isNull(); @@ -173,7 +173,7 @@ public CompletionStage ack(Metadata metadata) { assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.empty()).toCompletableFuture().join()).isNull(); @@ -217,7 +217,7 @@ public CompletionStage ack(Metadata metadata) { assertThat(message.getAck()).isNotNull(); assertThat(message.getNack()).isNotNull(); assertThat(message.getAckWithMetadata()).isNotNull(); - assertThat(message.getNackWithMetadata()).isNotNull(); + assertThat(message.getNackWithMetadata()).isNull(); assertThat(message.ack().toCompletableFuture().join()).isNull(); assertThat(message.ack(Metadata.from(metadata)).toCompletableFuture().join()).isNull();