Skip to content

Commit

Permalink
[Instrumentation.ConfluentKafka] Add named instrumentation support (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
g7ed6e authored Sep 18, 2024
1 parent 95b0372 commit 18c5a26
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.TryExtractPropagatio
static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder<TKey, TValue>(this Confluent.Kafka.ProducerBuilder<TKey, TValue>! producerBuilder) -> Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>? consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>! producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>? producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>? consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>? producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
virtual Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>.Invoke(Confluent.Kafka.ConsumeResult<TKey, TValue>! consumeResult, System.Diagnostics.Activity? activity, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

## Unreleased

- Add named instrumentation support
([#2074](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2074))

## 0.1.0-alpha.1

Released 2024-Sep-16

* Initial release
- Initial release
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using OpenTelemetry.Instrumentation.ConfluentKafka;

namespace Confluent.Kafka;
Expand All @@ -13,6 +12,8 @@ namespace Confluent.Kafka;
/// <typeparam name="TValue">Type of value.</typeparam>
public sealed class InstrumentedConsumerBuilder<TKey, TValue> : ConsumerBuilder<TKey, TValue>
{
private readonly ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options = new();

/// <summary>
/// Initializes a new instance of the <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/> class.
/// </summary>
Expand All @@ -22,19 +23,27 @@ public InstrumentedConsumerBuilder(IEnumerable<KeyValuePair<string, string>> con
{
}

internal ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>? Options { get; set; }
internal bool EnableMetrics
{
get => this.options.Metrics;
set => this.options.Metrics = value;
}

internal bool EnableTraces
{
get => this.options.Traces;
set => this.options.Traces = value;
}

/// <summary>
/// Build a new IConsumer instance.
/// </summary>
/// <returns>an <see cref="IProducer{TKey,TValue}"/>.</returns>
public override IConsumer<TKey, TValue> Build()
{
Debug.Assert(this.Options != null, "Options should not be null.");

ConsumerConfig config = (ConsumerConfig)this.Config;

var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.Options!);
var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.options);
consumer.GroupId = config.GroupId;

return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public InstrumentedProducer(

public string Name => this.producer.Name;

internal ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> Options => this.options;

public int AddBrokers(string brokers)
{
return this.producer.AddBrokers(brokers);
Expand Down Expand Up @@ -326,6 +324,11 @@ private static void RecordPublish(TopicPartition topicPartition, TimeSpan durati

private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message<TKey, TValue> message, int? partition = null)
{
if (!this.options.Traces)
{
return null;
}

var spanName = string.Concat(topic, " ", ConfluentKafkaCommon.PublishOperationName);
var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start);
if (activity == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using OpenTelemetry.Instrumentation.ConfluentKafka;

namespace Confluent.Kafka;
Expand All @@ -13,6 +12,8 @@ namespace Confluent.Kafka;
/// <typeparam name="TValue">Type of value.</typeparam>
public sealed class InstrumentedProducerBuilder<TKey, TValue> : ProducerBuilder<TKey, TValue>
{
private readonly ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options = new();

/// <summary>
/// Initializes a new instance of the <see cref="InstrumentedProducerBuilder{TKey, TValue}"/> class.
/// </summary>
Expand All @@ -22,16 +23,24 @@ public InstrumentedProducerBuilder(IEnumerable<KeyValuePair<string, string>> con
{
}

internal ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>? Options { get; set; }
internal bool EnableMetrics
{
get => this.options.Metrics;
set => this.options.Metrics = value;
}

internal bool EnableTraces
{
get => this.options.Traces;
set => this.options.Traces = value;
}

/// <summary>
/// Build a new IProducer instance.
/// </summary>
/// <returns>an <see cref="IProducer{TKey,TValue}"/>.</returns>
public override IProducer<TKey, TValue> Build()
{
Debug.Assert(this.Options != null, "Options should not be null.");

return new InstrumentedProducer<TKey, TValue>(base.Build(), this.Options!);
return new InstrumentedProducer<TKey, TValue>(base.Build(), this.options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using OpenTelemetry.Instrumentation.ConfluentKafka;
using OpenTelemetry.Internal;

Expand All @@ -25,6 +24,18 @@ public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>
this MeterProviderBuilder builder)
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: null, consumerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="builder"><see cref="MeterProviderBuilder"/> being configured.</param>
/// <param name="name">The name of the instrumentation.</param>
/// <returns>The instance of <see cref="MeterProviderBuilder"/> to chain the calls.</returns>
public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
this MeterProviderBuilder builder, string? name)
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: name, consumerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
Expand Down Expand Up @@ -58,34 +69,21 @@ public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>
{
Guard.ThrowIfNull(builder);

name ??= Options.DefaultName;

builder.ConfigureServices(services =>
{
services.Configure<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>(name, EnableMetrics);
});

return builder
.AddMeter(ConfluentKafkaCommon.InstrumentationName)
.AddInstrumentation(sp =>
{
if (consumerBuilder == null)
if (name == null)
{
consumerBuilder = sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>>();
consumerBuilder.Options = options.Get(name);
consumerBuilder ??= sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
}

if (consumerBuilder.Options == null)
else
{
consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>();
EnableMetrics(consumerBuilder.Options);
consumerBuilder ??= sp.GetRequiredKeyedService<InstrumentedConsumerBuilder<TKey, TValue>>(name);
}

consumerBuilder.EnableMetrics = true;
return new ConfluentKafkaConsumerInstrumentation<TKey, TValue>(consumerBuilder);
});
}

private static void EnableMetrics<TKey, TValue>(ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options) =>
options.Metrics = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using OpenTelemetry.Instrumentation.ConfluentKafka;
using OpenTelemetry.Internal;

Expand All @@ -25,6 +24,18 @@ public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>
this MeterProviderBuilder builder)
=> AddKafkaProducerInstrumentation<TKey, TValue>(builder, name: null, producerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="builder"><see cref="MeterProviderBuilder"/> being configured.</param>
/// <param name="name">The name of the instrumentation.</param>
/// <returns>The instance of <see cref="MeterProviderBuilder"/> to chain the calls.</returns>
public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>(
this MeterProviderBuilder builder, string? name)
=> AddKafkaProducerInstrumentation<TKey, TValue>(builder, name: name, producerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
Expand Down Expand Up @@ -58,34 +69,21 @@ public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>
{
Guard.ThrowIfNull(builder);

name ??= Options.DefaultName;

builder.ConfigureServices(services =>
{
services.Configure<ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>>(name, EnableMetrics);
});

return builder
.AddMeter(ConfluentKafkaCommon.InstrumentationName)
.AddInstrumentation(sp =>
{
if (producerBuilder == null)
if (name == null)
{
producerBuilder = sp.GetRequiredService<InstrumentedProducerBuilder<TKey, TValue>>();
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>>>();
producerBuilder.Options = options.Get(name);
producerBuilder ??= sp.GetRequiredService<InstrumentedProducerBuilder<TKey, TValue>>();
}

if (producerBuilder.Options == null)
else
{
producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>();
EnableMetrics(producerBuilder.Options);
producerBuilder ??= sp.GetRequiredKeyedService<InstrumentedProducerBuilder<TKey, TValue>>(name);
}

producerBuilder.EnableMetrics = true;
return new ConfluentKafkaProducerInstrumentation<TKey, TValue>(producerBuilder);
});
}

private static void EnableMetrics<TKey, TValue>(ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options) =>
options.Metrics = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using OpenTelemetry.Instrumentation.ConfluentKafka;
using OpenTelemetry.Internal;

Expand All @@ -25,6 +24,18 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
this TracerProviderBuilder builder)
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: null, consumerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param>
/// <param name="name">The name of the instrumentation.</param>
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns>
public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
this TracerProviderBuilder builder, string? name)
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: name, consumerBuilder: null);

/// <summary>
/// Enables automatic data collection of outgoing requests to Kafka.
/// </summary>
Expand All @@ -48,7 +59,7 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param>
/// <param name="name">Optional name which is used when retrieving options.</param>
/// <param name="name">The name of the instrumentation.</param>
/// <param name="consumerBuilder">Optional <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/> to instrument.</param>
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns>
public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
Expand All @@ -58,34 +69,21 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
{
Guard.ThrowIfNull(builder);

name ??= Options.DefaultName;

builder.ConfigureServices(services =>
{
services.Configure<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>(name, EnableTracing);
});

return builder
.AddSource(ConfluentKafkaCommon.InstrumentationName)
.AddInstrumentation(sp =>
{
if (consumerBuilder == null)
if (name == null)
{
consumerBuilder = sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>>();
consumerBuilder.Options = options.Get(name);
consumerBuilder ??= sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
}

if (consumerBuilder.Options == null)
else
{
consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>();
EnableTracing(consumerBuilder.Options);
consumerBuilder ??= sp.GetRequiredKeyedService<InstrumentedConsumerBuilder<TKey, TValue>>(name);
}

consumerBuilder.EnableTraces = true;
return new ConfluentKafkaConsumerInstrumentation<TKey, TValue>(consumerBuilder);
});
}

private static void EnableTracing<TKey, TValue>(ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options) =>
options.Traces = true;
}
Loading

0 comments on commit 18c5a26

Please sign in to comment.