Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Potential context leak in kafka-clients instrumentation #1947

Open
trask opened this issue Dec 19, 2020 · 16 comments
Open

Potential context leak in kafka-clients instrumentation #1947

trask opened this issue Dec 19, 2020 · 16 comments
Labels
bug Something isn't working

Comments

@trask
Copy link
Member

trask commented Dec 19, 2020

TracingIterator wraps Iterator<ConsumerRecord> returned from kafka, and starts scopes in next() and closes them in hasNext() for each record in the iterator.

This is a clever way to put the context in scope, and may be acceptable, but worth reviewing alternatives, including removing it and only automatically putting context in scope when kafka is used from higher-level APIs, e.g. spring-kafka's MessageListener.onMessage() and vertx-kafka's KafkaReadStream.poll() which takes a handler (we can start a span for the handler and create links to all of the messages in the batch).

@pavolloffay
Copy link
Member

I have find one Kafka issue that might be related to this. I have seen multiple spans with the same ID coming from the Kafka instrumentation.

Screenshot of Jaeger UI (7) (1)

@trask
Copy link
Member Author

trask commented Apr 14, 2022

This instrumentation is definitely leaky.

I sent a PR to reduce the impact of this leakiness: #5826

But it's still worth considering dropping this instrumentation, and recommending manual instrumentation of all poll use cases, e.g.

    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<K, V> record : records) {
      Context context =
          GlobalOpenTelemetry.getPropagators()
              .getTextMapPropagator()
              .extract(Context.root(), record, KafkaConsumerRecordGetter.INSTANCE);
      try (Scope ignored = context.makeCurrent()) {
        ...
      }
    }

Where KafkaConsumerRecordGetter can be copied locally or imported from io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common.

@mateuszrzeszutek
Copy link
Member

mateuszrzeszutek commented Apr 14, 2022

But it's still worth considering dropping this instrumentation, and recommending manual instrumentation of all poll use cases, e.g.

I 100% agree with that 💯 -- we could even provide some utilities for correct manual instrumentation (along with examples) as a part of the library instrumentation.

But, before doing that we'd have to implement a dedicated spring-kafka instrumentation (the current one only covers batch processing, kafka-clients covers the 1-record scenario), a vertx-kafka instrumentation (I think it also depends on kafka-clients instrumentation), and perhaps consider doing something with reactor-kafka (I'm ~95% sure it's broken right now). And there's kafka-streams too, it also relies on kafka-clients instrumentation.

@trask
Copy link
Member Author

trask commented Apr 15, 2022

as @anuraaga pointed out in SIG meeting, even with PR #5826, you can still end up with an unended (and so unreported) consumer span when an exception is thrown out of the iterator loop.

this is not nearly as bad as leaking the context and then getting same trace id stamped forever after, but it's still problematic and probably reason enough to deprecate the ConsumerRecords iterator instrumentation and provide a simple library/guidance for users who are using poll. this simple library/guidance could be used by both non-javaagent and javaagent users.

@62mkv
Copy link

62mkv commented Dec 6, 2023

Hi @mateuszrzeszutek any news/plans on this one? we're also facing leaking trace id's and missing spans for reactor-kafka based consumers.. 😢 javaagent, 1.31

@almogtavor
Copy link

@62mkv are you using Schedulers.parallel()? I've been facing issues only when using it, and resolved them (according to what I have seen until now) by manually reactivating the original Context for each message

@trask
Copy link
Member Author

trask commented Dec 21, 2023

@62mkv @almogtavor if you are able to create a repro for either of your issues, can you open a new issue for it and link to your repro? thx!

@almogtavor
Copy link

@trask I've created a repro (not as a test yet, but as a repo that has a processor that reads from Kafka with reactor Kafka, and traces go wrong when uncommenting the parallel scheduler). #9771

@jackrwoods
Copy link

I have run into a similar situation--not with Kafka--but basically we have some business logic that processes data from an iterator with no guarantees that the iterator will be completely consumed. Each next() call interacts with many layers of calculations which each have spans that need to be end()ed when we are done with the iterator.

We have opted to create a custom ImplicitContextKeyed type to track the Span stack and close them out when we are done. Each context with a new span contains a reference to the parent span's context, which recursively could contain another parent context reference etc. We walk this reference chain to close out any open spans until we reach some designated root ancestor context.

Does this seem like a reasonable work around for now, or is there a different approach that's preferred for these types of situations?

@trask
Copy link
Member Author

trask commented Apr 4, 2024

hi @jackrwoods! can you open a new issue for this? it's probably not too related to the kafka leak, since the problem here is that with automatic instrumentation we are limited to where we can insert hooks, which isn't a limitation for manual instrumentation, so there may be more/better options for you

@jackrwoods
Copy link

I'll open one. Thanks @trask !

@Cirilla-zmh
Copy link
Contributor

Hi @trask ,

I am currently focused on adding automatic instrumentation for messaging frameworks implemented in Java and Go. I believe the instrumentation of kafka-clients in opentelemetry-java-instrumentation is a clever solution. Without such instrumentation, we cannot provide any zero-code approaches to track the consumer process, similar to the challenges we're facing with Go instrumentation.

I have a couple of questions that I would appreciate your insights on:

  1. Does the kafka-clients instrumentation still experience context leaks in the latest version?
  2. Is there a better way to offer a zero-code approach for users to instrument the consumer process, or is manual instrumentation the only option available?

I am very interested in messaging system instrumentation, but I haven't found much discussion or input from the OpenTelemetry community regarding these issues. I'm glad to see the topic you've raised. Is there any relevant SIG or ongoing discussion in the community where I could get involved?

Thank you!

@trask
Copy link
Member Author

trask commented Sep 18, 2024

hi @Cirilla-zmh! there's a Messaging SIG that meets once a week: https://github.com/open-telemetry/community#:~:text=Semantic%20Conventions%3A%20Messaging

you're welcome to join there, or post in the #otel-messaging slack channel

Does the kafka-clients instrumentation still experience context leaks in the latest version?

I think it's possible, but it's mitigated a bit by overwriting any leaked span the next time a batch of messages is read over

Is there a better way to offer a zero-code approach for users to instrument the consumer process, or is manual instrumentation the only option available?

I don't think with Java "iterator" style APIs at least (with higher level messaging APIs, e.g. Spring Kafka, this is not an issue)

@Cirilla-zmh
Copy link
Contributor

Thank you! @trask

I think it's possible, but it's mitigated a bit by overwriting any leaked span the next time a batch of messages is read over

I apologize for any misunderstanding, but I'm having difficulty grasping the details. I've reviewed the MRs, but I'm particularly unclear about this one: #6021. My understanding is that the receive context shouldn't be leaked even if the process spans are suppressed, as the receive context is only referenced by ConsumeRecords, which will eventually be garbage collected.

Could you please provide a more detailed explanation? Thank you!

I don't think with Java "iterator" style APIs at least (with higher level messaging APIs, e.g. Spring Kafka, this is not an issue)

Higher-level messaging APIs, which allow developers to register message handlers, are typically easier to instrument. However, most APIs primarily offer poll or receiveMessage methods, which provide greater flexibility for developers. In such cases, I think it's really a great challenge to find out alternative approaches for instrumenting the consumption process.

Therefore, will the "iterator" style implementation be phased out entirely?

@trask
Copy link
Member Author

trask commented Sep 27, 2024

which will eventually be garbage collected

sorry, by "context leak" I mean it will be left bound to the thread and could leak into the next transaction (but not cause a memory leak since bounded at most one bound context pre thread

In such cases, I think it's really a great challenge to find out alternative approaches for instrumenting the consumption process.

👍

will the "iterator" style implementation be phased out entirely?

I suspect not for autoinstrumentation given its already in widespread use, but our recommendations could change down the line if it becomes an issue.

@Cirilla-zmh
Copy link
Contributor

Thanks a lot for your reply!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

7 participants