Skip to content

Commit

Permalink
[#78] merge to main branch
Browse files Browse the repository at this point in the history
  • Loading branch information
dwkang committed Sep 12, 2023
1 parent cf9e652 commit 5619a46
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
9 changes: 6 additions & 3 deletions plugin/sarama/example/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var fakeDB string

var producer sarama.SyncProducer
var producer ppsarama.SyncProducer

func prepareMessage(topic, message string) *sarama.ProducerMessage {
msg := &sarama.ProducerMessage{
Expand All @@ -31,8 +31,11 @@ func save(w http.ResponseWriter, r *http.Request) {

topic := "go-sarama-test"
msg := prepareMessage(topic, "Hello, Kafka!!")
ppsarama.WithContext(r.Context(), producer)
partition, offset, err := producer.SendMessage(msg)

//ppsarama.WithContext(r.Context(), producer)
//partition, offset, err := producer.SendMessage(msg)

partition, offset, err := producer.SendMessageContext(r.Context(), msg)

if err != nil {
fmt.Fprintf(w, "%s error occured.", err.Error())
Expand Down
24 changes: 19 additions & 5 deletions plugin/sarama/syncproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/pinpoint-apm/pinpoint-go-agent"
)

type SyncProducer interface {
sarama.SyncProducer
SendMessageContext(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
SendMessagesContext(ctx context.Context, msgs []*sarama.ProducerMessage) error
}

type syncProducer struct {
sarama.SyncProducer
addrs []string
Expand All @@ -23,16 +29,20 @@ func (m *distributedTracingContextWriterConsumer) Set(key string, value string)
})
}

func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
defer newProducerTracer(p.ctx, p.addrs, msg).EndSpanEvent()
func (p *syncProducer) SendMessageContext(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
defer newProducerTracer(ctx, p.addrs, msg).EndSpanEvent()
partition, offset, err = p.SyncProducer.SendMessage(msg)
return partition, offset, err
}

func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
return p.SendMessageContext(p.ctx, msg)
}

func (p *syncProducer) SendMessagesContext(ctx context.Context, msgs []*sarama.ProducerMessage) error {
spans := make([]pinpoint.Tracer, len(msgs))
for i, msg := range msgs {
spans[i] = newProducerTracer(p.ctx, p.addrs, msg)
spans[i] = newProducerTracer(ctx, p.addrs, msg)
}

err := p.SyncProducer.SendMessages(msgs)
Expand All @@ -43,6 +53,10 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
return err
}

func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
return p.SendMessagesContext(p.ctx, msgs)
}

func (p *syncProducer) Close() error {
return p.SyncProducer.Close()
}
Expand All @@ -52,7 +66,7 @@ func (p *syncProducer) WithContext(ctx context.Context) {
}

// NewSyncProducer wraps sarama.NewSyncProducer and returns a sarama.SyncProducer ready to instrument.
func NewSyncProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) {
func NewSyncProducer(addrs []string, config *sarama.Config) (SyncProducer, error) {
producer, err := sarama.NewSyncProducer(addrs, config)
if err != nil {
return nil, err
Expand Down

0 comments on commit 5619a46

Please sign in to comment.