Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Avoid Kafka health check singleton bug.
Browse files Browse the repository at this point in the history
mitchdenny committed Sep 16, 2024
1 parent 2d9fd8c commit cea16de
Showing 2 changed files with 27 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@
<ItemGroup>
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>
<ItemGroup>
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
29 changes: 25 additions & 4 deletions src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -3,7 +3,10 @@

using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Aspire.Hosting;

@@ -44,10 +47,28 @@ public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedAp
});

var healthCheckKey = $"{name}_check";
builder.Services.AddHealthChecks().AddKafka(config =>
{
config.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable");
}, name: healthCheckKey);

// NOTE: We cannot use AddKafka here because it registers the health check as a singleton
// which means if you have multiple Kafka resources the factory callback will end
// up using the connection string of the last Kafka resource that was added. The
// client packages also have to work around this issue.
//
// SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298
var healthCheckRegistration = new HealthCheckRegistration(
healthCheckKey,
sp =>
{
var options = new KafkaHealthCheckOptions();
options.Configuration = new ProducerConfig();
options.Configuration.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable");
options.Configuration.SocketTimeoutMs = 1000;
options.Configuration.MessageTimeoutMs = 1000;
options.Configuration.StatisticsIntervalMs = 0;
return new KafkaHealthCheck(options);
},
failureStatus: default,
tags: default);
builder.Services.AddHealthChecks().Add(healthCheckRegistration);

return builder.AddResource(kafka)
.WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName)

0 comments on commit cea16de

Please sign in to comment.