-
Notifications
You must be signed in to change notification settings - Fork 525
/
Copy pathAspireKafkaProducerExtensions.cs
243 lines (205 loc) · 14.6 KB
/
AspireKafkaProducerExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using Aspire;
using Aspire.Confluent.Kafka;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
namespace Microsoft.Extensions.Hosting;
/// <summary>
/// Extension methods for connecting to a Kafka broker.
/// </summary>
public static class AspireKafkaProducerExtensions
{
    // Root configuration section; a per-connection sub-section named after the connection is layered on top of it.
    private const string DefaultConfigSectionName = "Aspire:Confluent:Kafka:Producer";

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, null, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings)
        => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, null, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, Wrap(configureBuilder), connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, configureBuilder, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, Wrap(configureBuilder), connectionName, serviceKey: null);

    /// <summary>
    /// Registers <see cref="IProducer{TKey,TValue}"/> as a singleton in the services provided by the <paramref name="builder"/>.
    /// </summary>
    /// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
    /// <param name="connectionName">A name used to retrieve the connection string from the ConnectionStrings configuration section.</param>
    /// <param name="configureSettings">An optional method used for customizing the <see cref="KafkaProducerSettings"/>.</param>
    /// <param name="configureBuilder">An optional method used for customizing the <see cref="ProducerBuilder{TKey,TValue}"/>.</param>
    /// <remarks>
    /// Reads the configuration from the "Aspire:Confluent:Kafka:Producer" section, then applies any values found in the
    /// "Aspire:Confluent:Kafka:Producer:{connectionName}" sub-section on top of it.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> or <paramref name="connectionName"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="connectionName"/> is empty.</exception>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, configureBuilder, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, null, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, null, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, Wrap(configureBuilder), connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, configureBuilder, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, Wrap(configureBuilder), connectionName: name, serviceKey: name);
    }

    /// <summary>
    /// Registers <see cref="IProducer{TKey,TValue}"/> as a keyed singleton for the given <paramref name="name"/> in the services provided by the <paramref name="builder"/>.
    /// </summary>
    /// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
    /// <param name="name">The name of the component, which is used as the <see cref="ServiceDescriptor.ServiceKey"/> of the service and also to retrieve the connection string from the ConnectionStrings configuration section.</param>
    /// <param name="configureSettings">An optional method used for customizing the <see cref="KafkaProducerSettings"/>.</param>
    /// <param name="configureBuilder">An optional method used for customizing the <see cref="ProducerBuilder{TKey,TValue}"/>.</param>
    /// <remarks>
    /// Reads the configuration from the "Aspire:Confluent:Kafka:Producer" section, then applies any values found in the
    /// "Aspire:Confluent:Kafka:Producer:{name}" sub-section on top of it.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> or <paramref name="name"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="name"/> is empty.</exception>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, configureBuilder, connectionName: name, serviceKey: name);
    }

    /// <summary>
    /// Shared implementation behind every public overload: builds the settings once, registers the connection factory
    /// and the producer (keyed when <paramref name="serviceKey"/> is not <see langword="null"/>), and wires up metrics
    /// and health checks unless the settings disable them.
    /// </summary>
    /// <param name="builder">The host builder whose configuration is read and whose service collection is populated.</param>
    /// <param name="configureSettings">Optional callback applied to the settings after configuration binding.</param>
    /// <param name="configureBuilder">Optional callback applied to the <see cref="ProducerBuilder{TKey,TValue}"/> when the factory is resolved.</param>
    /// <param name="connectionName">Name of the connection string and of the per-connection configuration sub-section.</param>
    /// <param name="serviceKey">The service key for keyed registrations, or <see langword="null"/> for the default registration.</param>
    private static void AddKafkaProducerInternal<TKey, TValue>(
        IHostApplicationBuilder builder,
        Action<KafkaProducerSettings>? configureSettings,
        Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder,
        string connectionName,
        string? serviceKey)
    {
        ArgumentNullException.ThrowIfNull(builder);
        // The keyed overloads already reject a null/empty name; validating here gives the non-keyed overloads the
        // same fail-fast behavior instead of silently skipping the named section and connection string lookup.
        ArgumentException.ThrowIfNullOrEmpty(connectionName);

        var settings = BuildProducerSettings(builder, configureSettings, connectionName);

        if (serviceKey is null)
        {
            builder.Services.AddSingleton<ProducerConnectionFactory<TKey, TValue>>(sp => CreateProducerConnectionFactory<TKey, TValue>(sp, configureBuilder, settings));
            builder.Services.AddSingleton<IProducer<TKey, TValue>>(sp => sp.GetRequiredService<ProducerConnectionFactory<TKey, TValue>>().Create());
        }
        else
        {
            builder.Services.AddKeyedSingleton<ProducerConnectionFactory<TKey, TValue>>(serviceKey, (sp, key) => CreateProducerConnectionFactory<TKey, TValue>(sp, configureBuilder, settings));
            builder.Services.AddKeyedSingleton<IProducer<TKey, TValue>>(serviceKey, (sp, key) => sp.GetRequiredKeyedService<ProducerConnectionFactory<TKey, TValue>>(key).Create());
        }

        if (!settings.DisableMetrics)
        {
            // TryAdd* keeps these shared services single even when several producers are registered.
            builder.Services.TryAddSingleton<MetricsChannel>();
            builder.Services.AddHostedService<MetricsService>();
            builder.Services.TryAddSingleton<ConfluentKafkaMetrics>();
            builder.Services.AddOpenTelemetry().WithMetrics(metricBuilderProvider => metricBuilderProvider.AddMeter(ConfluentKafkaCommon.MeterName));
        }

        if (!settings.DisableHealthChecks)
        {
            // Keyed producers get a per-connection health check name; the default producer uses the shared name.
            string healthCheckName = serviceKey is null
                ? ConfluentKafkaCommon.ProducerHealthCheckName
                : string.Concat(ConfluentKafkaCommon.KeyedProducerHealthCheckName, connectionName);

            builder.Services.TryAddKeyedSingleton<KafkaHealthCheck>(healthCheckName,
                (sp, _) =>
                {
                    var connectionFactory = serviceKey is null
                        ? sp.GetRequiredService<ProducerConnectionFactory<TKey, TValue>>()
                        : sp.GetRequiredKeyedService<ProducerConnectionFactory<TKey, TValue>>(serviceKey);

                    // The health check gets its own copy of the producer configuration so the overrides below
                    // do not leak into the configuration used by the application's producer.
                    var options = new KafkaHealthCheckOptions();
                    options.Configuration = new ProducerConfig(connectionFactory.Config.ToDictionary());
                    options.Configuration.SocketTimeoutMs = 1000;
                    options.Configuration.MessageTimeoutMs = 1000;
                    options.Configuration.StatisticsIntervalMs = 0;
                    return new KafkaHealthCheck(options);
                });

            builder.TryAddHealthCheck(new HealthCheckRegistration(healthCheckName,
                sp => sp.GetRequiredKeyedService<KafkaHealthCheck>(healthCheckName),
                failureStatus: default,
                tags: default));
        }
    }

    /// <summary>
    /// Creates the <see cref="ProducerConnectionFactory{TKey,TValue}"/> from a fully configured producer builder and the settings' producer configuration.
    /// </summary>
    private static ProducerConnectionFactory<TKey, TValue> CreateProducerConnectionFactory<TKey, TValue>(IServiceProvider serviceProvider, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder, KafkaProducerSettings settings)
        => new(CreateProducerBuilder(serviceProvider, configureBuilder, settings), settings.Config);

    /// <summary>
    /// Validates the settings and creates a <see cref="ProducerBuilder{TKey,TValue}"/>, applying the caller's
    /// customization first and then attaching the log handler and (unless metrics are disabled) the statistics handler.
    /// </summary>
    /// <remarks>
    /// If the caller's <paramref name="configureBuilder"/> already set a log or statistics handler, the resulting
    /// <see cref="InvalidOperationException"/> is caught and a warning is logged instead.
    /// </remarks>
    private static ProducerBuilder<TKey, TValue> CreateProducerBuilder<TKey, TValue>(IServiceProvider serviceProvider, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder, KafkaProducerSettings settings)
    {
        settings.Validate();

        ProducerBuilder<TKey, TValue> builder = new(settings.Config);
        ILogger logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(ConfluentKafkaCommon.LogCategoryName);

        // User customization runs before the handlers are attached, so user-supplied handlers take precedence.
        configureBuilder?.Invoke(serviceProvider, builder);

        try
        {
            // Forwards client log messages to ILogger; the facility's hash code (0 when absent) is used as the event id.
            void OnLog(IProducer<TKey, TValue> _, LogMessage logMessage) =>
                logger.Log((LogLevel)logMessage.LevelAs(LogLevelType.MicrosoftExtensionsLogging), logMessage.Facility?.GetHashCode() ?? 0, logMessage.Message, null, static (value, ex) => value);

            builder.SetLogHandler(OnLog);
        }
        catch (InvalidOperationException)
        {
            logger.LogWarning("LogHandler is already set. Skipping... No logs will be written.");
        }

        if (!settings.DisableMetrics)
        {
            MetricsChannel channel = serviceProvider.GetRequiredService<MetricsChannel>();

            void OnStatistics(IProducer<TKey, TValue> _, string json)
            {
                if (string.IsNullOrEmpty(json))
                {
                    return;
                }

                // StatisticsHandler is called on the producer poll thread, we need to offload the processing
                // to avoid slowing the producer down.
                channel.Writer.TryWrite(json);
            }

            try
            {
                builder.SetStatisticsHandler(OnStatistics);
            }
            catch (InvalidOperationException)
            {
                logger.LogWarning("StatisticsHandler is already set. Skipping... No metrics will be exposed.");
            }
        }

        return builder;
    }

    /// <summary>
    /// Builds the <see cref="KafkaProducerSettings"/> for a connection by layering, in order: the default
    /// configuration section, the section named after <paramref name="connectionName"/>, the connection string
    /// (when one is configured) and finally the optional <paramref name="configureSettings"/> callback.
    /// </summary>
    private static KafkaProducerSettings BuildProducerSettings(IHostApplicationBuilder builder, Action<KafkaProducerSettings>? configureSettings, string connectionName)
    {
        var configSection = builder.Configuration.GetSection(DefaultConfigSectionName);
        var namedConfigSection = configSection.GetSection(connectionName);

        KafkaProducerSettings settings = new();
        // Bind order matters: values from the named section override those from the default section.
        configSection.Bind(settings);
        namedConfigSection.Bind(settings);

        // Manually bind the ProducerConfig until https://github.com/dotnet/runtime/issues/96652 is fixed
        configSection.GetSection(nameof(KafkaProducerSettings.Config)).Bind(settings.Config);
        namedConfigSection.GetSection(nameof(KafkaProducerSettings.Config)).Bind(settings.Config);

        if (builder.Configuration.GetConnectionString(connectionName) is string connectionString)
        {
            settings.ConnectionString = connectionString;
        }

        // The callback runs last so code-based configuration wins over configuration files.
        configureSettings?.Invoke(settings);

        settings.Consolidate();

        return settings;
    }

    /// <summary>
    /// Adapts a builder-only callback to the (service provider, builder) shape used internally, passing
    /// <see langword="null"/> through unchanged.
    /// </summary>
    private static Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? Wrap<TKey, TValue>(Action<ProducerBuilder<TKey, TValue>>? action)
    {
        if (action is null)
        {
            return null;
        }

        return (_, builder) => action(builder);
    }
}