diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 9f7461f0cdf1..dd3ee9f32733 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -401,6 +401,7 @@ private Map topicConfig( () -> adminClient.get().describeConfigs(request).all().get(), ExecutorUtil.RetryBehaviour.ON_RETRYABLE).get(resource); return config.entries().stream() + .filter(e -> e.value() != null) .filter(e -> includeDefaults || !e.isDefault()) .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)); } catch (final Exception e) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 182843d7e251..100d40a729da 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -534,6 +534,23 @@ public void shouldGetTopicConfig() { assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1")); } + @Test + public void shouldNotFailWhenTopicConfigValueIsNull() { + // Given: + givenTopicConfigs( + "fred", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"), + overriddenConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, null) + ); + + // When: + final Map config = kafkaTopicClient.getTopicConfig("fred"); + + // Then: + assertThat(config.get(TopicConfig.RETENTION_MS_CONFIG), is("12345")); + assertThat(config.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is(false)); + } + @Test public void shouldGetTopicCleanUpPolicyDelete() { // Given: