Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
Enable custom binder health check impelementation
Browse files Browse the repository at this point in the history
Currently, KafkaBinderHealthIndicator is not customizable and included by default
when Spring Boot actuator is on the classpath. Fix this by allowing the application
to provide a custom implementation. A new marker interface called KafkaBinderHealth
can be used by the applicaiton to provide a custom HealthIndicator implementation, in
which case, the binder's default implementation will be excluded.

Tests and docs changes.

Resolves #1180
  • Loading branch information
sobychacko committed Jan 13, 2022
1 parent 31b91f4 commit d345ac8
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 7 deletions.
25 changes: 25 additions & 0 deletions docs/src/main/asciidoc/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -969,3 +969,28 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
};
}
```

[[custom-kafka-binder-health-indicator]]
=== Custom Kafka Binder Health Indicator

Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
In the custom implementation, it must provide an implementation for the `health()` method.
The custom implementation must be present in the application configuration as a bean.
When the binder discovers the custom implementation, it will use that instead of the default implementation.
Here is an example of such a custom implementation bean in the application.

```
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka;

import org.springframework.boot.actuate.health.HealthIndicator;

/**
* Marker interface used for custom KafkaBinderHealth indicator implementations.
*
* @author Soby Chacko
* @since 3.2.2
*/
public interface KafkaBinderHealth extends HealthIndicator {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,6 @@

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.health.StatusAggregator;
import org.springframework.kafka.core.ConsumerFactory;
Expand All @@ -55,7 +54,7 @@
* @author Chukwubuikem Ume-Ugwa
* @author Taras Danylchuk
*/
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {

private static final int DEFAULT_TIMEOUT = 60;

Expand All @@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
private boolean considerDownWhenAnyPartitionHasNoLeader;

public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
Expand Down Expand Up @@ -219,7 +218,7 @@ private Health buildListenerContainersHealth() {
}

@Override
public void destroy() throws Exception {
public void destroy() {
executor.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@

import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
Expand All @@ -38,11 +40,13 @@
*
* @author Oleg Zhurakousky
* @author Chukwubuikem Ume-Ugwa
* @author Soby Chacko
*/

@Configuration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
@ConditionalOnEnabledHealthIndicator("binders")
@ConditionalOnMissingBean(KafkaBinderHealth.class)
public class KafkaBinderHealthIndicatorConfiguration {

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.integration2;

import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/**
* @author Soby Chacko
*/
public class KafkaBinderCustomHealthCheckTests {

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);

@Test
public void testCustomHealthIndicatorIsActivated() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.kafka.binder.brokers="
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class);
assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class);
assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class);
applicationContext.close();
}

@SpringBootApplication
static class CustomHealthCheckApplication {

@Bean
public CustomHealthIndicator kafkaBinderHealthIndicator() {
return new CustomHealthIndicator();
}
}

static class CustomHealthIndicator implements KafkaBinderHealth {

@Override
public Health health() {
return null;
}
}
}

0 comments on commit d345ac8

Please sign in to comment.