diff --git a/modules/kafka/src/main/scala/com/dimafeng/testcontainers/KafkaContainer.scala b/modules/kafka/src/main/scala/com/dimafeng/testcontainers/KafkaContainer.scala index a065804d..e2488706 100644 --- a/modules/kafka/src/main/scala/com/dimafeng/testcontainers/KafkaContainer.scala +++ b/modules/kafka/src/main/scala/com/dimafeng/testcontainers/KafkaContainer.scala @@ -1,51 +1,29 @@ package com.dimafeng.testcontainers import org.testcontainers.containers.{KafkaContainer => JavaKafkaContainer} +import org.testcontainers.utility.DockerImageName -class KafkaContainer(confluentPlatformVersion: Option[String] = None, - externalZookeeper: Option[String] = None) extends SingleContainer[JavaKafkaContainer] { +case class KafkaContainer(dockerImageName: DockerImageName = DockerImageName.parse(KafkaContainer.defaultDockerImageName) + ) extends SingleContainer[JavaKafkaContainer] { - @deprecated("Please use reflective methods of the scala container or `configure` method") - val kafkaContainer: JavaKafkaContainer = { - if (confluentPlatformVersion.isEmpty) { - new JavaKafkaContainer() - } else { - new JavaKafkaContainer(confluentPlatformVersion.get) - } - } - - if (externalZookeeper.isEmpty) { - kafkaContainer.withEmbeddedZookeeper() - } else { - kafkaContainer.withExternalZookeeper(externalZookeeper.get) - } - - override val container: JavaKafkaContainer = kafkaContainer + override val container: JavaKafkaContainer = new JavaKafkaContainer(dockerImageName) def bootstrapServers: String = container.getBootstrapServers } object KafkaContainer { + val defaultImage = "confluentinc/cp-kafka" val defaultTag = "5.2.1" + val defaultDockerImageName = s"$defaultImage:$defaultTag" - def apply(confluentPlatformVersion: String = null, - externalZookeeper: String = null): KafkaContainer = { - new KafkaContainer(Option(confluentPlatformVersion), Option(externalZookeeper)) - } - - case class Def( - confluentPlatformVersion: String = defaultTag, - externalZookeeper: Option[String] = None - ) extends ContainerDef { + case class Def(dockerImageName: DockerImageName = DockerImageName.parse(KafkaContainer.defaultDockerImageName) + ) extends ContainerDef { override type Container = KafkaContainer override def createContainer(): KafkaContainer = { - new KafkaContainer( - confluentPlatformVersion = Some(confluentPlatformVersion), - externalZookeeper = externalZookeeper - ) + new KafkaContainer(dockerImageName) } } -} +} \ No newline at end of file diff --git a/modules/kafka/src/test/scala/com/dimafeng/testcontainers/integration/SchemaRegistrySpec.scala b/modules/kafka/src/test/scala/com/dimafeng/testcontainers/integration/SchemaRegistrySpec.scala index d828154b..17f78167 100644 --- a/modules/kafka/src/test/scala/com/dimafeng/testcontainers/integration/SchemaRegistrySpec.scala +++ b/modules/kafka/src/test/scala/com/dimafeng/testcontainers/integration/SchemaRegistrySpec.scala @@ -7,6 +7,7 @@ import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.testcontainers.containers.Network +import org.testcontainers.utility.DockerImageName import java.util.Properties import scala.collection.JavaConverters._ @@ -26,26 +27,26 @@ class SchemaRegistrySpec extends AnyFlatSpec with ForAllTestContainer with Match //a way to communicate containers val network: Network = Network.newNetwork() - val kafkaContainer: KafkaContainer = KafkaContainer.Def(kafkaVersion).createContainer() + val kafkaContainer: KafkaContainer = KafkaContainer.Def(DockerImageName.parse(s"confluentinc/cp-kafka:$kafkaVersion")).createContainer() val schemaRegistryContainer: GenericContainer = SchemaRegistryContainer.Def(network, hostName, kafkaVersion).createContainer() kafkaContainer.container - .withNetwork(network) - .withNetworkAliases(hostName) - .withEnv( - Map[String, String]( - "KAFKA_BROKER_ID" -> brokerId.toString, - "KAFKA_HOST_NAME" -> hostName, - "KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false" - ).asJava - ) + .withNetwork(network) + .withNetworkAliases(hostName) + .withEnv( + Map[String, String]( + "KAFKA_BROKER_ID" -> brokerId.toString, + "KAFKA_HOST_NAME" -> hostName, + "KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false" + ).asJava + ) override val container: MultipleContainers = MultipleContainers(kafkaContainer, schemaRegistryContainer) def getKafkaAddress: String = kafkaContainer.bootstrapServers def getSchemaRegistryAddress: String = - s"http://${schemaRegistryContainer.container.getHost}:${schemaRegistryContainer.container.getMappedPort(SchemaRegistryContainer.defaultSchemaPort)}" + s"http://${schemaRegistryContainer.container.getHost}:${schemaRegistryContainer.container.getMappedPort(SchemaRegistryContainer.defaultSchemaPort)}" "Schema registry container" should "be started" in {