Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Kafka 2.3.1
Browse files Browse the repository at this point in the history
Gustavo De Micheli committed Nov 12, 2019
1 parent a02a82a commit b2fde12
Showing 6 changed files with 17 additions and 28 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@ import sbtrelease.Version

parallelExecution in ThisBuild := false

val kafkaVersion = "2.0.0"
val confluentVersion = "5.0.0"
val kafkaVersion = "2.3.1"
val confluentVersion = "5.3.1"
val akkaVersion = "2.5.14"

lazy val commonSettings = Seq(
Original file line number Diff line number Diff line change
@@ -13,9 +13,7 @@ import org.apache.avro.specific.{
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
extends Deserializer[T]
with NoOpConfiguration
with NoOpClose {
extends Deserializer[T] {

private val reader = new SpecificDatumReader[T](schema)

@@ -25,10 +23,7 @@ class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
}
}

class KafkaAvroSerializer[T <: SpecificRecord]()
extends Serializer[T]
with NoOpConfiguration
with NoOpClose {
class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] {

private def toBytes(nullableData: T): Array[Byte] =
Option(nullableData).fold[Array[Byte]](null) { data =>
@@ -46,11 +41,3 @@ class KafkaAvroSerializer[T <: SpecificRecord]()
override def serialize(topic: String, data: T): Array[Byte] =
toBytes(data)
}

sealed trait NoOpConfiguration {
def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
}

sealed trait NoOpClose {
def close(): Unit = ()
}
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
val streams =
new KafkaStreams(topology, streamConfig(streamId, extraConfig))
new KafkaStreams(topology, streamProps(streamId, extraConfig))
streams.start()
try {
block
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.manub.embeddedkafka.streams

import java.nio.file.Files
import java.util.Properties

import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
@@ -17,9 +18,9 @@ trait TestStreamsConfig {
* @param kafkaConfig the Kafka test configuration
* @return the Streams configuration
*/
def streamConfig(streamName: String,
extraConfig: Map[String, AnyRef] = Map.empty)(
implicit kafkaConfig: EmbeddedKafkaConfig): StreamsConfig = {
def streamProps(streamName: String,
extraConfig: Map[String, AnyRef] = Map.empty)(
implicit kafkaConfig: EmbeddedKafkaConfig): Properties = {
import scala.collection.JavaConverters._

val defaultConfig = Map(
@@ -33,6 +34,9 @@ trait TestStreamsConfig {
)
val configOverwrittenByExtra = defaultConfig ++
extraConfig
new StreamsConfig(configOverwrittenByExtra.asJava)

val props = new Properties()
props.putAll(configOverwrittenByExtra.asJava)
props
}
}
Original file line number Diff line number Diff line change
@@ -39,11 +39,9 @@ trait EmbeddedKafkaStreamsWithSchemaRegistry
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
val streams =
new KafkaStreams(
topology,
streamConfig(streamId,
extraConfig ++ consumerConfigForSchemaRegistry))
val streams = new KafkaStreams(
topology,
streamProps(streamId, extraConfig ++ consumerConfigForSchemaRegistry))
streams.start()
try {
block
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "2.1.0-SNAPSHOT"
version in ThisBuild := "2.3.1-SNAPSHOT"

0 comments on commit b2fde12

Please sign in to comment.