-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[jaeger-v2] Add kafka exporter and receiver configuration #4971
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #4971 +/- ##
==========================================
- Coverage 95.64% 95.63% -0.01%
==========================================
Files 325 325
Lines 18619 18640 +21
==========================================
+ Hits 17808 17827 +19
- Misses 651 652 +1
- Partials 160 161 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great start! But what we really need is an e2e integration test utilizing these configs.
HI! I am currently halfway through implementing the e2e test. I've discoveered testbed pkg from opentelemetry-collector-contrib repo, designed for Otel collector integration tests. I've successfully test e2e from load generator (otlp) -> collector-with-kafka -> kafka -> mock backend (kafka receiver), and now I'm stuck to test load generator -> collector-with-kafka -> kafka -> ingester -> mock backend because ingester doesn't actually export the traces instead it stores them to memory. Do you have any suggestions on how should the test be done? Here's the diagram of testbed and my e2e test structure. |
This is the first time I see the testbed framework, I will need to understand it better. What is the expectation of the mock backend - is it expected to "push" the data somewhere? If it can be pulled for data by the framework, then you could utilize a remote memory storage (i.e. running as a separate process), similar to how our existing integration test works:
|
Nope, it can be a "pull" strategy too! The mock backend uses the receiver component to retrieve the traces, and there are some receivers in a "pull" strategy, such as kafka_receiver.
Ah, okay, I think this can do. I'll try to read it. |
Hi, I've almost finished the integration test. Currently, I'm blocked on a mismatch model between OTLP Span Link (here) and Jaeger Span Reference, OTLP Span Link has attributes but Jaeger Span Reference doesn't thus the data sent with span link attributes will be discarded at the Jaeger storage.
How should we solve this? Does it make sense if we add attributes to our Span Reference model? |
It's going to be pretty significant change to add attributes to SpanRefs. Is it possible to configure the test controller not to generate attributes for the links? |
The only way to not generate attributes is not to generate a span link entirely. Are you okay with that? |
Yes, that's fine - we're not testing model conversion here, but the ingestion pipeline integration, it doesn't matter much what is in the spans. |
ff47003
to
0fc6015
Compare
Test Results2 081 tests +5 2 070 ✅ +4 1m 10s ⏱️ -1s Results for commit 51b71d2. ± Comparison against base commit adbdb2d. This pull request removes 1 and adds 6 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
) | ||
|
||
// Config has the configuration for jaeger-query, | ||
type Config struct { | ||
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` | ||
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc-plugin"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc-plugin"` | |
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` |
|
||
func (r *storageReceiver) consumeLoop(ctx context.Context) error { | ||
// golden data provider can produce one of these service names | ||
services := []string{"", "customers", "OTLPResourceNoServiceName"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be more robust if you just loaded services from SpanReader, in case the testbed changes its behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tagging as needing followup
9042486
to
f92e3e7
Compare
Hi, I've raised a PR to fix the OTEL testbed data race and currently waiting for their review here open-telemetry/opentelemetry-collector-contrib#30549. |
86a92c8
to
497a8e7
Compare
Signed-off-by: James Ryans <[email protected]>
497a8e7
to
998fe18
Compare
Hi @yurishkuro, sorry, this PR became messy as I tried to solve the conflict merge with rebases, some failing CIs, an unsigned commit, and now I'm stuck with Build binaries CI. Can you help me find out what's wrong with my commits? I've successfully run Locally
CI
|
Signed-off-by: James Ryans <[email protected]>
Signed-off-by: James Ryans <[email protected]>
I have got a good news that the OTEL testbed data race PR has been merged! And I've updated the OTEL testbed module dependency to that specific commit open-telemetry/opentelemetry-collector-contrib@95e673e because we don't know when is the next release. Should I create a CI for this kafka integration test? |
Signed-off-by: James Ryans <[email protected]>
The next collector release will be around Feb 11.
Yes. Thanks. |
Signed-off-by: James Ryans <[email protected]>
Wow, yes, it has! I've updated the go.mod but encountered an issue that the OTEL testcase is not closing.. I'll get back to you after I've found and fixed the issue. 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks for taking this on Yuri!
@jkowall I'm just reviewing, this is a community contribution from @james-ryans |
I think the issue in our CIT kafka v2 is because of OTEL kafka receiver v0.93.0 bug, the receiver is shutting down indefinitely. I've raised an issue on OTEL contrib repository here open-telemetry/opentelemetry-collector-contrib#30789. If it is actually the OTEL contrib bug, we'll need to wait until the next release again. |
@james-ryans the ticket you referenced sounds like a deadlock issue during abnormal conditions, not when everything is running well, is that not true? Because if it's actually blocking normal operations it seems like a much bigger issue in OTEL. |
I'm not sure, but I think this actually is a big issue? 🤔 They have labeled the ticket to Or what you meant by "running well" is that when the collector never shut down? If yes, we can avoid this bug by not shutting them down after the test cases have been tested. |
I didn't get into the meat of the issue, got the impression that deadlock happens when something first happens during startup. Otherwise how do any of their unit tests work, do they not test shutdown? |
Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
brokers: | ||
- localhost:9092 | ||
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift | ||
initial_offset: earliest # consume messages from the beginning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps we should parameterize this and default to latest checkpoint rather than earliest, since it would be very bad to run with earliest in production. The integration tests can override the value via env var.
extensions: | ||
jaeger_storage: | ||
grpc: | ||
memstore: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we don't know it's a memstore
in this config, let's call it 'external-storage'
return receiver.NewFactory( | ||
componentType, | ||
createDefaultConfig, | ||
receiver.WithTraces(createTraces, component.StabilityLevelDevelopment), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver.WithTraces(createTraces, component.StabilityLevelDevelopment), | |
receiver.WithTraces(tracesReceiver, component.StabilityLevelDevelopment), |
return &Config{} | ||
} | ||
|
||
func createTraces(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func createTraces(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { | |
func tracesReceiver(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { |
if err != nil { | ||
return nil, fmt.Errorf("failed to init storage factory: %w", err) | ||
} | ||
// TODO add support for other backends |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use jaegerextension here, same as storageexporter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we can. I'll change the implementation in a new PR of this receiver.
if err != nil { | ||
t.Fatal(err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
require.NoError(t, err)
t.Fatal(err) | ||
} | ||
configCleanup, err := runner.PrepareConfig(string(config)) | ||
require.NoError(t, err, "collector configuration resulted in: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
require.NoError(t, err, "collector configuration resulted in: %v", err) | |
require.NoError(t, err) |
load := i == 0 | ||
if load { | ||
tc.StartLoad(testbed.LoadOptions{ | ||
DataItemsPerSecond: 16, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean there's no limit on the number of traces the generator creates? Seems like it would result in a random amount in each test. Not necessarily a problem as long as it doesn't cause flakiness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has a limit, it will stop once it has used all the combinations of traces and spans from the fixtures. It consistently stops at 3,524 traces every time I run the test.
tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, | ||
10*time.Second, "all data items received") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, | |
10*time.Second, "all data items received") | |
tc.WaitForN(func() bool { | |
return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() | |
}, | |
10*time.Second, | |
"all data items received") |
@@ -0,0 +1 @@ | |||
FIXME |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may want to pull storagereceiver into a separate PR as well, and add the tests to it (most other v2 packages already have tests)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please rebase on top of #5171
Hi @yurishkuro, thank you for all of this invaluable feedback! I just recovered from dental implant post-OP fever, I'll get back to this issue as soon as I can. Sorry if the revision takes long enough. 😄 |
@james-ryans was the upstream issue in OTEL resolved in their latest release? |
Not yet. I've tried @v0.95.0, but I still encountered issues with terminating the Kafka receiver. I haven't fully understood the root cause yet. The issue they referenced occurs at startup, but our issue happens on shutdown, which is confusing. I plan to help pinpoint and possibly fix the problem or find out if there is a workaround after I finish refactoring this PR based on your feedbacks. |
## Which problem is this PR solving? - Part of #4843 - Separate GRPC storage PR from and will be used by jaeger-v2 Kafka PR #4971 ## Description of the changes - Implement GRPC storage backend for Jaeger-V2 storage ## How was this change tested? - Run two `jaegertracing/jaeger-remote-storage` at `17271` and `17281` ports - Execute `go run -tags=ui ./cmd/jaeger --config ./cmd/jaeger/grpc_config.yaml` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: James Ryans <[email protected]>
## Which problem is this PR solving? - Part of #4843 - Separate Jaeger storage receiver PR from and will be used by jaeger-v2 Kafka PR #4971 ## Description of the changes - Implement Jaeger storage receiver to be used by Jaeger-v2 Kafka integration test. ## How was this change tested? - Added some unit tests. ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: James Ryans <[email protected]>
@james-ryans given your later experience with v2 integration tests, how do you feel about this approach? Do you think we still need it, or could we also cover Kafka as part of the same v2 e2e tests? (Note: Kafka is one of the projects for summer mentorship) |
I think this approach is now outdated, same as the last discussion we did at #5254 (comment), it only solves the (1) requirement. This PR only checks if the span is correctly sent to Kafka and stored in remote storage.
I don't think this PR is relevant to the current solution anymore. We should change this to the proposed solution as the other v2 e2e tests (probably with some tweaks). |
Which problem is this PR solving?
Description of the changes
jaeger-collector
with kafka span storage type andjaeger-ingester
already have.How was this change tested?
./scripts/otel-kafka-integration-test.sh
to test the overall pipelines.Checklist
jaeger
:make lint test
jaeger-ui
:yarn lint
andyarn test