Add an overload to Kafka extensions for registering multiple instances #2337

Open
wants to merge 5 commits into
base: master

Alirexaa
Collaborator

@Alirexaa Alirexaa commented Dec 5, 2024

What this PR does / why we need it:
This pull request introduces a new overload for the AddKafka method in the KafkaHealthCheckBuilderExtensions class, which allows configuring Kafka health checks using an optionFactory. Additionally, the corresponding method signature is added to the approved API file.

New Overload for Kafka Health Check Configuration:

Update to Approved API:

Please reference the issue this PR will close: #2298

Special notes for your reviewer:

Does this PR introduce a user-facing change?:

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • [] Code compiles correctly
  • Created/updated tests
  • Unit tests passing
  • End-to-end tests passing
  • Extended the documentation
  • Provided sample for the feature

@Alirexaa Alirexaa closed this Dec 5, 2024
@Alirexaa Alirexaa reopened this Dec 5, 2024
@Alirexaa Alirexaa closed this Dec 5, 2024
@Alirexaa Alirexaa reopened this Dec 5, 2024
@Alirexaa Alirexaa closed this Dec 5, 2024
@Alirexaa Alirexaa reopened this Dec 5, 2024
Collaborator

@adamsitnik adamsitnik left a comment

I would prefer to wait for feedback from Aspire if adding this particular overload is going to solve the problem. If so, we are going to need some new tests (just a few).

Also, I wonder if we should instead add an overload that requires an IProducer<K, V> factory.

Something like this:

public static IHealthChecksBuilder AddKafka<TKey, TValue>(
    this IHealthChecksBuilder builder,
    // Required parameters must precede optional ones, so messageBuilder comes first.
    Func<Message<TKey, TValue>> messageBuilder,
    // The registration factory receives an IServiceProvider, so the producer factory takes one too.
    Func<IServiceProvider, IProducer<TKey, TValue>>? providerFactory = default,
    string topic = DEFAULT_TOPIC,
    string? name = default,
    HealthStatus? failureStatus = default,
    IEnumerable<string>? tags = default,
    TimeSpan? timeout = default)
{
    return builder.Add(new HealthCheckRegistration(
        name ?? NAME,
        sp => new KafkaHealthCheck<TKey, TValue>(providerFactory?.Invoke(sp) ?? sp.GetRequiredService<IProducer<TKey, TValue>>(), messageBuilder, topic),
        failureStatus,
        tags,
        timeout));
}

public class KafkaHealthCheck<TKey, TValue> : IHealthCheck, IDisposable
{
    private readonly IProducer<TKey, TValue> _producer;
    private readonly Func<Message<TKey, TValue>> _messageBuilder;
    private readonly string _topic;

    public KafkaHealthCheck(IProducer<TKey, TValue> producer, Func<Message<TKey, TValue>> messageBuilder, string? topic)
    {
        _producer = producer;
        _messageBuilder = messageBuilder;
        _topic = topic ?? KafkaHealthCheckBuilderExtensions.DEFAULT_TOPIC;
    }

    /// <inheritdoc />
    public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            Message<TKey, TValue> message = _messageBuilder.Invoke();

            var result = await _producer.ProduceAsync(_topic, message, cancellationToken).ConfigureAwait(false);

            if (result.Status == PersistenceStatus.NotPersisted)
            {
                return new HealthCheckResult(context.Registration.FailureStatus, description: "Message is not persisted or a failure is raised on health check for kafka.");
            }

            return HealthCheckResult.Healthy();
        }
        catch (Exception ex)
        {
            return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
        }
    }

    public void Dispose() => _producer.Dispose();
}

This would force users to register and resolve everything in the DI container themselves, and we would not need to care about settings.

/// <param name="tags">A list of tags that can be used to filter sets of health checks. Optional.</param>
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
/// <returns>The specified <paramref name="builder"/>.</returns>
public static IHealthChecksBuilder AddKafka(
Collaborator

@mitchdenny would adding this overload solve the problem you have faced in Aspire?

I can see that you have solved it by simply reusing the ctor of the health check and registering your own thing in the DI container:

https://github.com/dotnet/aspire/blob/422d777c8dc7df154d9eea268f0d28a35e673f1b/src/Components/Aspire.Confluent.Kafka/AspireKafkaProducerExtensions.cs#L139-L151

/// <returns>The specified <paramref name="builder"/>.</returns>
public static IHealthChecksBuilder AddKafka(
this IHealthChecksBuilder builder,
Func<IServiceProvider, KafkaHealthCheckOptions> optionFactory,
Collaborator

nit: whitespace

Suggested change
Func<IServiceProvider, KafkaHealthCheckOptions> optionFactory,
Func<IServiceProvider, KafkaHealthCheckOptions> optionFactory,

@Alirexaa
Collaborator Author

Alirexaa commented Dec 6, 2024

@adamsitnik, I prefer that overload too, but this means we would have to remove all other overloads and introduce a breaking change. Should we do this?

@adamsitnik
Collaborator

@adamsitnik, I prefer that overload too, but this means we would have to remove all other overloads and introduce a breaking change. Should we do this?

This time I am not 100% convinced as the new approach would require the users to always specify TKey, TValue and the message builder. So it would not be super easy to move to the new version.

I was hoping to get feedback from @mitchdenny but it seems that he just started his winter holiday leave.

@eerhardt what are your thoughts on this? Should we introduce such a breaking change or perhaps just extend the existing ways with a new overload?

@eerhardt
Collaborator

eerhardt commented Dec 9, 2024

@eerhardt what are your thoughts on this? Should we introduce such a breaking change or perhaps just extend the existing ways with a new overload?

My default opinion is to not introduce a breaking change if you can help it. If anything, we could create a new IHealthCheck class, if necessary.


return builder.Add(new HealthCheckRegistration(
name ?? NAME,
sp => new KafkaHealthCheck(optionFactory.Invoke(sp)),
Collaborator

I think this callback gets invoked every time the health is checked. So the optionFactory would be invoked each time as well.

Collaborator Author

Correct.
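
To make the point above concrete, here is a minimal, dependency-free sketch of what per-check invocation means and how memoizing the options would avoid it. It does not use the library's types: the delegates stand in for the registration factory, the string stands in for KafkaHealthCheckOptions, and the names perCheckFactory, cachedFactory, and the "localhost:9092" value are hypothetical. It assumes, as the review comment suggests, that the registration factory runs once per health check.

```csharp
using System;
using System.ComponentModel.Design;
using System.Threading;

// Stand-in service provider; a real app would pass its DI container here.
IServiceProvider services = new ServiceContainer();

// Stand-in for the user-supplied optionFactory; counts how often it runs.
int optionFactoryCalls = 0;
Func<IServiceProvider, string> optionFactory = _ =>
{
    optionFactoryCalls++;
    return "localhost:9092"; // hypothetical bootstrap server
};

// Shape used in the PR: the registration factory calls optionFactory directly,
// so every simulated health check invokes it again.
Func<IServiceProvider, string> perCheckFactory = sp => optionFactory(sp);
for (int i = 0; i < 3; i++)
{
    perCheckFactory(services);
}
int callsWithoutCache = optionFactoryCalls;

// Alternative: memoize the result so optionFactory runs at most once.
optionFactoryCalls = 0;
string? cachedOptions = null;
Func<IServiceProvider, string> cachedFactory = sp =>
    LazyInitializer.EnsureInitialized(ref cachedOptions, () => optionFactory(sp));
for (int i = 0; i < 3; i++)
{
    cachedFactory(services);
}
int callsWithCache = optionFactoryCalls;

Console.WriteLine($"per-check: {callsWithoutCache}, cached: {callsWithCache}");
// prints "per-check: 3, cached: 1"
```

Whether caching belongs in the extension method or in a new health check class is the open design question in this thread; the sketch only shows the difference in invocation counts.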

@Alirexaa
Collaborator Author

@eerhardt what are your thoughts on this? Should we introduce such a breaking change or perhaps just extend the existing ways with a new overload?

My default opinion is to not introduce a breaking change if you can help it. If anything, we could create a new IHealthCheck class, if necessary.

I thought about a new class, and creating one seems like the better approach. I think this new factory method could create a memory leak.
Also, with a new class we can resolve a singleton client from the factory method and mark the older class and methods as obsolete.

@Alirexaa
Collaborator Author

@adamsitnik Should we add a new HealthCheck class?

Successfully merging this pull request may close these issues.

Allow registering multiple Kafka health checks.
3 participants