
GH-321: Add Observation propagation #325

Merged
merged 11 commits into main
Oct 26, 2023

Conversation

artembilan
Contributor

@artembilan artembilan commented Feb 17, 2023

Fixes #321

  • Copy-paste (and adjust) the KafkaReceiverObservation & KafkaSenderObservation infrastructure from Spring for Apache Kafka
  • Add Observation handling into the KafkaSender with contextCapture() and a KafkaSenderObservation around producer.send()
  • Observation handling on the consumer side is only around a consumer record, started and stopped immediately, to fill the gap for a CONSUMER kind span in the trace propagated from the producer.
    If there is a need to propagate an observation to the downstream flow, the KafkaReceiverObservation API has to be used in a flatMap() around the Mono for record processing (see the sketch after this list).
  • The ReactorKafkaObservationTests demonstrates single-trace propagation from a sender via a parent Observation and its restoration on the consumer side via KafkaReceiverObservation in the end-user code
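
For illustration, a minimal sketch of that flatMap() pattern in end-user code. It assumes the KafkaReceiverObservation infrastructure copied from Spring for Apache Kafka; the DefaultKafkaReceiverObservationConvention name, the KafkaRecordReceiverContext constructor shape, processRecord() and the "my-receiver" id are assumptions/placeholders, not necessarily the final API:

receiver.receive()
    .flatMap(record -> {
        // Build an observation for this record (convention and context shapes
        // mirror the Spring for Apache Kafka originals; assumed, not final API).
        Observation observation = KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
                null,
                KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                () -> new KafkaRecordReceiverContext(record, "my-receiver", () -> null),
                observationRegistry);
        observation.start();

        return processRecord(record)                       // Mono<?> with the business logic
                .doOnTerminate(observation::stop)
                .doOnError(observation::error)
                // Put the observation into the Reactor context so downstream
                // operators (and ThreadLocal restoration) can see it.
                .contextWrite(context ->
                        context.put(ObservationThreadLocalAccessor.KEY, observation));
    });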

@artembilan artembilan marked this pull request as draft February 17, 2023 19:16
@artembilan
Contributor Author

/CC @marcingrzejszczak, @chemicL.

I believe it might not be possible to stick with the old Reactor version and still preserve the current version of this project.
So, I guess this really may lead to a new version for this project and respective upgrades for everything.

I converted this to Draft since I don't yet have a clear vision of how to handle an Observation on the consumer side per record.
The current KafkaReceiverObservation is OK to restore the trace as a CONSUMER kind and have it connected to the respective parent trace from the producer, but this is what end users have to do at the moment in their code.
I doubt we can propagate each record through the Reactor context: the Context is attached to the whole chain at subscription time, so a per-record value can only be carried by giving each record its own inner Mono.

Thanks

@patpatpat123

Cheering for this PR

@Tomasz-Marciniak

Tomasz-Marciniak commented Mar 7, 2023

Hi, I think that tracing is still missing in 1.3.17-SNAPSHOT.

I managed to turn on traces in 1.3.16 by extending ReactiveKafkaProducerTemplate:

class ReactiveTracingKafkaProducerTemplate<K, V>(
    senderOptions: SenderOptions<K, V>,
    private val kafkaAdmin: KafkaAdmin?,
    private val observationRegistry: ObservationRegistry
) :
    ReactiveKafkaProducerTemplate<K, V>(senderOptions), BeanNameAware {

    private lateinit var beanName: String
    private var clusterId: String? = null

    override fun <T : Any?> send(record: SenderRecord<K, V, T>): Mono<SenderResult<T>> {
        val observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
            null, KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE,
            { KafkaRecordSenderContext(record, this.beanName) { this.clusterId() } },
            observationRegistry
        )

        return super.send(record).contextWrite(Function.identity())
            .doFirst { observation.start() }
            .doOnSuccess {
                observation.stop()
            }
            .doOnError {
                observation.error(it)
                observation.stop()
            }
    }

    @Nullable
    private fun clusterId(): String? {
        if (kafkaAdmin != null && this.clusterId == null) {
            this.clusterId = kafkaAdmin.clusterId()
        }
        return this.clusterId
    }

    override fun setBeanName(beanName: String) {
        this.beanName = beanName
    }
}

And ReactiveKafkaConsumerTemplate:

class ReactiveTracingKafkaConsumerTemplate<K, V>(
    receiverOptions: ReceiverOptions<K, V>,
    private val kafkaAdmin: KafkaAdmin?,
    private val observationRegistry: ObservationRegistry
) :
    ReactiveKafkaConsumerTemplate<K, V>(receiverOptions), BeanNameAware {
    private lateinit var beanName: String
    private var clusterId: String? = null

    override fun receiveAutoAck(): Flux<ConsumerRecord<K, V>> {
        return super.receiveAutoAck()
            .map {
                val observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
                    null,
                    DefaultKafkaListenerObservationConvention.INSTANCE,
                    {
                        KafkaRecordReceiverContext(
                            it, this.beanName
                        ) { clusterId() }
                    },
                    observationRegistry
                )
                observation.observe {  }
                it
            }.contextWrite(Function.identity())
    }

    @Nullable
    private fun clusterId(): String? {
        if (kafkaAdmin != null && this.clusterId == null) {
            this.clusterId = kafkaAdmin.clusterId()
        }
        return this.clusterId
    }

    override fun setBeanName(beanName: String) {
        this.beanName = beanName
    }
}

I know this solution is far from a perfect one, but maybe you could move some parts of this code into the reactor-kafka project.

I think it should be located somewhere in DefaultKafkaSender and DefaultKafkaReceiver, or there should be some option to register proper listeners.

@artembilan - Could you please comment on this?

@artembilan
Contributor Author

@Tomasz-Marciniak ,

Thank you for sharing the code, but what you show so far has nothing to do with Reactor Kafka.
It is more Spring for Apache Kafka stuff; when we come up with the fix for this project, we won't have to do anything on the Spring for Apache Kafka side.

Your producer side is similar to what I did in the KafkaSender code.
The consumer is a bit awkward, since what you do is just observe an empty action: observation.observe { }.
I'm not sure that this is what users would like to deal with.
Therefore, in this PR I'm thinking about some callback for users to make something useful from an observation when they process a consumed record, within the context of a Mono per record.

I'm not sure about your .contextWrite(Function.identity()) though: you don't put your observation into the context, so there won't be anything to propagate back to the ThreadLocal. See the sketch below.

It is also not fair to say "tracing is still missing in 1.3.17-SNAPSHOT", since this PR has not been merged yet and the respective GH issue is not closed.
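
For reference, a minimal sketch of what putting an observation into the Reactor context looks like (ObservationThreadLocalAccessor.KEY is Micrometer's context-propagation key; the mono and observation variables are placeholders for whatever chain and observation should be restored downstream):

// Puts the observation into the Reactor context; the context-propagation
// machinery can then restore it to the ThreadLocal for downstream operators.
mono.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));

// By contrast, contextWrite(Function.identity()) returns the context unchanged,
// so there is nothing new to propagate.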

@Tomasz-Marciniak

Tomasz-Marciniak commented Mar 7, 2023

@artembilan

Thank you for your comment. Much appreciated. I understand this is related to Spring for Apache Kafka, but under the hood spring-kafka uses DefaultKafkaSender. Since I have no opportunity to adjust that code, I had to make a workaround and wrapped the usage of DefaultKafkaSender:

  public class ReactiveKafkaProducerTemplate<K, V> implements AutoCloseable, DisposableBean {

      private final KafkaSender<K, V> sender;

  }

I just wanted to share that, in the case of the sender, it may work like this:

val observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
    null, KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE,
    { KafkaRecordSenderContext(record, beanName) { this.clusterId() } },
    observationRegistry
)

return super.send(record).contextWrite(Function.identity())
    .doFirst { observation.start() }
    .doFinally { observation.stop() }
    .doOnError { observation.error(it) }
I'm aware that 1.3.17-SNAPSHOT is not finished; I just did not know whether you were planning to add the support for tracing in this release, and I wanted to point out that I could not find it in the current library.

Regarding .contextWrite(Function.identity()): I was just playing with that function. The traces for receiving a message are visible in Tempo, but the trace IDs are not visible in the logs, so I tried everything; it helped me in the case of Mongo reactive repos. To be honest, I do not know how to wrap around the whole Flux/Mono on the consumer side to track receiving the message.

@Tomasz-Marciniak

@artembilan

I have gone through the PRs and it seems I get your point about the callback. I managed to do something like this, and it works:

fun <T> receiveAutoAckWithCallback(consumerCallbackFunction: Function<Flux<ConsumerRecord<K, V>>, Flux<T>>): Flux<T> {
    return super.receiveAutoAck().flatMap { record ->
        Mono.just(record)
            .transformDeferred {
                val observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
                    null,
                    KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
                    { KafkaRecordReceiverContext(record, beanName) { clusterId() } },
                    observationRegistry
                )
                observation.start()

                consumerCallbackFunction.apply(Flux.from(it))
                    .doOnTerminate(observation::stop)
                    .doOnError(observation::error)
                    .contextWrite { context -> context.put(ObservationThreadLocalAccessor.KEY, observation) }
            }
    }
}

To invoke it I simply use this:

override fun consume(): Flux<UUID> {
    return template.receiveAutoAckWithCallback {
        receiveFluxCallback(it)
    }
}

private fun receiveFluxCallback(records: Flux<ConsumerRecord<String, String>>): Flux<UUID> =
    records
        .doOnNext { consumerRecord: ConsumerRecord<String, String> ->
            log.info(
                "Received kafka message with attributes: key={}, value={} from topic={}, offset={}",
                consumerRecord.key(),
                consumerRecord.value(),
                consumerRecord.topic(),
                consumerRecord.offset()
            )
        }.map {
            UUID.randomUUID()
        }

I'm not sure the Function is the best approach here, but for now I'm OK with that, and I'm sure that in the future I will have no such challenges with 1.3.17 :)

@artembilan
Contributor Author

Sure! That's also possible. We can consider it as an API evolution: I believe my current low-level approach can still be used for the regular receiveAutoAck().
My point is that it is not known how end users would like to process their records within an observation. Therefore it might still be OK to give them the chance to apply that transformDeferred() manually.

The discussion is not over yet: your feedback is really valuable.

Thank you!

@Tomasz-Marciniak

Tomasz-Marciniak commented Mar 8, 2023

Thank you Artem,

Yes, with transformDeferred all further actions were gone from the trace graphs, like saving in Mongo, for instance.

I ended up with:

fun <T> receiveAutoAckWithCallback(function: Function<Flux<ConsumerRecord<K, V>>, Flux<T>>): Flux<T> {
    return super.receiveAutoAck()
        .flatMap {
            val observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
                null,
                KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
                { KafkaRecordReceiverContext(it, beanName) { clusterId() } },
                observationRegistry
            )
            observation.start()
            function.apply(Flux.just(it))
                .doOnTerminate(observation::stop)
                .doOnError(observation::error)
                .contextWrite { context -> context.put(ObservationThreadLocalAccessor.KEY, observation) }
        }
}

This produces a nice trace graph:

[image: trace graph screenshot]

@patpatpat123

Just checking on this PR; it is a great initiative, and I hope there is nothing blocking it!

@patpatpat123

Hello team, the latest v1.3.18 only had one change. It seems the repo is quite stable lately. Would it be possible to merge this?

@artembilan
Contributor Author

Would it be possible to merge this?

I didn't come up with a clean end-user API for the Consumer observation.
Plus, I now think it is better to ask the end user for a client id when we use an observation, instead of trying to infer it from the producer props.

@tongdaoqa

Was about to start a feature request for the same, then found this (long-open) thread.

Big +1 for this!

What is blocking the merge, please?
Non-reactive Kafka has tracing well in place.

@kuldeep0508

Any update on this PR?

@yusuf-murodov-gamerpay

Hi @artembilan, any chance to merge this in the near future?

Thanks

@artembilan artembilan marked this pull request as ready for review September 6, 2023 18:07
@artembilan
Contributor Author

OK. I'm not sure who to ask for review on this, so I went ahead with the closest contacts who are aware of this project and the stuff I do in the change.

Thank you everyone for the feedback, and sorry for the delay!

@patpatpat123

This is great! Thanks @artembilan ! Hope this can be merged!

Contributor

@garyrussell garyrussell left a comment

LGTM - any thoughts about adding documentation?

We can merge this for next week's release, if you prefer, and add docs later?

@chemicL
Member

chemicL commented Sep 8, 2023

I had a look. It looks like this is an important improvement for the lives of developers :) I have a few questions/suggestions to understand the implementation and the intentions:

  • The default pattern is to create an artificial Observation for each item in the Flux returned via receive() and immediately close the Observation – therefore any timing information is insignificant (both for metrics and for tracing data)?
  • Assuming the above is true, the user also won't get any log correlation out of the change, unless they manually handle each item via a flatMap-like operator, where each item gets its own Mono to which the inferred Observation is attached, correct?
  • Docs for this would be really appreciated, I suppose.
  • Any chance to enhance the API to save the users from doing the manual work? E.g. receiveObserved variants, which could accept an item transformer (Function<ReceiverRecord, Mono<T>>) that would get the inferred Observation attached via contextWrite to the chain the user provides? (See the hypothetical sketch after this list.)
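
For illustration only, such a variant might look something like the following hypothetical sketch. receiveObserved() does not exist in reactor-kafka; it is built inside an imagined receiver class with K, V type parameters, the observation is constructed the same way as in the sketch under the PR description, and "my-receiver" is a placeholder:

public <T> Flux<T> receiveObserved(Function<ReceiverRecord<K, V>, Mono<T>> recordHandler) {
    return receive()
            .flatMap(record -> {
                Observation observation = KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
                        null,
                        KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                        () -> new KafkaRecordReceiverContext(record, "my-receiver", () -> null),
                        observationRegistry);
                observation.start();
                // Hand the record to the user-provided transformer and attach the
                // observation to that inner chain via the Reactor context.
                return recordHandler.apply(record)
                        .doOnTerminate(observation::stop)
                        .doOnError(observation::error)
                        .contextWrite(context ->
                                context.put(ObservationThreadLocalAccessor.KEY, observation));
            });
}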

* Revert new version `1.4`
* Don't use `contextCapture()` in the `DefaultKafkaSender`:
it is better to ask end users to use a `contextWrite()` for the parent observation to propagate
(see the sketch after this list)
* `ReceiverObservations` factories to observe a `ConsumerRecord` on the end-user side
via `transformDeferred()` on each record
* Fix deprecation in the `ConsumerHandler` for `eventScheduler.start()`
* Use generated `producerId` and `receiverId` if `CLIENT_ID_CONFIG` is not provided
* Use an `Observation` in the `DefaultKafkaReceiver` to add a `CONSUMER` span to
the trace provided by the `PRODUCER` in the record
* Recommend using the `KafkaReceiverObservation.RECEIVER_OBSERVATION` API directly
in the target end-user code when there is a need for an observation context around record handling
* Exclude `org.mockito` from the Micrometer deps since we cannot use
a newer version of Mockito yet
* Add docs about observation support
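
To illustrate the contextWrite() recommendation in the second item above, a minimal sender-side sketch (the "my.operation" observation name and the outboundRecords flux are placeholders):

// Start a parent observation and propagate it into the send() chain explicitly,
// instead of relying on contextCapture().
Observation parentObservation = Observation.start("my.operation", observationRegistry);

sender.send(outboundRecords)
        .contextWrite(context ->
                context.put(ObservationThreadLocalAccessor.KEY, parentObservation))
        .doOnError(parentObservation::error)
        .doFinally(signalType -> parentObservation.stop())
        .subscribe();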
Co-authored-by: Gary Russell <[email protected]>
@garyrussell garyrussell merged commit 5436ae9 into main Oct 26, 2023
1 check passed
@patpatpat123

This is great! Thanks!

I would like to help test this. Just to be sure I am testing the "right thing":

should I pull the latest available snapshot from https://repo.spring.io/snapshot?

@michalwilk96

Hello @garyrussell, @artembilan,
when do you plan the next reactor-kafka release?

@garyrussell
Contributor

November 7: https://calendar.spring.io/

@artembilan
Contributor Author

@patpatpat123 ,

Yes, the latest SNAPSHOT is available in that repo.

@michalwilk96

Hi @artembilan , @garyrussell
I don't see this fix in the list for the reactor-kafka 1.3.22 release: https://github.com/reactor/reactor-kafka/milestone/39?closed=1. Is that right?

@michalwilk96

November 7: https://calendar.spring.io/

@garyrussell November 7 or 14?

@jonatan-ivanov

@michalwilk96 Adding the milestone to both the issue and the PR would show up as a duplicate, so usually the milestone is added only to the issue: #321

@artembilan
Contributor Author

Just added that issue to the respective milestone.
Yes, November 14: https://github.com/reactor/reactor-kafka/milestone/39

@patpatpat123

Hello team,

Just verified that 1.3.22 was not part of Spring Boot 3.2.0-RC2.

Would it be possible to have this PR as part of the official 3.2.0 release, due one week after reactor-kafka 1.3.22?
https://github.com/spring-projects/spring-boot/milestone/308?closed=1

Thank you

@artembilan
Contributor Author

This release is going to be part of the upcoming Reactor release trains tomorrow.
And indeed those Reactor BOMs are going to be pulled automatically by the upcoming Spring Boot releases next week.

@violetagg violetagg added the type/enhancement A general enhancement label Nov 14, 2023
@violetagg violetagg added this to the 1.3.22 milestone Nov 14, 2023
@violetagg
Member

violetagg commented Nov 14, 2023

@michalwilk96 Adding the milestone to both the issue and the PR would show up as a duplicate, so usually the milestone is added only to the issue: #321

@jonatan-ivanov That's not how we do this in Project Reactor. It is important that PRs have the milestone and the type, because we need to generate the release notes based on this information.

@mukul20798

Hi @marcingrzejszczak, here in this change a type cast to String is used for the bootstrap.servers value, and when I am passing my server values (e.g. listOf(val1, val2)) as mentioned here in springframework, it gives a type cast exception as below:

java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class java.lang.String (java.util.ArrayList and java.lang.String are in module java.base of loader 'bootstrap')
	at reactor.kafka.receiver.ReceiverOptions.bootstrapServers(ReceiverOptions.java:573)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$observerRecord$21(DefaultKafkaReceiver.java:160)
	at io.micrometer.observation.Observation.createNotStarted(Observation.java:169)
	at io.micrometer.observation.docs.ObservationDocumentation.observation(ObservationDocumentation.java:188)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.observerRecord(DefaultKafkaReceiver.java:156)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runBackfused(FluxPublishOn.java:484)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:521)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorTrackedRunnable.run(ExecutorScheduler.java:192)
	at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
	at reactor.core.scheduler.SingleWorkerScheduler.execute(SingleWorkerScheduler.java:64)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:252)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:283)
	at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
	at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:238)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:380)
	at io.micrometer.context.ContextSnapshot.lambda$wrap$0(ContextSnapshot.java:91)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

@artembilan
Contributor Author

@mukul20798 ,

This has nothing to do with the observation support.
That is how Reactor Kafka was designed originally for that property.

We cannot change it to align with the Spring Boot option in the current patch version.

Here is how you have to configure it for now:

producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
				StringUtils.collectionToCommaDelimitedString(kafkaProperties.getBootstrapServers()));

Feel free to raise a new GH issue, and we will review this when we start a new version.

And please stop commenting on old, closed issues/PRs: the comment might not be related to the subject, or it can easily get lost. Because, you know, it is closed already.
