Skip to content

Commit

Permalink
test(streams): add shouldUseSpecifiedNameForProcessValuesOperation
Browse files Browse the repository at this point in the history
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
  • Loading branch information
fonsdant committed Nov 26, 2024
1 parent 320c2c1 commit 3b42776
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
import org.apache.kafka.test.MockApiFixedKeyProcessorSupplier;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
Expand Down Expand Up @@ -1305,6 +1306,14 @@ public void shouldUseSpecifiedNameForProcessOperation() {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-processor");
}

@Test
public void shouldUseSpecifiedNameForProcessValuesOperation() {
builder.stream(STREAM_TOPIC).processValues(new MockApiFixedKeyProcessorSupplier<>(), Named.as("test-fixed-key-processor"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-fixed-key-processor");
}

@Test
public void shouldUseSpecifiedNameForPrintOperation() {
builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
Expand Down

0 comments on commit 3b42776

Please sign in to comment.