Kafka Streams trace map calls #825

Closed
artemyarulin opened this issue Nov 12, 2018 · 14 comments

@artemyarulin

The current Kafka Streams instrumentation relies on message headers, which are available only in the Producer/Transformer. This makes it impossible to build a detailed trace of Kafka Streams topologies that contain separate map/filter/reduce/etc. steps.

Example output from https://github.com/artemyarulin/kafka-tracing-example looks like this:

After some discussion in Gitter, it was suggested that a traced map may be a good starting point: even with complex topologies, a developer can always call map before/after an important operation.

@artemyarulin
Author

@jeqo Please correct me if I've described the issue incorrectly :)

@jeqo
Member

jeqo commented Nov 13, 2018

@artemyarulin thanks for creating the issue. This seems close to what has been discussed in the PR: #800 (comment)

We could add kStream.transform(kafkaStreamsTracing.map(spanName, fn)) to trace kStream.map(fn). Do you see value in other operations as well? e.g. we can also have kafkaStreamsTracing.mark(spanName) or something similar to annotate start/end of a complex task.
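
A minimal sketch of the wrapper idea, written in plain Java so it runs without Kafka Streams or Brave on the classpath. `tracedMap` takes a span name and a function and returns a function that records one span per call. `Span`, `REPORTED` and `tracedMap` are hypothetical names used only for illustration; the proposed `kafkaStreamsTracing.map(spanName, fn)` would presumably hand a transformer to `kStream.transform` and report through the tracer rather than an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class TracedMapSketch {
    /** Hypothetical stand-in for a finished span: a name plus start/end timestamps. */
    static final class Span {
        final String name;
        final long startNanos;
        final long endNanos;

        Span(String name, long startNanos, long endNanos) {
            this.name = name;
            this.startNanos = startNanos;
            this.endNanos = endNanos;
        }
    }

    /** Hypothetical in-memory reporter; a real setup would report to Zipkin instead. */
    static final List<Span> REPORTED = new ArrayList<>();

    /** Wraps fn so that every invocation is recorded as one span named spanName. */
    static <A, B> Function<A, B> tracedMap(String spanName, Function<A, B> fn) {
        return input -> {
            long start = System.nanoTime();
            try {
                return fn.apply(input);
            } finally {
                // Record the span even if fn throws, so failures are still visible.
                REPORTED.add(new Span(spanName, start, System.nanoTime()));
            }
        };
    }

    public static void main(String[] args) {
        Function<Long, Long> plusOne = tracedMap("map", v -> v + 1);
        System.out.println(plusOne.apply(41L));        // prints 42
        System.out.println(REPORTED.get(0).name);      // prints map
    }
}
```

The point of the wrapper shape is that the span is opened and closed inside a single call, so nothing has to be kept alive between topology steps.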

@artemyarulin
Author

mark operations would be really great 👍

It's much more explicit compared to a wrapped map call, and it also gives more flexibility: you could simply call mark at the beginning/end of a complex topology if you want just one span for the whole stream, or use many if more detail is needed.

So I imagine it would look something like this?

val kafkaStreamsTracing: KafkaStreamsTracing = TODO("create KafkaStreamsTracing")
val builder = StreamsBuilder().apply {
    stream(topic)
        // proposed mark operation: annotate the point before the mapping step
        .transform(kafkaStreamsTracing.mark("start"))
        .mapValues { k, v ->
            log.info("[1 to 1] Processing $k and $v")
            Thread.sleep(1000)
            v + 1
        }
        // proposed mark operation: annotate the point after the mapping step
        .transform(kafkaStreamsTracing.mark("after"))
        .to(config.topic + "_a", Produced.with(Serdes.String(), Serdes.Long()))
}

@jeqo
Member

jeqo commented Nov 13, 2018

cc @ImFlog

@ImFlog
Contributor

ImFlog commented Nov 15, 2018

@jeqo, if we do a start mark and an end mark, aren't we falling back to creating empty time spans?
We can't create one span starting at start and finishing at end, right?

@jeqo
Member

jeqo commented Nov 15, 2018

@ImFlog yes, we'd be falling back to that option. Not sure if we can keep a span "alive" in between "start" -> "processor" -> "end".

For those cases I would prefer a wrapper instead:

val kafkaStreamsTracing: KafkaStreamsTracing = TODO("create KafkaStreamsTracing")
val builder = StreamsBuilder().apply {
    stream(topic)
        // proposed wrapper: one span covers the whole mapValues call
        .transform(kafkaStreamsTracing.mapValues("map", { k, v ->
            log.info("[1 to 1] Processing $k and $v")
            Thread.sleep(1000)
            v + 1
        }))
        .to(config.topic + "_a", Produced.with(Serdes.String(), Serdes.Long()))
}

And mark for annotation purposes? Or maybe it is not needed if we have enough wrappers (e.g. map/mapValues/flatMap).
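
To make the trade-off concrete, here is a small self-contained sketch in plain Java with no Kafka or Brave dependency. It compares two marks placed around a step with a wrapper around the same step. `Tracer`, `mark` and `wrap` are hypothetical stand-ins, and the clock is injected so that the durations are deterministic; this only illustrates the behaviour discussed above and is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.UnaryOperator;

class MarkVersusWrapperSketch {
    /** Hypothetical finished span with a start and end time. */
    static final class Span {
        final String name;
        final long start;
        final long end;

        Span(String name, long start, long end) {
            this.name = name;
            this.start = start;
            this.end = end;
        }

        long duration() {
            return end - start;
        }
    }

    /** Hypothetical tracer with an injectable clock. */
    static final class Tracer {
        final LongSupplier clock;
        final List<Span> spans = new ArrayList<>();

        Tracer(LongSupplier clock) {
            this.clock = clock;
        }

        /** mark: the span starts and finishes at the same instant. */
        void mark(String name) {
            long now = clock.getAsLong();
            spans.add(new Span(name, now, now));
        }

        /** wrap: the span stays open exactly while fn runs. */
        <T> T wrap(String name, UnaryOperator<T> fn, T value) {
            long start = clock.getAsLong();
            try {
                return fn.apply(value);
            } finally {
                spans.add(new Span(name, start, clock.getAsLong()));
            }
        }
    }

    public static void main(String[] args) {
        long[] now = {0};                                  // fake clock state
        Tracer tracer = new Tracer(() -> now[0]);
        // The "work" advances the fake clock by 1000 units and adds one.
        UnaryOperator<Long> work = v -> { now[0] += 1000; return v + 1; };

        tracer.mark("start");
        work.apply(1L);
        tracer.mark("after");

        tracer.wrap("map", work, 1L);

        for (Span s : tracer.spans) {
            System.out.println(s.name + " " + s.duration());
        }
        // prints:
        // start 0
        // after 0
        // map 1000
    }
}
```

In this sketch the two marks only tell you where the record was at two instants, while the wrapper is the only variant that yields a single span carrying the duration of the step.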

@ImFlog
Contributor

ImFlog commented Nov 19, 2018

AFAIR, the threading model doesn't allow us to keep the span alive.
But yeah, mark for annotation could add some useful information. We wanted to know if people were interested in this feature, and it seems so :)

@codefromthecrypt
Member

codefromthecrypt commented Nov 19, 2018 via email

@jeqo
Member

jeqo commented Nov 25, 2018

I have a draft of an initial set of operations to add to the API: map, mapValues, foreach, peek and mark. Ref: master...jeqo:kafka-streams-wrapper#diff-b8dd8e4445870a6a249cf383bdfd6cf7R130

Not sure if other operations such as filter or flatMap would also add value here. WDYT?

@artemyarulin
Author

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .process(kafkaStreamsTracing.mark("mark-1"));
Topology topology = builder.build();
KafkaStreams streams = buildKafkaStreams(topology);

That is beautiful ❤️

A question, if I may: how is mark rendered in the Zipkin UI? I guess as a span of 0 duration. What do you think: would it be possible/useful to have markStart/markEnd so that we can track the duration of any step as well?

@jeqo
Member

jeqo commented Nov 26, 2018

AFAIR, it is a "dot" or a latency close to 0 ns.

Also, I think peek and mark would be better represented as transformers, so that the stream can continue after them (a processor returns void in the Streams DSL). I made the changes on the referenced branch.
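
A rough illustration of that difference in plain Java. `SimpleProcessor` and `SimpleTransformer` are hypothetical, heavily simplified stand-ins for the Streams DSL processor and transformer concepts, and `mark` / `terminalMark` are illustrative names only. The transformer variant returns the value, so it can sit in the middle of a chain; the processor variant returns nothing, so it can only end one.

```java
import java.util.ArrayList;
import java.util.List;

class MarkAsTransformerSketch {
    /** Hypothetical simplified processor: consumes a value and returns nothing. */
    interface SimpleProcessor<V> {
        void process(V value);
    }

    /** Hypothetical simplified transformer: returns a value for the next step. */
    interface SimpleTransformer<V, R> {
        R transform(V value);
    }

    /** Records which marks were hit, in order. */
    static final List<String> EVENTS = new ArrayList<>();

    /** mark as a transformer: record the annotation, then forward the value unchanged. */
    static <V> SimpleTransformer<V, V> mark(String name) {
        return value -> {
            EVENTS.add("mark:" + name);
            return value;
        };
    }

    /** mark as a processor: record the annotation, but nothing flows onward. */
    static <V> SimpleProcessor<V> terminalMark(String name) {
        return value -> EVENTS.add("terminal:" + name);
    }

    public static void main(String[] args) {
        SimpleTransformer<Long, Long> start = mark("start");
        SimpleTransformer<Long, Long> after = mark("after");

        // Transformers compose: the value passes through both marks and the +1 step.
        Long result = after.transform(start.transform(1L) + 1);
        System.out.println(result + " " + EVENTS);   // prints 2 [mark:start, mark:after]

        // A processor returns void, so it can only be the last step.
        SimpleProcessor<Long> end = terminalMark("end");
        end.process(result);
    }
}
```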

@jeqo
Member

jeqo commented Nov 26, 2018

[Screenshot from 2018-11-26 12-18-47]

@artemyarulin here is an example, where mark is the span created by the mark operation.

@codefromthecrypt
Member

codefromthecrypt commented Nov 26, 2018 via email

@jeqo
Member

jeqo commented Apr 8, 2019

Fixed by #833

jeqo closed this as completed Apr 8, 2019