Skip to content

Commit

Permalink
[#78] add document comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dwkang committed Jan 10, 2024
1 parent 7ae50a7 commit ef56fa0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
36 changes: 20 additions & 16 deletions plugin/sarama/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@
//
// ConsumePartition example:
//
// ctx := ppsarama.NewContext(context.Background(), broker)
// pc, _ := consumer.ConsumePartition(topic, partition, offset)
// for msg := range pc.Messages() {
// ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
// }
// ctx := ppsarama.NewContext(context.Background(), broker)
// pc, _ := consumer.ConsumePartition(topic, partition, offset)
// for msg := range pc.Messages() {
// ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
// }
//
// ConsumerGroupHandler example:
//
// func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// ctx := sess.Context()
// for msg := range claim.Messages() {
// _ = ppsarama.ConsumeMessageContext(process, ctx, msg)
// }
// func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// ctx := sess.Context()
// for msg := range claim.Messages() {
// _ = ppsarama.ConsumeMessageContext(process, ctx, msg)
// }
//
// ConsumeMessageContext passes a context added pinpoint.Tracer to HandlerContextFunc.
// In HandlerContextFunc, this tracer can be obtained by using the pinpoint.FromContext function.
//
// func process(ctx context.Context, msg *sarama.ConsumerMessage) error {
// tracer := pinpoint.FromContext(ctx)
// defer tracer.NewSpanEvent("process").EndSpanEvent()
// func process(ctx context.Context, msg *sarama.ConsumerMessage) error {
// tracer := pinpoint.FromContext(ctx)
// defer tracer.NewSpanEvent("process").EndSpanEvent()
//
// fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
// fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
//
// To instrument a Kafka producer, use NewSyncProducer or NewAsyncProducer.
//
Expand All @@ -39,8 +39,12 @@
// It is necessary to pass the context containing the pinpoint.Tracer
// to sarama.SyncProducer (or sarama.AsyncProducer) using WithContext function.
//
// ppsarama.WithContext(pinpoint.NewContext(context.Background(), tracer), producer)
// partition, offset, err := producer.SendMessage(msg)
// ppsarama.WithContext(pinpoint.NewContext(context.Background(), tracer), producer)
// partition, offset, err := producer.SendMessage(msg)
//
// The WithContext function() function is not thread-safe, so use the SendMessageContext function() if you have a data trace.
//
// partition, offset, err := producer.SendMessageContext(r.Context(), msg)
package ppsarama

import (
Expand Down
4 changes: 4 additions & 0 deletions plugin/sarama/syncproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (m *distributedTracingContextWriterConsumer) Set(key string, value string)
})
}

// SendMessageContext produces a given message with the context.
// It is possible to trace only when the given context contains a pinpoint.Tracer.
func (p *syncProducer) SendMessageContext(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
defer newProducerTracer(ctx, p.addrs, msg).EndSpanEvent()
partition, offset, err = p.SyncProducer.SendMessage(msg)
Expand All @@ -39,6 +41,8 @@ func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32
return p.SendMessageContext(p.ctx, msg)
}

// SendMessagesContext produces a given set of messages with the context.
// It is possible to trace only when the given context contains a pinpoint.Tracer.
func (p *syncProducer) SendMessagesContext(ctx context.Context, msgs []*sarama.ProducerMessage) error {
spans := make([]pinpoint.Tracer, len(msgs))
for i, msg := range msgs {
Expand Down

0 comments on commit ef56fa0

Please sign in to comment.