Skip to content

Commit

Permalink
Auto-create topic with N partitions (#879)
Browse files Browse the repository at this point in the history
* Introduce `KafkaNewTopics`, which creates Kafka topics via `AdminClient`

* Add tests

* Add documentation and code snippets

* Update src/main/docs/guide/kafkaApplications/kafkaNewTopics.adoc

Co-authored-by: Sergio del Amo <[email protected]>

* Update kafka/src/main/java/io/micronaut/configuration/kafka/admin/KafkaNewTopics.java

* Apply suggestions from code review

Co-authored-by: Sergio del Amo <[email protected]>

* Update kafka/src/main/java/io/micronaut/configuration/kafka/admin/KafkaNewTopics.java

* Update kafka/src/test/groovy/io/micronaut/configuration/kafka/admin/KafkaNewTopicsSpec.groovy

* Update KafkaNewTopics

* Remove unused import

* Make `KafkaNewTopics` depend directly on `AdminClient` and `NewTopic`

---------

Co-authored-by: Sergio del Amo <[email protected]>
  • Loading branch information
guillermocalvo and sdelamo authored Oct 3, 2023
1 parent 3ffac07 commit 46f8436
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.admin;

import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Experimental;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* Creates Kafka topics via {@link AdminClient}.
*
* @author Guillermo Calvo
* @since 5.2
*/
@Context
@Requires(bean = AdminClient.class)
@Requires(bean = NewTopic.class)
public class KafkaNewTopics {

private static final Logger LOG = LoggerFactory.getLogger(KafkaNewTopics.class);

@NonNull
private final CreateTopicsResult result;

/**
* @param adminClient The Kafka admin client.
* @param options Optional {@link CreateTopicsOptions}.
* @param topics The list of {@link NewTopic} beans to create.
*/
public KafkaNewTopics(
@NonNull AdminClient adminClient,
@Nullable CreateTopicsOptions options,
@NonNull List<NewTopic> topics
) {
LOG.info("Creating new topics: {}", topics);
this.result = adminClient.createTopics(topics, options != null ? options : new CreateTopicsOptions());
}

/**
* @return The {@link CreateTopicsResult}.
*/
@Experimental
@NonNull
public CreateTopicsResult getResult() {
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.micronaut.configuration.kafka.admin

import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
import spock.util.concurrent.PollingConditions

class KafkaNewTopicsSpec extends AbstractKafkaContainerSpec {

final static TOPIC_1 = 'new-topic-1'
final static TOPIC_2 = 'new-topic-2'
final static TOPIC_3 = '$illegal/topic:name!'

void "create kafka topics"() {
given:
final KafkaNewTopics kafkaNewTopics = context.getBean(KafkaNewTopics)

expect:
new PollingConditions(timeout: 10).eventually {
kafkaNewTopics.result.all().done == true
}
kafkaNewTopics.result.numPartitions(TOPIC_1).get() == 1
kafkaNewTopics.result.numPartitions(TOPIC_2).get() == 2
kafkaNewTopics.result.values()[TOPIC_3].completedExceptionally == true
}

@Requires(property = 'spec.name', value = 'KafkaNewTopicsSpec')
@Factory
static class MyTopicFactory {

@Bean
CreateTopicsOptions options() { new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false) }

@Bean
NewTopic topic1() { new NewTopic(TOPIC_1, Optional.of(1), Optional.empty()) }

@Bean
NewTopic topic2() { new NewTopic(TOPIC_2, Optional.of(2), Optional.empty()) }

@Bean
NewTopic topic3() { new NewTopic(TOPIC_3, Optional.of(4), Optional.empty()) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.micronaut.configuration.kafka.admin

import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Nullable
import jakarta.inject.Inject
import jakarta.inject.Singleton
import org.apache.kafka.clients.admin.NewTopic

@Property(name = "kafka.health.enabled", value = "false")
class KafkaNoNewTopicsSpec extends AbstractKafkaContainerSpec {

void "don't create any kafka topics"() {
expect:
context.findBean(NewTopic).isEmpty()
context.findBean(KafkaNewTopics).isEmpty()
context.getBean(MyService).kafkaNewTopics == null
}

@Requires(property = 'spec.name', value = 'KafkaNoNewTopicsSpec')
@Singleton
static class MyService {
@Inject
@Nullable
KafkaNewTopics kafkaNewTopics
}
}
14 changes: 14 additions & 0 deletions src/main/docs/guide/kafkaApplications/kafkaNewTopics.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

You can automatically add topics to the broker when your application starts. To do so, add a bean of type a link:{kafkaapi}/org/apache/kafka/clients/admin/NewTopic[`NewTopic`] for each topic you want to create. `NewTopic` instances let you specify the name, the number of partitions, the replication factor, the replicas assignments and the configuration properties you want to associate with the new topic. Additionally, you can add a bean of type link:{kafkaapi}/org/apache/kafka/clients/admin/CreateTopicsOptions[`CreateTopicsOptions`] that will be used when the new topics are created.

.Creating New Kafka Topics with Options

snippet::io.micronaut.kafka.docs.admin.MyTopicFactory[tags = 'imports,clazz']

NOTE: Creating topics is not a transactional operation, so it may succeed for some topics while fail for others. This operation also executes asynchronously, so it may take several seconds until all the brokers become aware that the topics have been created.

If you ever need to check if the operation has completed, you can `@Inject` or retrieve the api:io.micronaut.configuration.kafka.admin.KafkaNewTopics[] bean from the application context and then retrieve the link:{kafkaapi}/org/apache/kafka/clients/admin/CreateTopicsResult[operation result] that Kafka returned when the topics were created.

.Checking if Topic Creation is Done

snippet::io.micronaut.kafka.docs.admin.MyTopicFactoryTest[tags = 'result']
1 change: 1 addition & 0 deletions src/main/docs/guide/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ kafkaApplications:
kafkaHealth: Kafka Health Checks
kafkaMetrics: Kafka Metrics
kafkaTracing: Kafka Distributed Tracing
kafkaNewTopics: Creating New Topics
kafkaDisabled: Disabling Micronaut-Kafka
kafkaStreams:
title: Kafka Streams
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.kafka.docs.admin

// tag::imports[]
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
// end::imports[]

@Requires(property = "spec.name", value = "MyTopicFactoryTest")
// tag::clazz[]
@Requires(bean = AdminClient)
@Factory
class MyTopicFactory {

@Bean
CreateTopicsOptions options() {
new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)
}

@Bean
NewTopic topic1() {
new NewTopic("my-new-topic-1", 1, (short) 1)
}

@Bean
NewTopic topic2() {
new NewTopic("my-new-topic-2", 2, (short) 1)
}
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.micronaut.kafka.docs.admin

import io.micronaut.configuration.kafka.admin.KafkaNewTopics
import io.micronaut.context.annotation.Property
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

@MicronautTest
@Property(name = "spec.name", value = "MyTopicFactoryTest")
class MyTopicFactoryTest extends Specification {

@Inject
KafkaNewTopics newTopics

void "test new topics"() {
expect:
new PollingConditions(timeout: 5).eventually {
areNewTopicsDone(newTopics)
newTopics.getResult().numPartitions("my-new-topic-1").get() == 1
newTopics.getResult().numPartitions("my-new-topic-2").get() == 2
}
}

// tag::result[]
boolean areNewTopicsDone(KafkaNewTopics newTopics) {
newTopics.result.all().done
}
// end::result[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.kafka.docs.admin

// tag::imports[]
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
// end::imports[]

@Requires(property = "spec.name", value = "MyTopicFactoryTest")
// tag::clazz[]
@Requires(bean = AdminClient::class)
@Factory
class MyTopicFactory {

@Bean
fun options(): CreateTopicsOptions {
return CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)
}

@Bean
fun topic1(): NewTopic {
return NewTopic("my-new-topic-1", 1, 1)
}

@Bean
fun topic2(): NewTopic {
return NewTopic("my-new-topic-2", 2, 1)
}
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.micronaut.kafka.docs.admin

import io.micronaut.configuration.kafka.admin.KafkaNewTopics
import io.micronaut.context.ApplicationContext
import org.awaitility.Awaitility
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit

internal class MyTopicFactoryTest {

@Test
@Throws(ExecutionException::class, InterruptedException::class)
fun testNewTopics() {
ApplicationContext.run(
mapOf(
"kafka.enabled" to "true",
"spec.name" to "MyTopicFactoryTest"
)
).use { ctx ->
val newTopics = ctx.getBean(KafkaNewTopics::class.java)
Awaitility.await().atMost(5, TimeUnit.SECONDS).until { areNewTopicsDone(newTopics) }
assertEquals(1, newTopics.result.numPartitions("my-new-topic-1").get())
assertEquals(2, newTopics.result.numPartitions("my-new-topic-2").get())
}
}

// tag::result[]
fun areNewTopicsDone(newTopics: KafkaNewTopics): Boolean {
return newTopics.result.all().isDone
}
// end::result[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.kafka.docs.admin;

// tag::imports[]
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
// end::imports[]

@Requires(property = "spec.name", value = "MyTopicFactoryTest")
// tag::clazz[]
@Requires(bean = AdminClient.class)
@Factory
public class MyTopicFactory {

@Bean
CreateTopicsOptions options() {
return new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false);
}

@Bean
NewTopic topic1() {
return new NewTopic("my-new-topic-1", 1, (short) 1);
}

@Bean
NewTopic topic2() {
return new NewTopic("my-new-topic-2", 2, (short) 1);
}
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.micronaut.kafka.docs.admin;

import io.micronaut.configuration.kafka.admin.KafkaNewTopics;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.ExecutionException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

class MyTopicFactoryTest {

@Test
void testNewTopics() throws ExecutionException, InterruptedException {
try (ApplicationContext ctx = ApplicationContext.run(Map.of(
"kafka.enabled", "true",
"spec.name", "MyTopicFactoryTest"
))) {
final KafkaNewTopics newTopics = ctx.getBean(KafkaNewTopics.class);
await().atMost(5, SECONDS).until(() -> areNewTopicsDone(newTopics));
assertEquals(1, newTopics.getResult().numPartitions("my-new-topic-1").get());
assertEquals(2, newTopics.getResult().numPartitions("my-new-topic-2").get());
}
}

// tag::result[]
boolean areNewTopicsDone(KafkaNewTopics newTopics) {
return newTopics.getResult().all().isDone();
}
// end::result[]
}

0 comments on commit 46f8436

Please sign in to comment.