-
-
Notifications
You must be signed in to change notification settings - Fork 197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Context to Record #201
Conversation
pkg/kgo/record_and_fetch.go
Outdated
@@ -139,6 +146,25 @@ type Record struct { | |||
Offset int64 | |||
} | |||
|
|||
// WithContext enriches the Record with a Context. | |||
func (r *Record) WithContext(ctx context.Context) *Record { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the use case for WithContext? I think this would only be necessary for producing, which already currently accepts a context to both Produce and ProduceSync. It may be less confusing to set r.ctx = ctx
inside of Produce, rather than having two areas for setting the context and people not knowing which one actually controls cancelling, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use case is to enrich consumer records. In a consumer hook, we would extract trace data from the Kafka headers to a context and enrich the record with that context. This makes propagating trace data very convenient and save users the hassle of manual work. They would use the consumer record context to do further tracing in their consumer logic.
The consumer hook implementation would look like this:
func (k *Kotel) OnFetchRecordBuffered(r *kgo.Record) {
textMapPropagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})
ctx := textMapPropagator.Extract(context.Background(), NewRecordCarrier(r))
r = r.WithContext(ctx)
...
}
Understandably, having two areas for setting the context is confusing. We are open to any suggestions you see necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I was hoping that if I initialized the context internally, there would not need to be any public setter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could provide a mechanism for setting the Context, kgo.WithContextSetter(func(r *kgo.Record){} ctx.Context)
or a hook with a context return value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm leaning to adding a public Context (or Ctx) field, rather than a getter / setter. If only read access were needed, a method would be better, but write access complicates things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@twmb Okay, that works, I still think it is important that the context is being set on the record in the produce
call. Otherwise it will be impossible to access the context(produce context) from the produce related hooks, and thus create and end spans are impossible.
Thanks for thinking about this, we understand that adding a field to the Record struct for the use case of OpenTelemetry is controversial, but I dont really think there is any other way if franz-go needs to support OpenTelemetry in a seamless manner.
Hello @twmb, We made some changes after hearing feedback. Thank you for your consideration and review of this pull request. |
Great that you updated this, @twmb I hope this aligns with your expectation. |
@@ -364,6 +364,8 @@ func (cl *Client) produce( | |||
) { | |||
if ctx == nil { | |||
ctx = context.Background() | |||
} else { | |||
r.Context = ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to set the context unconditionally to whatever edit: pause on this, need to ask clarifying question. Not sure a record's context should be set here at all.ctx
, how about dropping the else
?
See #201 and #200. Some packages want to stuff information into a context with WithValue, and then that information can be used to trace spans (see the open telemetry packages). Without a Context field or something equivalent, it is not possible to trace spans once a record is handed off to franz-go. We want to use these spans in franz-go hooks. We do not need to do anything on the consumer side, but unfortunately, we need the Context field to be **writable** so that consumers can initialize the context (and stuff information into it) with WithValue. So, we just add a new field to the record.
I've squashed the four commits here into one, moved Context to the end of the Record struct, modified doc a tiny bit, and pushed this in 1a13af8 Thanks! Sorry for the delay on this one. Still trying to chase down if some things are bugs in this library that need to be addressed right now or not (I don't think so, thankfully) |
TBH, this is a bit confusing. If we want to enrich the Record with data, why wouldn't we use something more generic like Not to mention, Contexts are stacks, and I can not set Record.Context in a thread safe way since adding a value would look like this: record.Context = context.WithValue(record.Context, k, v) How would I lock the Record to ensure no one else is accessing record.Context? I'd have to wrap Record in another struct which has a lock. If I'm doing that, I don't need a Context on the record at all, I can just put this Context in the wrapper. Seems to me, this could have been solved by the requester with a simple type like: type RecordCtx struct {
record *kgo.Record
ctx context.Context
}
// safely set context values
func (rc RecordCtx) WithValue(k,v interface{}) RecordCtx {
return RecordCtx{
record: rc.record,
ctx: context.WithValue(rc.ctx, k, v),
}
} It's already in now. oh well. |
Agreed, I had thought about this, but requiring type assertions to use what's in the arbitrary container would be a bit ugly. I know the same thing happens when people use
I think the intent here is that you, as owner creating the record, set the context once in the same way you set a key / value once.
That was my original hope too, but the problem comes from the consumer half the API if you want to set the record's context as early as possible (in a hook). If you want to start tracking information ASAP in |
In this pull request, we add a
Context
to theRecord
type. Adding the context provides the ability to enrich the record with data. Our use-case is observing requests/events as they propagate through Kafka and the rest our distributed environment, aka the "distributed tracing" pattern[1].The following snippet demonstrates how an upstream process propagates its context to franz-go. In this example, we would like to start tracing HTTP requests when using an HTTP server. The HTTP request context includes tracing data. This context is passed to the Kafka producer. We made an update to produce that checks for context and enriches the Kafka record. From there, a hook can reference the context and continue tracing the request.
Other libraries use the pattern of adding a context to entities such as HTTP[2]. Having the context on the record allows for instrumenting code via existing hooks. We demonstrate a POC of instrumentation in PR#200[3].
Links:
[1] https://microservices.io/patterns/observability/distributed-tracing.html
[2] https://cs.opensource.google/go/go/+/refs/tags/go1.19.1:src/net/http/request.go;l=355-363
[3] #200
CC: @tobiasbrodersen @brunsgaard
@twmb, what are your thoughts on this proposal?