-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingester Processor #944
Ingester Processor #944
Conversation
d1565e5
to
38f5111
Compare
38f5111
to
ee86e47
Compare
Codecov Report
@@ Coverage Diff @@
## master #944 +/- ##
=====================================
Coverage 100% 100%
=====================================
Files 136 138 +2
Lines 6322 6343 +21
=====================================
+ Hits 6322 6343 +21
Continue to review full report at Codecov.
|
250f585
to
ece2341
Compare
// NewSpanProcessor creates a new SpanProcessor | ||
func NewSpanProcessor(writer spanstore.Writer) SpanProcessor { | ||
return &spanProcessor{ | ||
unmarshaller: kafka.NewProtobufUnmarshaller(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend making the unmarshaller configurable by using an Option
.
That way, we can default to protobuf, but allow people to easily conveniently override it to anything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good stuff
plugin/storage/kafka/unmarshaller.go
Outdated
// JSONUnmarshaller implements Unmarshaller | ||
type JSONUnmarshaller struct{} | ||
|
||
// NewJSONUnmarshaller constructs a ProtobufUnmarshaller |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this was school, you'd be reprimanded
func (s spanProcessor) Process(message Message) error { | ||
mSpan, err := s.unmarshaller.Unmarshal(message.Value()) | ||
if err != nil { | ||
return errors.Wrap(err, "cannot read message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this message is a bit too generic, maybe could not unmarshal kafka message
?
1c101c2
to
973195f
Compare
973195f
to
58fb9c2
Compare
// Message contains the fields of the kafka message that the span processor uses | ||
type Message interface { | ||
Value() []byte | ||
} | ||
|
||
// NewSpanProcessor creates a new SpanProcessor | ||
func NewSpanProcessor(writer spanstore.Writer, unmarshaller kafka.Unmarshaller) SpanProcessor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to use functional options here and default to protobuf
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please don't use functional options. Pass an options struct if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will create a Params struct similar to the consumer
990848b
to
2017c2e
Compare
} | ||
|
||
// NewSpanProcessor creates a new KafkaSpanProcessor | ||
func NewSpanProcessor(params SpanProcessorParams) KafkaSpanProcessor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you return a pointer instead?
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
2017c2e
to
35210fd
Compare
Signed-off-by: Davit Yeghshatyan <[email protected]>
35210fd
to
0f34c30
Compare
Which problem is this PR solving?
Short description of the changes