From 220a244887823e0280986152d35f82c53819d21f Mon Sep 17 00:00:00 2001 From: l3r8yJ Date: Sat, 8 Jul 2023 23:04:52 +0300 Subject: [PATCH] messages --- .../github/eocqrs/kafka/fake/FkRecords.java | 16 +- .../kafka/act/AssignPartitionsTest.java | 5 +- .../eocqrs/kafka/act/CommitAsyncTest.java | 5 +- .../eocqrs/kafka/act/CommitSyncTest.java | 5 +- .../github/eocqrs/kafka/act/WakeupTest.java | 5 +- .../eocqrs/kafka/consumer/KfConsumerTest.java | 95 +++++++---- .../settings/KfConsumerParamsTest.java | 5 +- .../eocqrs/kafka/fake/FkConsumerTest.java | 149 +++++++++++------- .../eocqrs/kafka/fake/FkMetadataTaskTest.java | 7 +- .../eocqrs/kafka/fake/FkProducerTest.java | 38 +++-- .../eocqrs/kafka/fake/FkRecordsTest.java | 25 +-- .../eocqrs/kafka/fake/storage/InFileTest.java | 15 +- 12 files changed, 227 insertions(+), 143 deletions(-) diff --git a/src/main/java/io/github/eocqrs/kafka/fake/FkRecords.java b/src/main/java/io/github/eocqrs/kafka/fake/FkRecords.java index 5e50ae7c..29066722 100644 --- a/src/main/java/io/github/eocqrs/kafka/fake/FkRecords.java +++ b/src/main/java/io/github/eocqrs/kafka/fake/FkRecords.java @@ -81,21 +81,27 @@ public FkRecords( */ @Override public ConsumerRecords value() throws Exception { - final Map>> part - = new HashMap<>(0); final List> recs = new ListOf<>(); this.datasets.forEach( dataset -> recs.add( new ConsumerRecord<>( this.topic, - DEFAULT_PARTITION, - ZERO_OFFSET, + FkRecords.DEFAULT_PARTITION, + FkRecords.ZERO_OFFSET, null, dataset ) ) ); - part.put(new TopicPartition(this.topic, DEFAULT_PARTITION), recs); + final Map>> part + = new HashMap<>(0); + part.put( + new TopicPartition( + this.topic, + FkRecords.DEFAULT_PARTITION + ), + recs + ); return new ConsumerRecords<>(part); } } diff --git a/src/test/java/io/github/eocqrs/kafka/act/AssignPartitionsTest.java b/src/test/java/io/github/eocqrs/kafka/act/AssignPartitionsTest.java index f3ffc918..94d54330 100644 --- a/src/test/java/io/github/eocqrs/kafka/act/AssignPartitionsTest.java +++ b/src/test/java/io/github/eocqrs/kafka/act/AssignPartitionsTest.java @@ -51,6 +51,9 @@ void assignsPartitions( ) { final Action assign = new AssignPartitions(mck, new TopicPartition("test", 1)); - Assertions.assertDoesNotThrow(assign::apply); + Assertions.assertDoesNotThrow( + assign::apply, + () -> "Creates %s without any exceptions thrown".formatted(assign) + ); } } diff --git a/src/test/java/io/github/eocqrs/kafka/act/CommitAsyncTest.java b/src/test/java/io/github/eocqrs/kafka/act/CommitAsyncTest.java index 20985c12..df1d3986 100644 --- a/src/test/java/io/github/eocqrs/kafka/act/CommitAsyncTest.java +++ b/src/test/java/io/github/eocqrs/kafka/act/CommitAsyncTest.java @@ -49,6 +49,9 @@ void commitsAsync( @Mock final KafkaConsumer mck ) { final Action async = new CommitAsync(mck); - Assertions.assertDoesNotThrow(async::apply); + Assertions.assertDoesNotThrow( + async::apply, + () -> "Creates %s without any exceptions thrown".formatted(async) + ); } } diff --git a/src/test/java/io/github/eocqrs/kafka/act/CommitSyncTest.java b/src/test/java/io/github/eocqrs/kafka/act/CommitSyncTest.java index 0f570b30..33cbcb08 100644 --- a/src/test/java/io/github/eocqrs/kafka/act/CommitSyncTest.java +++ b/src/test/java/io/github/eocqrs/kafka/act/CommitSyncTest.java @@ -49,6 +49,9 @@ void commitsSync( @Mock final KafkaConsumer mck ) { final Action sync = new CommitSync(mck); - Assertions.assertDoesNotThrow(sync::apply); + Assertions.assertDoesNotThrow( + sync::apply, + () -> "Creates %s without any exceptions thrown".formatted(sync) + ); } } diff --git a/src/test/java/io/github/eocqrs/kafka/act/WakeupTest.java b/src/test/java/io/github/eocqrs/kafka/act/WakeupTest.java index c250c629..543fe249 100644 --- a/src/test/java/io/github/eocqrs/kafka/act/WakeupTest.java +++ b/src/test/java/io/github/eocqrs/kafka/act/WakeupTest.java @@ -47,6 +47,9 @@ final class WakeupTest { @Test void wakeups(@Mock final KafkaConsumer mck) { final Action wakeup = new Wakeup(mck); - Assertions.assertDoesNotThrow(wakeup::apply); + Assertions.assertDoesNotThrow( + wakeup::apply, + () -> "Creates %s without any exceptions thrown".formatted(wakeup) + ); } } diff --git a/src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java b/src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java index 4cfb57f1..3ba9bff8 100644 --- a/src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java +++ b/src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java @@ -22,8 +22,10 @@ package io.github.eocqrs.kafka.consumer; +import com.jcabi.xml.XMLDocument; import io.github.eocqrs.kafka.Consumer; import io.github.eocqrs.kafka.ConsumerSettings; +import io.github.eocqrs.kafka.Params; import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams; import io.github.eocqrs.kafka.parameters.BootstrapServers; import io.github.eocqrs.kafka.parameters.GroupId; @@ -39,6 +41,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.cactoos.io.ResourceOf; import org.cactoos.list.ListOf; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -47,6 +50,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** @@ -64,23 +68,27 @@ void subscribesWithoutException( ) { Mockito.when(settings.consumer()).thenReturn(consumer); final Consumer underTest = new KfConsumer<>(settings); - assertDoesNotThrow( - () -> { - underTest.subscribe(new ListOf<>("transactions-info")); - underTest.subscribe("transactions-info"); - underTest.subscribe(new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(final Collection partitions) { - } + assertAll( + () -> assertDoesNotThrow( + () -> { + underTest.subscribe(new ListOf<>("transactions-info")); + underTest.subscribe("transactions-info"); + underTest.subscribe(new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(final Collection partitions) { + } - @Override - public void onPartitionsAssigned(final Collection partitions) { - } - }, "transactions-info"); - } - ); - assertDoesNotThrow( - underTest::close + @Override + public void onPartitionsAssigned(final Collection partitions) { + } + }, "transactions-info"); + }, + "Should create an %s and subscribe without exceptions".formatted(underTest) + ), + () -> assertDoesNotThrow( + underTest::close, + () -> "Should close %s without an exception".formatted(underTest) + ) ); } @@ -90,12 +98,12 @@ void recordsPollingDoesntThrowException( @Mock final KafkaConsumer origin ) { Mockito.when(settings.consumer()).thenReturn(origin); - final Consumer consumer = - new KfConsumer<>(settings); - assertDoesNotThrow(() -> - consumer.records( - "TEST", Duration.ofSeconds(5L) - ) + assertDoesNotThrow( + () -> + new KfConsumer<>(settings).records( + "TEST", Duration.ofSeconds(5L) + ), + () -> "Should to poll things without exception" ); } @@ -107,35 +115,52 @@ void unsubscribesWithoutException( Mockito.when(settings.consumer()).thenReturn(origin); final Consumer consumer = new KfConsumer<>(settings); - assertDoesNotThrow(consumer::unsubscribe); - assertDoesNotThrow(consumer::close); + assertAll( + () -> assertDoesNotThrow(consumer::unsubscribe), + () -> assertDoesNotThrow(consumer::close), + () -> "%s subscribes and unsubscribes without exceptions".formatted(consumer) + ); } @Test void constructsConsumerWithXML() throws Exception { + final String xml = "consumer.xml"; final Consumer consumer = new KfConsumer<>( - new KfXmlFlexible("consumer.xml") + new KfXmlFlexible(xml) .consumer() ); - assertThat(consumer).isNotNull(); + assertThat(consumer).isNotNull() + .describedAs( + "%s should be created from %s without exceptions".formatted( + consumer, + new XMLDocument( + new ResourceOf(xml).stream() + ) + ) + ); } @Test void constructsConsumerWithParams() { + final Params params = new KfParams( + new BootstrapServers("localhost:9092"), + new GroupId("1"), + new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"), + new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer") + ); final Consumer consumer = new KfConsumer<>( new KfFlexible<>( - new KfConsumerParams( - new KfParams( - new BootstrapServers("localhost:9092"), - new GroupId("1"), - new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"), - new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer") - ) - ) + new KfConsumerParams(params) + ) + ); + assertThat(consumer).isNotNull() + .describedAs( + "Consumer %s created from %s without exceptions".formatted( + consumer, + params.asXml() ) ); - assertThat(consumer).isNotNull(); } } diff --git a/src/test/java/io/github/eocqrs/kafka/consumer/settings/KfConsumerParamsTest.java b/src/test/java/io/github/eocqrs/kafka/consumer/settings/KfConsumerParamsTest.java index c46a45b1..086f4778 100644 --- a/src/test/java/io/github/eocqrs/kafka/consumer/settings/KfConsumerParamsTest.java +++ b/src/test/java/io/github/eocqrs/kafka/consumer/settings/KfConsumerParamsTest.java @@ -39,9 +39,10 @@ final class KfConsumerParamsTest { @Test void representsXmlCorrectly() { final Params mock = Mockito.mock(Params.class); - Mockito.when(mock.asXml()).thenReturn("103"); + final String group = "103"; + Mockito.when(mock.asXml()).thenReturn(group); MatcherAssert.assertThat( - "Represents right XML settings", + "Should return %s inside a consumer tag".formatted(group), new KfConsumerParams(mock).asXml(), Matchers.equalTo("\n103\n\n") ); diff --git a/src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java b/src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java index 88ee2119..aa89e583 100644 --- a/src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java +++ b/src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java @@ -51,6 +51,8 @@ import java.util.UUID; import java.util.logging.Level; +import static org.junit.jupiter.api.Assertions.assertAll; + /** * Test case for {@link FkConsumer}. * @@ -81,7 +83,8 @@ void createsWithMockBroker(@Mock final FkBroker mock) throws IOException { final Consumer consumer = new FkConsumer(UUID.randomUUID(), mock); MatcherAssert.assertThat( - "Fake consumer creates with mock broker", + "Should create fake consumer %s with mock broker %s" + .formatted(consumer, mock), consumer, Matchers.notNullValue() ); @@ -96,7 +99,7 @@ void creates() throws IOException { this.broker ); MatcherAssert.assertThat( - "Fake consumer creates", + "Fake consumer %s creates".formatted(consumer), consumer, Matchers.notNullValue() ); @@ -115,25 +118,31 @@ void subscribesToTopics() throws Exception { with ); consumer.subscribe(new ListOf<>(topic)); - MatcherAssert.assertThat( - "topic subscriptions in right format", - with.data( - "broker/subs/sub[topic = '%s']/topic/text()" - .formatted( - topic - ) - ), - Matchers.contains(topic) - ); - MatcherAssert.assertThat( - "Consumer ID in right format", - with.data( - "broker/subs/sub[consumer = '%s']/consumer/text()" - .formatted( - uuid.toString() - ) - ), - Matchers.contains(uuid.toString()) + assertAll( + () -> + MatcherAssert.assertThat( + "topic %s subscriptions in right format" + .formatted(topic), + with.data( + "broker/subs/sub[topic = '%s']/topic/text()" + .formatted( + topic + ) + ), + Matchers.contains(topic) + ), + () -> + MatcherAssert.assertThat( + "Consumer ID %s in right format" + .formatted(uuid), + with.data( + "broker/subs/sub[consumer = '%s']/consumer/text()" + .formatted( + uuid.toString() + ) + ), + Matchers.contains(uuid.toString()) + ) ); consumer.close(); } @@ -165,33 +174,38 @@ public String toString() { } }; consumer.subscribe(listener, topic); - MatcherAssert.assertThat( - "topic subscriptions in right format", - with.data( - "broker/subs/sub[topic = '%s']/topic/text()" - .formatted( - topic - ) - ), - Matchers.contains(topic) - ); - MatcherAssert.assertThat( - "Consumer ID in right format", - with.data( - "broker/subs/sub[consumer = '%s']/consumer/text()" - .formatted( - uuid.toString() - ) + assertAll( + () -> MatcherAssert.assertThat( + "topic %s subscriptions in right format" + .formatted(topic), + with.data( + "broker/subs/sub[topic = '%s']/topic/text()" + .formatted( + topic + ) + ), + Matchers.contains(topic) ), - Matchers.contains(uuid.toString()) - ); - MatcherAssert.assertThat( - "Consumer Rebalance Listener in right format", - with.data( - "broker/subs/sub[listener = '%s']/listener/text()" - .formatted(listener.toString()) + () -> MatcherAssert.assertThat( + "Consumer ID %s in right format" + .formatted(uuid), + with.data( + "broker/subs/sub[consumer = '%s']/consumer/text()" + .formatted( + uuid.toString() + ) + ), + Matchers.contains(uuid.toString()) ), - Matchers.contains(rebalance) + () -> MatcherAssert.assertThat( + "Consumer %s Rebalance Listener %s in right format" + .formatted(consumer, listener), + with.data( + "broker/subs/sub[listener = '%s']/listener/text()" + .formatted(listener.toString()) + ), + Matchers.contains(rebalance) + ) ); consumer.close(); } @@ -206,7 +220,11 @@ void subscribesWithVarArgs() throws Exception { UUID.randomUUID(), with ); - Assertions.assertDoesNotThrow(() -> consumer.subscribe(topic)); + Assertions.assertDoesNotThrow( + () -> consumer.subscribe(topic), + () -> "%s shouldn't throw exception on subscription" + .formatted(consumer) + ); consumer.close(); } @@ -219,7 +237,9 @@ void throwsIfNull() throws IOException { ); Assertions.assertThrows( IllegalStateException.class, - () -> consumer.subscribe((String) null) + () -> consumer.subscribe((String) null), + () -> "Should %s throw ISE on null subscription" + .formatted(consumer) ); consumer.close(); } @@ -233,15 +253,19 @@ void throwsIfNullWithListener() { ); Assertions.assertThrows( IllegalStateException.class, - () -> consumer.subscribe(new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(final Collection collection) { - } + () -> consumer.subscribe( + new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(final Collection collection) { + } - @Override - public void onPartitionsAssigned(final Collection collection) { - } - }, (String) null) + @Override + public void onPartitionsAssigned(final Collection collection) { + } + }, (String) null + ), + () -> "%s should throw exception on null topic" + .formatted(consumer) ); } @@ -335,7 +359,9 @@ void pollsRecordsWithoutException() throws Exception { ); Assertions.assertDoesNotThrow( () -> - consumer.records("test", Duration.ofSeconds(5L)) + consumer.records("test", Duration.ofSeconds(5L)), + () -> + "%s polls without exceptions" ); consumer.close(); } @@ -386,15 +412,18 @@ void pollsRecords() throws Exception { consumer.records(topic, Duration.ofSeconds(1L)); final List datasets = new ListOf<>(); first.forEach(rec -> datasets.add(rec.value())); + final List topics = List.of("test1", "test2", "test3"); MatcherAssert.assertThat( - "First datasets in right format", + "Datasets %s should contain all of %s" + .formatted(datasets, topics), datasets, - Matchers.contains("test1", "test2", "test3") + Matchers.containsInAnyOrder(topics.toArray()) ); datasets.clear(); second.forEach(rec -> datasets.add(rec.value())); MatcherAssert.assertThat( - "Second datasets are empty", + "Second datasets %s should be empty" + .formatted(datasets), datasets.isEmpty(), Matchers.equalTo(true) ); diff --git a/src/test/java/io/github/eocqrs/kafka/fake/FkMetadataTaskTest.java b/src/test/java/io/github/eocqrs/kafka/fake/FkMetadataTaskTest.java index a15ca8e7..c81acb7b 100644 --- a/src/test/java/io/github/eocqrs/kafka/fake/FkMetadataTaskTest.java +++ b/src/test/java/io/github/eocqrs/kafka/fake/FkMetadataTaskTest.java @@ -54,11 +54,10 @@ void getsInRightFormat() throws ExecutionException, InterruptedException { 0 ); final Future future = - new FkMetadataTask( - metadata - ); + new FkMetadataTask(metadata); MatcherAssert.assertThat( - "Metadata in right format", + "Metadata %s in right format for %s" + .formatted(metadata, future.get()), future.get(), Matchers.equalTo(metadata) ); diff --git a/src/test/java/io/github/eocqrs/kafka/fake/FkProducerTest.java b/src/test/java/io/github/eocqrs/kafka/fake/FkProducerTest.java index a9031dbe..2b34ac77 100644 --- a/src/test/java/io/github/eocqrs/kafka/fake/FkProducerTest.java +++ b/src/test/java/io/github/eocqrs/kafka/fake/FkProducerTest.java @@ -25,6 +25,7 @@ package io.github.eocqrs.kafka.fake; import com.jcabi.log.Logger; +import io.github.eocqrs.kafka.Message; import io.github.eocqrs.kafka.Producer; import io.github.eocqrs.kafka.data.Tkv; import io.github.eocqrs.kafka.data.WithPartition; @@ -41,6 +42,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.Future; import java.util.logging.Level; @@ -76,7 +78,8 @@ void createsFakeProducerWithMockBroker(@Mock final FkBroker mock) final Producer producer = new FkProducer<>(UUID.randomUUID(), mock); MatcherAssert.assertThat( - "Fake producer creates with mock broker", + "Fake producer %s created with mock broker %s" + .formatted(producer, mock), producer, Matchers.notNullValue() ); @@ -88,7 +91,8 @@ void createsFakeProducer() throws IOException { final Producer producer = new FkProducer<>(UUID.randomUUID(), this.broker); MatcherAssert.assertThat( - "Fake producer creates", + "Fake producer created %s" + .formatted(producer), producer, Matchers.notNullValue() ); @@ -127,7 +131,10 @@ void sendsMessageWithoutTopicExistence() throws Exception { "data" ) ) - ) + ), + () -> + "Throws %s when topic not exist" + .formatted(IllegalArgumentException.class) ); producer.close(); } @@ -144,25 +151,28 @@ void sendsMessage() throws Exception { UUID.randomUUID(), after ); + final Message tkv = new Tkv<>( + topic, + "test-key", + data + ); producer.send( new WithPartition<>( partition, - new Tkv<>( + tkv + ) + ); + final Collection sent = after.data( + "broker/topics/topic[name = '%s']/datasets/dataset[value = '%s']/text()" + .formatted( topic, - "test-key", data ) - ) ); MatcherAssert.assertThat( - "Sent data is not blank", - after.data( - "broker/topics/topic[name = '%s']/datasets/dataset[value = '%s']/text()" - .formatted( - topic, - data - ) - ).isEmpty(), + "%s data is not blank after producer sent %s" + .formatted(sent, tkv.value()), + sent.isEmpty(), Matchers.equalTo(false) ); producer.close(); diff --git a/src/test/java/io/github/eocqrs/kafka/fake/FkRecordsTest.java b/src/test/java/io/github/eocqrs/kafka/fake/FkRecordsTest.java index 7b005c4b..e765543d 100644 --- a/src/test/java/io/github/eocqrs/kafka/fake/FkRecordsTest.java +++ b/src/test/java/io/github/eocqrs/kafka/fake/FkRecordsTest.java @@ -22,14 +22,12 @@ package io.github.eocqrs.kafka.fake; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.cactoos.list.ListOf; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; -import java.util.function.Consumer; - /** * Test case for {@link FkRecords}. * @@ -46,7 +44,8 @@ void readsRecordsInRightFormat() throws Exception { .value() .forEach(rec -> MatcherAssert.assertThat( - "Records in right format", + "Values %s should contain the record %s" + .formatted(new ListOf<>(value), rec), rec.value(), Matchers.equalTo(value) )); @@ -55,15 +54,17 @@ void readsRecordsInRightFormat() throws Exception { @Test void readsCountInRightFormat() throws Exception { final String topic = "test"; + final ConsumerRecords value = new FkRecords( + topic, + new ListOf<>( + "first", + "second" + ) + ).value(); MatcherAssert.assertThat( - "Records Count in right format", - new FkRecords( - topic, - new ListOf<>( - "first", - "second" - ) - ).value().count(), + "Records %s size in equals 2" + .formatted(value), + value.count(), Matchers.equalTo(2) ); } diff --git a/src/test/java/io/github/eocqrs/kafka/fake/storage/InFileTest.java b/src/test/java/io/github/eocqrs/kafka/fake/storage/InFileTest.java index a36ca9c2..00dec711 100644 --- a/src/test/java/io/github/eocqrs/kafka/fake/storage/InFileTest.java +++ b/src/test/java/io/github/eocqrs/kafka/fake/storage/InFileTest.java @@ -28,6 +28,8 @@ import org.junit.jupiter.api.Test; import org.xembly.Directives; +import static org.junit.jupiter.api.Assertions.assertAll; + /** * Test case for {@link InFile} * @@ -41,7 +43,7 @@ final class InFileTest { void createsStorageInXmlFile() throws Exception { final FkStorage storage = new InFile(); MatcherAssert.assertThat( - "Storage has root tag", + "Storage %s has root tag".formatted(storage.xml()), storage.xml().nodes("broker").isEmpty(), Matchers.equalTo(false) ); @@ -56,7 +58,7 @@ void appliesDirectives() throws Exception { .addIf("servers") ); MatcherAssert.assertThat( - "XML has right format", + "Storage %s contains servers tag".formatted(storage.xml()), storage.xml().nodes("broker/servers").isEmpty(), Matchers.equalTo(false) ); @@ -65,11 +67,10 @@ void appliesDirectives() throws Exception { @Test void locksAndUnlocks() throws Exception { final FkStorage storage = new InFile(); - Assertions.assertDoesNotThrow( - storage::lock - ); - Assertions.assertDoesNotThrow( - storage::unlock + assertAll( + () -> Assertions.assertDoesNotThrow(storage::lock), + () -> Assertions.assertDoesNotThrow(storage::unlock), + () -> "%s locks and unlocks without exceptions".formatted(storage.xml()) ); } } \ No newline at end of file