Skip to content

Commit

Permalink
test(streams-scala): add KStreamTest#testProcessValuesCorrectlyRecords
Browse files Browse the repository at this point in the history
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
  • Loading branch information
fonsdant committed Nov 27, 2024
1 parent 78927da commit 5b8bbc8
Showing 1 changed file with 18 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,9 @@ package org.apache.kafka.streams.scala.kstream

import java.time.Duration.ofSeconds
import java.time.{Duration, Instant}
import org.apache.kafka.streams.kstream.{
JoinWindows,
Named,
ValueTransformer,
ValueTransformerSupplier,
ValueTransformerWithKey,
ValueTransformerWithKeySupplier
}
import org.apache.kafka.streams.kstream.{JoinWindows, Named}
import org.apache.kafka.streams.processor.api
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier}
import org.apache.kafka.streams.processor.api.{FixedKeyRecord, Processor, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
Expand All @@ -40,7 +32,6 @@ import org.junit.jupiter.api.Test

import java.util
import java.util.Collections
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

class KStreamTest extends TestDriver {
Expand Down Expand Up @@ -287,64 +278,29 @@ class KStreamTest extends TestDriver {
testDriver.close()
}

@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecords(): Unit = {
class TestTransformer extends ValueTransformer[String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}

override def transform(value: String): Iterable[String] =
Array(s"$value-transformed")

override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"

val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
def get(): ValueTransformer[String, Iterable[String]] =
new TestTransformer
})
.to(sinkTopic)

val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
def testProcessValuesCorrectlyRecords(): Unit = {
val processorSupplier: api.FixedKeyProcessorSupplier[String, String, String] =
() => new api.FixedKeyProcessor[String, String, String] {
private var context: api.FixedKeyProcessorContext[String, String] = _

testInput.pipeInput("1", "value", now)

assertEquals("value-transformed", testOutput.readValue)

assertTrue(testOutput.isEmpty)

testDriver.close()
}

@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecordsWithKey(): Unit = {
class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
override def init(context: api.FixedKeyProcessorContext[String, String]): Unit = {
this.context = context
}

override def transform(key: String, value: String): Iterable[String] =
Array(s"$value-transformed-$key")
override def process(record: FixedKeyRecord[String, String]): Unit = {
val processedValue = s"${record.value()}-processed"
context.forward(record.withValue(processedValue))
}
}

override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"

val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
new TestTransformer
})
.processValues(processorSupplier)
.to(sinkTopic)

val now = Instant.now()
Expand All @@ -354,7 +310,9 @@ class KStreamTest extends TestDriver {

testInput.pipeInput("1", "value", now)

assertEquals("value-transformed-1", testOutput.readValue)
val result = testOutput.readKeyValue()
assertEquals("value-processed", result.value)
assertEquals("1", result.key)

assertTrue(testOutput.isEmpty)

Expand Down

0 comments on commit 5b8bbc8

Please sign in to comment.