diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6ef951be..4bbc489e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -5,11 +5,10 @@ reactorCore = "3.4.33-SNAPSHOT" asciidoctor = "3.3.2" kafka-doc = "28" log4j = "2.17.1" -micrometer = "1.10.4" +micrometer = "1.10.10" powermock = "2.0.9" reactiveStreams = "1.0.3" -micrometer = "1.11.0-M1" -micrometerTracing = '1.1.0-M1' +micrometerTracing = '1.1.4' [libraries] kafka = "org.apache.kafka:kafka-clients:2.8.2" @@ -30,7 +29,7 @@ powermock-core = { module = "org.powermock:powermock-core", version.ref = "power powermock-junit = { module = "org.powermock:powermock-module-junit4", version.ref = "powermock" } powermock-mockito = { module = "org.powermock:powermock-api-mockito2", version.ref = "powermock" } slf4j = "org.slf4j:slf4j-api:1.7.36" -testcontainers = "org.testcontainers:kafka:1.16.3" +testcontainers = "org.testcontainers:kafka:1.19.0" micrometer-observation = { module = "io.micrometer:micrometer-observation", version.ref = "micrometer" } micrometer-tracing-test = { module = "io.micrometer:micrometer-tracing-integration-test", version.ref = "micrometerTracing" } diff --git a/src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java b/src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java index 58a2e9eb..98bb3ab6 100644 --- a/src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java +++ b/src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java @@ -16,12 +16,14 @@ package reactor.kafka.receiver; +import io.micrometer.observation.ObservationRegistry; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Deserializer; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.observation.KafkaReceiverObservationConvention; import 
reactor.util.annotation.Nullable; import java.time.Duration; @@ -67,6 +69,11 @@ class ImmutableReceiverOptions implements ReceiverOptions { private final Supplier schedulerSupplier; private final ConsumerListener consumerListener; + private final ObservationRegistry observationRegistry; + + @Nullable + private final KafkaReceiverObservationConvention observationConvention; + ImmutableReceiverOptions() { this(new HashMap<>()); } @@ -105,6 +112,8 @@ class ImmutableReceiverOptions implements ReceiverOptions { this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); schedulerSupplier = Schedulers::immediate; consumerListener = null; + observationRegistry = ObservationRegistry.NOOP; + observationConvention = null; } ImmutableReceiverOptions( @@ -127,7 +136,9 @@ class ImmutableReceiverOptions implements ReceiverOptions { Collection partitions, Pattern pattern, Supplier supplier, - ConsumerListener consumerListener + ConsumerListener consumerListener, + ObservationRegistry observationRegistry, + KafkaReceiverObservationConvention observationConvention ) { this.properties = new HashMap<>(properties); this.assignListeners = new ArrayList<>(assignListeners); @@ -149,6 +160,8 @@ class ImmutableReceiverOptions implements ReceiverOptions { this.subscribePattern = pattern; this.schedulerSupplier = supplier; this.consumerListener = consumerListener; + this.observationRegistry = observationRegistry; + this.observationConvention = observationConvention; } @Override @@ -170,52 +183,56 @@ public ReceiverOptions consumerProperty(String name, Object newValue) { properties.put(name, newValue); return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - 
assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @Override public ReceiverOptions withKeyDeserializer(Deserializer keyDeserializer) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - Objects.requireNonNull(keyDeserializer), - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + Objects.requireNonNull(keyDeserializer), + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -227,26 +244,28 @@ public Deserializer keyDeserializer() { @Override public ReceiverOptions withValueDeserializer(Deserializer valueDeserializer) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - Objects.requireNonNull(valueDeserializer), - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - 
maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + Objects.requireNonNull(valueDeserializer), + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -266,26 +285,28 @@ public ReceiverOptions pollTimeout(Duration timeout) { throw new IllegalArgumentException("Close timeout must be >= 0"); return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - Objects.requireNonNull(timeout), - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + Objects.requireNonNull(timeout), + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -300,26 +321,28 @@ public ReceiverOptions closeTimeout(Duration timeout) { throw new IllegalArgumentException("Close timeout must be >= 0"); return new ImmutableReceiverOptions<>( - properties, - 
assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - Objects.requireNonNull(timeout), - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + Objects.requireNonNull(timeout), + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -331,26 +354,28 @@ public ReceiverOptions addAssignListener(Consumer( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -362,78 +387,84 @@ public ReceiverOptions addRevokeListener(Consumer( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - 
valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @Override public ReceiverOptions clearAssignListeners() { return new ImmutableReceiverOptions<>( - properties, - new ArrayList<>(), - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + new ArrayList<>(), + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @Override public ReceiverOptions clearRevokeListeners() { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - new ArrayList<>(), - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - 
commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + new ArrayList<>(), + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -450,78 +481,85 @@ public List>> revokeListeners() { @Override public ReceiverOptions subscription(Collection topics) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - Objects.requireNonNull(topics), - null, - null, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + Objects.requireNonNull(topics), + null, + null, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @Override public ReceiverOptions subscription(Pattern pattern) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - 
commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - null, - null, - Objects.requireNonNull(pattern), - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + null, + null, + Objects.requireNonNull(pattern), + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention + ); } @Override public ReceiverOptions assignment(Collection partitions) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - null, - Objects.requireNonNull(partitions), - null, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + null, + Objects.requireNonNull(partitions), + null, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -563,26 +601,28 @@ public ReceiverOptions commitInterval(Duration commitInterval) { throw new IllegalArgumentException("Commit interval must be >= 0"); return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - 
commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -597,26 +637,28 @@ public ReceiverOptions commitBatchSize(int commitBatchSize) { throw new IllegalArgumentException("Commit batch size must be >= 0"); return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -631,26 +673,28 @@ public ReceiverOptions atmostOnceCommitAheadSize(int commitAheadSize) { throw new IllegalArgumentException("Commit ahead size must be >= 0"); return new ImmutableReceiverOptions<>( - 
properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - commitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + commitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -665,26 +709,28 @@ public ReceiverOptions maxCommitAttempts(int maxAttempts) { throw new IllegalArgumentException("the number of attempts must be >= 0"); return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } @@ -715,7 +761,9 @@ public ReceiverOptions maxDeferredCommits(int maxDeferred) { 
assignTopicPartitions, subscribePattern, schedulerSupplier, - consumerListener + consumerListener, + observationRegistry, + observationConvention ); } @@ -727,57 +775,62 @@ public Duration maxDelayRebalance() { @Override public ReceiverOptions maxDelayRebalance(Duration maxDelay) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelay, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener - ); + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelay, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention + ); } @Override public long commitIntervalDuringDelay() { return this.commitIntervalDuringDelay; } + @Override public ReceiverOptions commitIntervalDuringDelay(long interval) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - interval, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener - ); + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, 
+ commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + interval, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention + ); } @Override @@ -788,26 +841,28 @@ public Supplier schedulerSupplier() { @Override public ReceiverOptions schedulerSupplier(Supplier schedulerSupplier) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - Objects.requireNonNull(schedulerSupplier), - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + Objects.requireNonNull(schedulerSupplier), + consumerListener, + observationRegistry, + observationConvention ); } @@ -833,35 +888,70 @@ public ReceiverOptions commitRetryInterval(Duration commitRetryInterval) { assignTopicPartitions, subscribePattern, schedulerSupplier, - consumerListener + consumerListener, + observationRegistry, + observationConvention ); } public ReceiverOptions consumerListener(@Nullable ConsumerListener consumerListener) { return new ImmutableReceiverOptions<>( - properties, - assignListeners, - revokeListeners, - keyDeserializer, - valueDeserializer, - pollTimeout, - closeTimeout, - commitInterval, - commitBatchSize, - atmostOnceCommitAheadSize, - maxCommitAttempts, - commitRetryInterval, - maxDeferredCommits, - maxDelayRebalance, - 
commitIntervalDuringDelay, - subscribeTopics, - assignTopicPartitions, - subscribePattern, - schedulerSupplier, - consumerListener + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention + ); + } + + @Override + public ReceiverOptions withObservation(ObservationRegistry observationRegistry, + KafkaReceiverObservationConvention observationConvention) { + + return new ImmutableReceiverOptions<>( + properties, + assignListeners, + revokeListeners, + keyDeserializer, + valueDeserializer, + pollTimeout, + closeTimeout, + commitInterval, + commitBatchSize, + atmostOnceCommitAheadSize, + maxCommitAttempts, + commitRetryInterval, + maxDeferredCommits, + maxDelayRebalance, + commitIntervalDuringDelay, + subscribeTopics, + assignTopicPartitions, + subscribePattern, + schedulerSupplier, + consumerListener, + observationRegistry, + observationConvention ); } + @Override public Duration commitRetryInterval() { return commitRetryInterval; @@ -873,6 +963,17 @@ public ConsumerListener consumerListener() { return consumerListener; } + @Override + public ObservationRegistry observationRegistry() { + return observationRegistry; + } + + @Nullable + @Override + public KafkaReceiverObservationConvention observationConvention() { + return observationConvention; + } + private long getLongOption(String optionName, long defaultValue) { Objects.requireNonNull(optionName); @@ -912,7 +1013,9 @@ public int hashCode() { subscribeTopics, assignTopicPartitions, subscribePattern, - consumerListener + consumerListener, + observationRegistry, + observationConvention ); } @@ -940,7 +1043,9 @@ public boolean 
equals(Object object) { && Objects.equals(subscribeTopics, that.subscribeTopics) && Objects.equals(assignTopicPartitions, that.assignTopicPartitions) && Objects.equals(subscribePattern, that.subscribePattern) - && Objects.equals(consumerListener, that.consumerListener); + && Objects.equals(consumerListener, that.consumerListener) + && Objects.equals(observationRegistry, that.observationRegistry) + && Objects.equals(observationConvention, that.observationConvention); } return false; } diff --git a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java index 7c8136e5..41addb23 100644 --- a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java +++ b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java @@ -16,16 +16,6 @@ package reactor.kafka.receiver; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.RetriableCommitFailedException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.Deserializer; -import reactor.core.scheduler.Scheduler; -import reactor.util.annotation.NonNull; -import reactor.util.annotation.Nullable; - import java.time.Duration; import java.util.Collection; import java.util.List; @@ -36,6 +26,19 @@ import java.util.function.Supplier; import java.util.regex.Pattern; +import io.micrometer.observation.ObservationRegistry; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import reactor.core.scheduler.Scheduler; +import 
reactor.kafka.receiver.observation.KafkaReceiverObservation; +import reactor.kafka.receiver.observation.KafkaReceiverObservationConvention; +import reactor.util.annotation.NonNull; +import reactor.util.annotation.Nullable; + public interface ReceiverOptions { /** @@ -299,6 +302,33 @@ default ReceiverOptions consumerListener(@Nullable ConsumerListener listen return this; } + /** + * Configure an {@link ObservationRegistry} to observe Kafka record consuming operation. + * @param observationRegistry {@link ObservationRegistry} to use. + * @return receiver options with updated observationRegistry + * @since 1.3.21 + */ + @NonNull + default ReceiverOptions withObservation(@NonNull ObservationRegistry observationRegistry) { + return withObservation(observationRegistry, null); + } + + /** + * Configure an {@link ObservationRegistry} to observe Kafka record receiving operation. + * This functionality makes sense only in simple use-cases where gaps in tracing + * on the consumer side need to be closed: an observation is opened and closed immediately to + * attach the respective consumer span to the trace. + * For more complex use-cases (e.g. tracing continuation) the {@link KafkaReceiverObservation} API + * should be used: the tracing and parent span information is extracted from the consumer record. + * @param observationRegistry {@link ObservationRegistry} to use. + * @param observationConvention the {@link KafkaReceiverObservationConvention} to use. + * @return receiver options with updated observationRegistry + * @since 1.3.21 + */ + @NonNull + ReceiverOptions withObservation(@NonNull ObservationRegistry observationRegistry, + @Nullable KafkaReceiverObservationConvention observationConvention); + /** * Returns the configuration properties of the underlying {@link KafkaConsumer}. * @return options to configure for Kafka consumer. 
@@ -452,7 +482,6 @@ default int maxDeferredCommits() { /** * Get the maximum amount of time to delay a rebalance until existing records in the * pipeline have been processed. Default 60s. - * @param maxDelay the max delay. * @return options updated with the max delay. * @since 1.3.12 * @see #commitIntervalDuringDelay() @@ -464,7 +493,6 @@ default Duration maxDelayRebalance() { /** * Get how often to commit offsets, in milliseconds, while a rebalance is being * delayed. Default 100ms. - * @param interval the interval. * @return options updated with the interval * @since 1.3.12 * @see #maxDelayRebalance() @@ -491,6 +519,39 @@ default ConsumerListener consumerListener() { return null; } + /** + * Return an {@link ObservationRegistry} to observe Kafka record consuming operation. + * @return the {@link ObservationRegistry}. + * @since 1.3.21 + */ + @NonNull + ObservationRegistry observationRegistry(); + + /** + * Return a {@link KafkaReceiverObservationConvention} to support a record receiving operation observation. + * @return the {@link KafkaReceiverObservationConvention}. + * @since 1.3.21 + */ + @Nullable + KafkaReceiverObservationConvention observationConvention(); + + /** + * Return the client id provided by the {@link ConsumerConfig}. + * @return the client id + */ + @Nullable + default String clientId() { + return (String) consumerProperty(ConsumerConfig.CLIENT_ID_CONFIG); + } + + /** + * Return the bootstrap servers from the provided {@link ConsumerConfig}. + * @return the bootstrap servers list. + */ + @NonNull + default String bootstrapServers() { + return (String) Objects.requireNonNull(consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + } /** * Returns the {@link KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)}, @@ -518,9 +579,6 @@ else if (assignment() != null) /** * Called whenever a consumer is added or removed. * - * @param the key type. - * @param the value type. 
- * * @since 1.3.17 * */ diff --git a/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java b/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java index 68115548..4c4bf48e 100644 --- a/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java +++ b/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java @@ -117,7 +117,7 @@ class ConsumerHandler { sink, awaitingTransaction ); - eventScheduler.start(); + eventScheduler.init(); } public Flux> receive() { diff --git a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java index 891be8d1..0c880bda 100644 --- a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java +++ b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java @@ -29,8 +29,11 @@ import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.receiver.observation.KafkaReceiverObservation; +import reactor.kafka.receiver.observation.KafkaRecordReceiverContext; import reactor.kafka.sender.TransactionManager; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; @@ -44,14 +47,20 @@ public class DefaultKafkaReceiver implements KafkaReceiver { private final ReceiverOptions receiverOptions; + private final String receiverId; + Predicate isRetriableException = t -> RetriableCommitFailedException.class.isInstance(t) - || RebalanceInProgressException.class.isInstance(t); + || RebalanceInProgressException.class.isInstance(t); final AtomicReference> consumerHandlerRef = new AtomicReference<>(); public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions receiverOptions) { this.consumerFactory = consumerFactory; this.receiverOptions = receiverOptions; + receiverId = + 
Optional.ofNullable(receiverOptions.clientId()) + .filter(clientId -> !clientId.isEmpty()) + .orElse("reactor-kafka-receiver-" + System.identityHashCode(this)); } @Override @@ -62,6 +71,7 @@ public Flux> receive(Integer prefetch) { .receive() .publishOn(scheduler, prefetchCalculated) .flatMapIterable(it -> it, prefetchCalculated) + .doOnNext(this::observerRecord) .map(record -> new ReceiverRecord<>( record, handler.toCommittableOffset(record) @@ -76,6 +86,7 @@ public Flux>> receiveAutoAck(Integer prefetch) { .filter(it -> !it.isEmpty()) .publishOn(scheduler, preparePublishOnQueueSize(prefetch)) .map(consumerRecords -> Flux.fromIterable(consumerRecords) + .doOnNext(this::observerRecord) .doAfterTerminate(() -> { for (ConsumerRecord r : consumerRecords) { handler.acknowledge(r); @@ -89,6 +100,7 @@ public Flux> receiveAtmostOnce(Integer prefetch) { .receive() .concatMap(records -> Flux .fromIterable(records) + .doOnNext(this::observerRecord) .concatMap(r -> handler.commit(r).thenReturn(r)) .publishOn(scheduler, 1), preparePublishOnQueueSize(prefetch))); } @@ -113,15 +125,26 @@ public Flux>> receiveExactlyOnce(TransactionManager tr })) .concatWith(transactionManager .sendOffsets(offsetBatch - .getAndClearOffsets() - .offsets(), + .getAndClearOffsets() + .offsets(), handler.consumer.groupMetadata())) + .doOnNext(this::observerRecord) .doAfterTerminate(() -> handler.awaitingTransaction.set(false)); }); return resultFlux.publishOn(transactionManager.scheduler(), preparePublishOnQueueSize(prefetch)); }); } + private > void observerRecord(R record) { + KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(receiverOptions.observationConvention(), + KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, + () -> new KafkaRecordReceiverContext(record, + receiverId, + receiverOptions.bootstrapServers()), + receiverOptions.observationRegistry()) + .observe(() -> { }); + } + @Override public Mono doOnConsumer(Function, ? 
extends T> function) { ConsumerHandler consumerHandler = consumerHandlerRef.get(); diff --git a/src/main/java/reactor/kafka/receiver/observation/KafkaReceiverObservation.java b/src/main/java/reactor/kafka/receiver/observation/KafkaReceiverObservation.java index 5c5f654c..f1e05977 100644 --- a/src/main/java/reactor/kafka/receiver/observation/KafkaReceiverObservation.java +++ b/src/main/java/reactor/kafka/receiver/observation/KafkaReceiverObservation.java @@ -43,7 +43,7 @@ public Class> getDefaultConve @Override public String getPrefix() { - return "spring.kafka.receiver"; + return "reactor.kafka.receiver"; } @Override @@ -60,8 +60,9 @@ public enum ReceiverLowCardinalityTags implements KeyName { /** * The client id of the {@code KafkaConsumer} behind {@link reactor.kafka.receiver.KafkaReceiver}. + * Can be a {@link org.apache.kafka.clients.consumer.ConsumerConfig#CLIENT_ID_CONFIG} value if present. */ - CLIENT_ID { + RECEIVER_ID { @Override public String asString() { return "reactor.kafka.client.id"; @@ -91,12 +92,12 @@ public static class DefaultKafkaReceiverObservationConvention implements KafkaRe * A singleton instance of the convention. 
*/ public static final DefaultKafkaReceiverObservationConvention INSTANCE = - new DefaultKafkaReceiverObservationConvention(); + new DefaultKafkaReceiverObservationConvention(); @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) { - return KeyValues.of(ReceiverLowCardinalityTags.CLIENT_ID.withValue(context.getClientId()), - ReceiverLowCardinalityTags.COMPONENT_TYPE.withValue("receiver")); + return KeyValues.of(ReceiverLowCardinalityTags.RECEIVER_ID.withValue(context.getReceiverId()), + ReceiverLowCardinalityTags.COMPONENT_TYPE.withValue("receiver")); } } diff --git a/src/main/java/reactor/kafka/receiver/observation/KafkaRecordReceiverContext.java b/src/main/java/reactor/kafka/receiver/observation/KafkaRecordReceiverContext.java index 44a805dc..37a00509 100644 --- a/src/main/java/reactor/kafka/receiver/observation/KafkaRecordReceiverContext.java +++ b/src/main/java/reactor/kafka/receiver/observation/KafkaRecordReceiverContext.java @@ -32,11 +32,11 @@ */ public class KafkaRecordReceiverContext extends ReceiverContext> { - private final String clientId; + private final String receiverId; private final ConsumerRecord record; - public KafkaRecordReceiverContext(ConsumerRecord record, String clientId, String kafkaServers) { + public KafkaRecordReceiverContext(ConsumerRecord record, String receiverId, String kafkaServers) { super((carrier, key) -> { Header header = carrier.headers().lastHeader(key); if (header == null) { @@ -46,12 +46,12 @@ public KafkaRecordReceiverContext(ConsumerRecord record, String clientId, }); setCarrier(record); this.record = record; - this.clientId = clientId; + this.receiverId = receiverId; setRemoteServiceName("Apache Kafka: " + kafkaServers); } - public String getClientId() { - return this.clientId; + public String getReceiverId() { + return this.receiverId; } /** diff --git a/src/main/java/reactor/kafka/receiver/observation/ReceiverObservations.java 
b/src/main/java/reactor/kafka/receiver/observation/ReceiverObservations.java deleted file mode 100644 index b5ba9696..00000000 --- a/src/main/java/reactor/kafka/receiver/observation/ReceiverObservations.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.kafka.receiver.observation; - -import java.util.function.Consumer; -import java.util.function.Function; - -import io.micrometer.observation.Observation; -import io.micrometer.observation.ObservationRegistry; -import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import reactor.core.publisher.Mono; -import reactor.util.annotation.Nullable; - -/** - * Helper factories for {@link ConsumerRecord} observation in the end-user code. - * Typically used like this on a receiver {@link reactor.core.publisher.Flux} - * result in a {@link Mono#transformDeferred(Function)} operator for each record: - *
- * {@code
- * Flux> receive =
- *        KafkaReceiver.create(receiverOptions.subscription(Collections.singletonList(topic)))
- *                .receive()
- *                .flatMap(record ->
- *                        Mono.just(record)
- *                                .transformDeferred(mono ->
- *                                        ReceiverObservations.observe(mono, record, "reactor kafka receiver",
- *                                                bootstrapServers(), OBSERVATION_REGISTRY)));
- * }
- * 
- *

- * These factories are needed if there is a requirements to process a record in a reactive manner within - * the mentioned consumer observation. - * Otherwise, a regular {@link KafkaReceiverObservation} API is enough to use - * in a {@link reactor.core.publisher.Flux#doOnNext(Consumer)} operator. - * - * @see KafkaReceiverObservation - */ -public final class ReceiverObservations { - - public static Mono observe(Mono userMono, ConsumerRecord record, String clientId, - String kafkaServers, ObservationRegistry observationRegistry) { - - return observe(userMono, record, clientId, kafkaServers, observationRegistry, null); - } - - public static Mono observe(Mono userMono, ConsumerRecord record, String clientId, - String kafkaServers, ObservationRegistry observationRegistry, - @Nullable KafkaReceiverObservationConvention observationConvention) { - - Observation observation = - KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(observationConvention, - KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(record, clientId, kafkaServers), - observationRegistry); - observation.start(); - - return userMono - .doOnTerminate(observation::stop) - .doOnError(observation::error) - .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation)); - } - - private ReceiverObservations() { - } - -} diff --git a/src/main/java/reactor/kafka/sender/ImmutableSenderOptions.java b/src/main/java/reactor/kafka/sender/ImmutableSenderOptions.java index b6bcbf39..8f628a0d 100644 --- a/src/main/java/reactor/kafka/sender/ImmutableSenderOptions.java +++ b/src/main/java/reactor/kafka/sender/ImmutableSenderOptions.java @@ -16,14 +16,6 @@ package reactor.kafka.sender; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - import 
io.micrometer.observation.ObservationRegistry; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -45,10 +37,8 @@ class ImmutableSenderOptions implements SenderOptions { - private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); - private final Map properties; - private final ProducerListener producerListener; + private final ProducerListener producerListener; private final Serializer keySerializer; @@ -72,19 +62,18 @@ class ImmutableSenderOptions implements SenderOptions { ImmutableSenderOptions(Properties properties) { this( - properties - .entrySet() - .stream() - .collect(Collectors.toMap( - e -> e.getKey().toString(), - Map.Entry::getValue - )) + properties + .entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey().toString(), + Map.Entry::getValue + )) ); } ImmutableSenderOptions(Map properties) { this.properties = new HashMap<>(properties); - maybeOverrideClientId(); keySerializer = null; valueSerializer = null; @@ -98,34 +87,17 @@ class ImmutableSenderOptions implements SenderOptions { this.observationConvention = null; } - /** - * Simulate {@link ProducerConfig#CLIENT_ID_CONFIG} generation. - */ - private void maybeOverrideClientId() { - String refinedClientId; - boolean userConfiguredClientId = this.properties.containsKey(ProducerConfig.CLIENT_ID_CONFIG); - if (userConfiguredClientId) { - refinedClientId = (String) this.properties.get(ProducerConfig.CLIENT_ID_CONFIG); - } else { - String transactionalId = (String) this.properties.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); - refinedClientId = - "producer-" + - (transactionalId != null ? 
transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement()); - } - this.properties.put(ProducerConfig.CLIENT_ID_CONFIG, refinedClientId); - } - ImmutableSenderOptions( - Map properties, - Serializer serializer, - Serializer valueSerializer, - Duration timeout, - Scheduler scheduler, - int flight, - boolean error, - ProducerListener producerListener, - ObservationRegistry observationRegistry, - KafkaSenderObservationConvention observationConvention + Map properties, + Serializer serializer, + Serializer valueSerializer, + Duration timeout, + Scheduler scheduler, + int flight, + boolean error, + ProducerListener producerListener, + ObservationRegistry observationRegistry, + KafkaSenderObservationConvention observationConvention ) { this.properties = properties; keySerializer = serializer; @@ -172,16 +144,16 @@ public SenderOptions producerProperty(String name, Object value) { properties.put(name, value); return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -204,16 +176,16 @@ public Serializer keySerializer() { @Override public SenderOptions withKeySerializer(Serializer keySerializer) { return new ImmutableSenderOptions<>( - properties, - Objects.requireNonNull(keySerializer), - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + Objects.requireNonNull(keySerializer), + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -236,16 +208,16 @@ public Serializer valueSerializer() { @Override public SenderOptions 
withValueSerializer(Serializer valueSerializer) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - Objects.requireNonNull(valueSerializer), - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + Objects.requireNonNull(valueSerializer), + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -265,16 +237,16 @@ public Scheduler scheduler() { @Override public SenderOptions scheduler(Scheduler scheduler) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - Objects.requireNonNull(scheduler), - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + Objects.requireNonNull(scheduler), + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -298,16 +270,16 @@ public int maxInFlight() { @Override public SenderOptions maxInFlight(int maxInFlight) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -338,16 +310,16 @@ public boolean stopOnError() { @Override public SenderOptions stopOnError(boolean stopOnError) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + 
producerListener, + observationRegistry, + observationConvention ); } @@ -367,16 +339,16 @@ public Duration closeTimeout() { @Override public SenderOptions closeTimeout(Duration timeout) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - timeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + timeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -388,34 +360,34 @@ public ProducerListener producerListener() { @Override public SenderOptions producerListener(ProducerListener producerListener) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @Override public SenderOptions withObservation(@NonNull ObservationRegistry observationRegistry, - @Nullable KafkaSenderObservationConvention observationConvention) { + @Nullable KafkaSenderObservationConvention observationConvention) { return new ImmutableSenderOptions<>( - properties, - keySerializer, - valueSerializer, - closeTimeout, - scheduler, - maxInFlight, - stopOnError, - producerListener, - observationRegistry, - observationConvention + properties, + keySerializer, + valueSerializer, + closeTimeout, + scheduler, + maxInFlight, + stopOnError, + producerListener, + observationRegistry, + observationConvention ); } @@ -431,7 +403,8 @@ public KafkaSenderObservationConvention observationConvention() { return observationConvention; } - @Override public int hashCode() { + @Override + public int hashCode() { return Objects.hash( properties, keySerializer, @@ -440,7 
+413,9 @@ public KafkaSenderObservationConvention observationConvention() { scheduler, maxInFlight, stopOnError, - producerListener + producerListener, + observationRegistry, + observationConvention ); } @@ -456,7 +431,9 @@ public boolean equals(Object object) { && Objects.equals(valueSerializer, that.valueSerializer) && Objects.equals(closeTimeout, that.closeTimeout) && Objects.equals(scheduler, that.scheduler) - && Objects.equals(producerListener, that.producerListener); + && Objects.equals(producerListener, that.producerListener) + && Objects.equals(observationRegistry, that.observationRegistry) + && Objects.equals(observationConvention, that.observationConvention); } return false; } diff --git a/src/main/java/reactor/kafka/sender/SenderOptions.java b/src/main/java/reactor/kafka/sender/SenderOptions.java index 0f501d3f..e72bc706 100644 --- a/src/main/java/reactor/kafka/sender/SenderOptions.java +++ b/src/main/java/reactor/kafka/sender/SenderOptions.java @@ -211,10 +211,10 @@ default SenderOptions producerListener(@Nullable ProducerListener listener } /** - * Configure an {@link ObservationRegistry} to observe Kafka record publishing operation. * @param observationRegistry {@link ObservationRegistry} to use. * @return sender options with updated observationRegistry + * @since 1.3.21 */ @NonNull default SenderOptions withObservation(@NonNull ObservationRegistry observationRegistry) { @@ -226,6 +226,7 @@ default SenderOptions withObservation(@NonNull ObservationRegistry observa * @param observationRegistry {@link ObservationRegistry} to use. * @param observationConvention the {@link KafkaSenderObservationConvention} to use. 
* @return sender options with updated observationRegistry + * @since 1.3.21 */ @NonNull SenderOptions withObservation(@NonNull ObservationRegistry observationRegistry, @@ -234,6 +235,7 @@ SenderOptions withObservation(@NonNull ObservationRegistry observationRegi /** * Return an {@link ObservationRegistry} to observe Kafka record publishing operation. * @return the {@link ObservationRegistry}. + * @since 1.3.21 */ @NonNull ObservationRegistry observationRegistry(); @@ -241,6 +243,7 @@ SenderOptions withObservation(@NonNull ObservationRegistry observationRegi /** * Return a {@link KafkaSenderObservationConvention} to support a publishing operation observation. * @return the {@link KafkaSenderObservationConvention}. + * @since 1.3.21 */ @Nullable KafkaSenderObservationConvention observationConvention(); @@ -260,12 +263,12 @@ default boolean isTransactional() { } /** - * Return the client id, configured or generated by the {@link ProducerConfig}. + * Return the client id provided by the {@link ProducerConfig}. * @return the client id */ - @NonNull + @Nullable default String clientId() { - return Objects.requireNonNull((String) producerProperty(ProducerConfig.CLIENT_ID_CONFIG)); + return (String) producerProperty(ProducerConfig.CLIENT_ID_CONFIG); } /** @@ -294,11 +297,7 @@ default boolean fatalException(@NonNull Throwable t) { /** * Called whenever a producer is added or removed. * - * @param the key type. - * @param the value type. - * * @since 1.3.17 - * */ interface ProducerListener { diff --git a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java index fc8ae5ed..8cb3afeb 100644 --- a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java +++ b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. 
or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,8 +42,8 @@ import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.HashSet; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -80,14 +80,17 @@ public class DefaultKafkaSender implements KafkaSender, EmitFailureH * producer properties are supported. The underlying Kafka producer is created lazily when required. */ public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions options) { - this.scheduler = Schedulers.newSingle(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { + producerId = + Optional.ofNullable(options.clientId()) + .filter(clientId -> !clientId.isEmpty()) + .orElse("reactor-kafka-sender-" + System.identityHashCode(this)); + + this.scheduler = + Schedulers.newSingle(r -> { Thread thread = new Thread(r); - thread.setName("reactor-kafka-sender-" + System.identityHashCode(this)); + thread.setName(producerId); return thread; - } - }); + }); this.hasProducer = new AtomicBoolean(); this.senderOptions = options.scheduler(options.isTransactional() ? 
Schedulers.newSingle(options.transactionalId()) @@ -97,8 +100,9 @@ public Thread newThread(Runnable r) { this.producerMono = Mono .fromCallable(() -> { Producer producer = producerFactory.createProducer(senderOptions); - if (senderOptions.producerListener() != null) { - senderOptions.producerListener().producerAdded(producerId, producer); + SenderOptions.ProducerListener producerListener = senderOptions.producerListener(); + if (producerListener != null) { + producerListener.producerAdded(producerId, producer); } if (transactional) { log.info("Initializing transactions for producer {}", @@ -110,11 +114,10 @@ public Thread newThread(Runnable r) { }) .publishOn(senderOptions.isTransactional() ? this.scheduler : senderOptions.scheduler()) .cache() - .as(flux -> { - return senderOptions.isTransactional() - ? flux.publishOn(senderOptions.isTransactional() ? this.scheduler : senderOptions.scheduler()) - : flux; - }); + .as(flux -> + senderOptions.isTransactional() + ? flux.publishOn(senderOptions.isTransactional() ? 
this.scheduler : senderOptions.scheduler()) + : flux); if (transactional) { this.producerMono.subscribe().dispose(); @@ -138,7 +141,7 @@ Flux> doSend(Publisher> recor .as(flux -> new FluxOperator, SenderResult>(flux) { @Override public void subscribe(CoreSubscriber> s) { - source.subscribe(new SendSubscriber<>(senderOptions, producer, s)); + source.subscribe(new SendSubscriber<>(senderOptions, producer, producerId, s)); } }); }) diff --git a/src/main/java/reactor/kafka/sender/internals/SendSubscriber.java b/src/main/java/reactor/kafka/sender/internals/SendSubscriber.java index 1fe8972d..5c4d93f2 100644 --- a/src/main/java/reactor/kafka/sender/internals/SendSubscriber.java +++ b/src/main/java/reactor/kafka/sender/internals/SendSubscriber.java @@ -36,7 +36,7 @@ /** * This is basically an optimized flatMapDelayError(Function<ProducerRecord,Mono<SenderResult>>), without prefetching * and with an unlimited concurrency (implicitly limited by {@link Producer#send(ProducerRecord)}). - * + *

* The requests are passed to the upstream "as is". * */ @@ -56,9 +56,14 @@ enum State { private final AtomicReference state = new AtomicReference<>(State.INIT); private final SenderOptions senderOptions; - SendSubscriber(SenderOptions senderOptions, Producer producer, CoreSubscriber> actual) { + private final String producerId; + + SendSubscriber(SenderOptions senderOptions, Producer producer, String producerId, + CoreSubscriber> actual) { + this.senderOptions = senderOptions; this.producer = producer; + this.producerId = producerId; this.actual = actual; } @@ -116,12 +121,12 @@ public void onNext(ProducerRecord record) { try { KafkaSenderObservation.SENDER_OBSERVATION.observation(senderOptions.observationConvention(), KafkaSenderObservation.DefaultKafkaSenderObservationConvention.INSTANCE, - () -> new KafkaRecordSenderContext(record, - senderOptions.clientId(), - senderOptions.bootstrapServers()), + () -> new KafkaRecordSenderContext(record, + producerId, + senderOptions.bootstrapServers()), senderOptions.observationRegistry()) - .parentObservation(currentContext().getOrDefault(ObservationThreadLocalAccessor.KEY, null)) - .observe(() -> producer.send(record, callback)); + .parentObservation(currentContext().getOrDefault(ObservationThreadLocalAccessor.KEY, null)) + .observe(() -> producer.send(record, callback)); } catch (Exception e) { callback.onCompletion(null, e); } diff --git a/src/main/java/reactor/kafka/sender/observation/KafkaRecordSenderContext.java b/src/main/java/reactor/kafka/sender/observation/KafkaRecordSenderContext.java index 07a6f37c..54a6c87f 100644 --- a/src/main/java/reactor/kafka/sender/observation/KafkaRecordSenderContext.java +++ b/src/main/java/reactor/kafka/sender/observation/KafkaRecordSenderContext.java @@ -30,20 +30,20 @@ */ public class KafkaRecordSenderContext extends SenderContext> { - private final String clientId; + private final String producerId; private final String destination; - public KafkaRecordSenderContext(ProducerRecord 
record, String clientId, String kafkaServers) { + public KafkaRecordSenderContext(ProducerRecord record, String producerId, String kafkaServers) { super((carrier, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))); setCarrier(record); - this.clientId = clientId; + this.producerId = producerId; this.destination = record.topic(); setRemoteServiceName("Apache Kafka: " + kafkaServers); } - public String getClientId() { - return this.clientId; + public String getProducerId() { + return this.producerId; } /** diff --git a/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservation.java b/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservation.java index c6567575..49b070d0 100644 --- a/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservation.java +++ b/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservation.java @@ -59,9 +59,10 @@ public KeyName[] getLowCardinalityKeyNames() { public enum SenderLowCardinalityTags implements KeyName { /** - * The client id of the {@code KafkaProducer} behind {@link reactor.kafka.sender.KafkaSender}. + * The producer id of the {@link reactor.kafka.sender.KafkaSender}. + * Can be a {@link org.apache.kafka.clients.producer.ProducerConfig#CLIENT_ID_CONFIG} value if present. */ - CLIENT_ID { + PRODUCER_ID { @Override public String asString() { return "reactor.kafka.client.id"; @@ -91,13 +92,13 @@ public static class DefaultKafkaSenderObservationConvention implements KafkaSend * A singleton instance of the convention. 
*/ public static final DefaultKafkaSenderObservationConvention INSTANCE = - new DefaultKafkaSenderObservationConvention(); + new DefaultKafkaSenderObservationConvention(); @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { return KeyValues.of( - SenderLowCardinalityTags.CLIENT_ID.withValue(context.getClientId()), - SenderLowCardinalityTags.COMPONENT_TYPE.withValue("sender")); + SenderLowCardinalityTags.PRODUCER_ID.withValue(context.getProducerId()), + SenderLowCardinalityTags.COMPONENT_TYPE.withValue("sender")); } } diff --git a/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservationConvention.java b/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservationConvention.java index 058e74db..f559c3d7 100644 --- a/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservationConvention.java +++ b/src/main/java/reactor/kafka/sender/observation/KafkaSenderObservationConvention.java @@ -41,6 +41,6 @@ default String getName() { @Override default String getContextualName(KafkaRecordSenderContext context) { - return context.getDestination() + " send"; + return context.getDestination() + " publish"; } } diff --git a/src/test/java/reactor/kafka/observation/ReactorKafkaObservationTests.java b/src/test/java/reactor/kafka/observation/ReactorKafkaObservationTests.java index 3eeb252c..a8691e0d 100644 --- a/src/test/java/reactor/kafka/observation/ReactorKafkaObservationTests.java +++ b/src/test/java/reactor/kafka/observation/ReactorKafkaObservationTests.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.stream.Collectors; import brave.Tracing; @@ -37,6 +38,8 @@ import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; import io.micrometer.tracing.test.simple.SpansAssert; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import 
org.apache.kafka.clients.producer.ProducerConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,7 +48,8 @@ import reactor.kafka.AbstractKafkaTest; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverRecord; -import reactor.kafka.receiver.observation.ReceiverObservations; +import reactor.kafka.receiver.observation.KafkaReceiverObservation; +import reactor.kafka.receiver.observation.KafkaRecordReceiverContext; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.observation.KafkaSenderObservation; import reactor.test.StepVerifier; @@ -61,23 +65,37 @@ public class ReactorKafkaObservationTests extends AbstractKafkaTest { static { Tracing tracing = Tracing.newBuilder().addSpanHandler(SPANS).build(); BraveTracer braveTracer = new BraveTracer(tracing.tracer(), - new BraveCurrentTraceContext(ThreadLocalCurrentTraceContext.create()), - new BraveBaggageManager()); + new BraveCurrentTraceContext(ThreadLocalCurrentTraceContext.create()), + new BraveBaggageManager()); BravePropagator bravePropagator = new BravePropagator(tracing); OBSERVATION_REGISTRY.observationConfig() - .observationHandler( - // Composite will pick the first matching handler - new ObservationHandler.FirstMatchingCompositeObservationHandler( - // This is responsible for creating a child span on the sender side - new PropagatingSenderTracingObservationHandler<>(braveTracer, bravePropagator), - // This is responsible for creating a span on the receiver side - new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator), - // This is responsible for creating a default span - new DefaultTracingObservationHandler(braveTracer))); + .observationHandler( + // Composite will pick the first matching handler + new ObservationHandler.FirstMatchingCompositeObservationHandler( + // This is responsible for creating a child span on the sender side + new PropagatingSenderTracingObservationHandler<>(braveTracer, 
bravePropagator), + // This is responsible for creating a span on the receiver side + new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator), + // This is responsible for creating a default span + new DefaultTracingObservationHandler(braveTracer))); } private KafkaSender kafkaSender; + @Override + public Map producerProps() { + Map producerProps = super.producerProps(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "observed.producer"); + return producerProps; + } + + @Override + protected Map consumerProps(String groupId) { + Map consumerProps = super.consumerProps(groupId); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "observed.receiver"); + return consumerProps; + } + @Before public void setup() { SPANS.clear(); @@ -97,36 +115,74 @@ public void senderPropagatesObservationToReceiver() { Observation parentObservation = Observation.createNotStarted("test parent observation", OBSERVATION_REGISTRY); parentObservation.start(); kafkaSender.createOutbound().send(source.map(i -> createProducerRecord(i, true))) - .then() - .doOnTerminate(parentObservation::stop) - .doOnError(parentObservation::error) - .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parentObservation)) - .subscribe(); + .then() + .doOnTerminate(parentObservation::stop) + .doOnError(parentObservation::error) + .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parentObservation)) + .subscribe(); Flux> receive = - KafkaReceiver.create(receiverOptions.subscription(Collections.singletonList(topic))) - .receive() - .flatMap(record -> - Mono.deferContextual(cxt -> - Mono.just(record) - .filter(data -> cxt.hasKey(ObservationThreadLocalAccessor.KEY))) - .transformDeferred(mono -> - ReceiverObservations.observe(mono, record, "reactor kafka receiver", - bootstrapServers(), OBSERVATION_REGISTRY))); + KafkaReceiver.create(receiverOptions.subscription(Collections.singletonList(topic)) + .withObservation(OBSERVATION_REGISTRY)) + 
.receive(); StepVerifier.create(receive) - .expectNextCount(count) - .thenCancel() - .verify(Duration.ofMillis(receiveTimeoutMillis)); + .expectNextCount(count) + .thenCancel() + .verify(Duration.ofMillis(receiveTimeoutMillis)); assertThat(SPANS.spans()).hasSize(21); SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) - .haveSameTraceId() - .hasASpanWithName("test parent observation") - .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.COMPONENT_TYPE, "sender") - .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.CLIENT_ID, "producer-1") - .hasASpanWithName(topic + " send", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.PRODUCER)) - .hasASpanWithName(topic + " receive", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.CONSUMER)); + .haveSameTraceId() + .hasASpanWithName("test parent observation") + .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.COMPONENT_TYPE, "sender") + .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.PRODUCER_ID, "observed.producer") + .hasASpanWithName(topic + " publish", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.PRODUCER)) + .hasASpanWithName(topic + " receive", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.CONSUMER)) + .hasASpanWithATag(KafkaReceiverObservation.ReceiverLowCardinalityTags.COMPONENT_TYPE, "receiver") + .hasASpanWithATag(KafkaReceiverObservation.ReceiverLowCardinalityTags.RECEIVER_ID, "observed.receiver"); + } + + @Test + public void manualReceiverObservationIsPartOfSenderTrace() { + kafkaSender.createOutbound() + .send(Mono.just(createProducerRecord(0, true))) + .then() + .subscribe(); + + Flux> receive = + KafkaReceiver.create(receiverOptions.consumerProperty(ConsumerConfig.CLIENT_ID_CONFIG, "") + .subscription(Collections.singletonList(topic))) + .receive() + .flatMap(record -> { + Observation receiverObservation = + KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null, + 
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, + () -> + new KafkaRecordReceiverContext( + record, "user.receiver", receiverOptions.bootstrapServers()), + OBSERVATION_REGISTRY); + + return Mono.just(record) + .doOnTerminate(receiverObservation::stop) + .doOnError(receiverObservation::error) + .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation)); + }); + + StepVerifier.create(receive) + .expectNextCount(1) + .thenCancel() + .verify(Duration.ofMillis(receiveTimeoutMillis)); + + assertThat(SPANS.spans()).hasSize(2); + SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) + .haveSameTraceId() + .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.COMPONENT_TYPE, "sender") + .hasASpanWithATag(KafkaSenderObservation.SenderLowCardinalityTags.PRODUCER_ID, "observed.producer") + .hasASpanWithName(topic + " publish", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.PRODUCER)) + .hasASpanWithName(topic + " receive", spanAssert -> spanAssert.hasKindEqualTo(Span.Kind.CONSUMER)) + .hasASpanWithATag(KafkaReceiverObservation.ReceiverLowCardinalityTags.COMPONENT_TYPE, "receiver") + .hasASpanWithATag(KafkaReceiverObservation.ReceiverLowCardinalityTags.RECEIVER_ID, "user.receiver"); } }