-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
writer.go
94 lines (82 loc) · 2.55 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"github.com/Shopify/sarama"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"github.com/jaegertracing/jaeger/model"
)
// spanWriterMetrics groups the counters the Kafka span writer reports.
type spanWriterMetrics struct {
	// SpansWrittenSuccess is incremented once for every message received
	// from the producer's Successes() channel.
	SpansWrittenSuccess metrics.Counter

	// SpansWrittenFailure is incremented once for every error received from
	// the producer's Errors() channel, and once whenever WriteSpan fails to
	// marshal a span.
	SpansWrittenFailure metrics.Counter
}
// SpanWriter publishes spans to a Kafka topic through an asynchronous sarama
// producer. Implements spanstore.Writer.
type SpanWriter struct {
	// metrics holds the success/failure counters for written spans.
	metrics spanWriterMetrics

	// producer is the async producer whose Input() channel receives the
	// encoded spans; Close() closes it.
	producer sarama.AsyncProducer

	// marshaller turns a span into the bytes used as the message value.
	marshaller Marshaller

	// topic is the Kafka topic every message is addressed to.
	topic string
}
// NewSpanWriter builds a SpanWriter that publishes to topic through producer,
// encoding spans with marshaller.
//
// Two counters named "kafka_spans_written" are obtained from factory, tagged
// status=success and status=failure. Two background goroutines are started:
// one increments the success counter for every value received from
// producer.Successes(); the other logs every error received from
// producer.Errors() through logger and increments the failure counter. Each
// goroutine returns once its channel is closed.
//
// NOTE(review): sarama documents that Successes() only yields values when the
// producer config enables Return.Successes — confirm against the caller's
// configuration.
func NewSpanWriter(
	producer sarama.AsyncProducer,
	marshaller Marshaller,
	topic string,
	factory metrics.Factory,
	logger *zap.Logger,
) *SpanWriter {
	const counterName = "kafka_spans_written"

	// newCounter creates the shared-name counter distinguished by its
	// "status" tag.
	newCounter := func(status string) metrics.Counter {
		return factory.Counter(metrics.Options{
			Name: counterName,
			Tags: map[string]string{"status": status},
		})
	}
	counters := spanWriterMetrics{
		SpansWrittenSuccess: newCounter("success"),
		SpansWrittenFailure: newCounter("failure"),
	}

	// Count acknowledged messages.
	go func() {
		for range producer.Successes() {
			counters.SpansWrittenSuccess.Inc(1)
		}
	}()

	// Log and count messages the producer failed to deliver.
	go func() {
		for perr := range producer.Errors() {
			logger.Error(perr.Err.Error())
			counters.SpansWrittenFailure.Inc(1)
		}
	}()

	return &SpanWriter{
		metrics:    counters,
		producer:   producer,
		marshaller: marshaller,
		topic:      topic,
	}
}
// WriteSpan marshals span and hands the result to the async producer.
//
// If marshalling fails, the failure counter is incremented and the marshal
// error is returned; nothing is sent. Otherwise a message addressed to the
// writer's topic, keyed by the span's trace ID string and carrying the
// marshalled bytes as its value, is sent on the producer's Input() channel
// and nil is returned. A nil return therefore means only that the message was
// queued; the delivery outcome arrives later on the producer's
// Successes()/Errors() channels.
//
// ctx is accepted but not used by this method.
func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
	payload, err := w.marshaller.Marshal(span)
	if err != nil {
		w.metrics.SpansWrittenFailure.Inc(1)
		return err
	}

	// Keyed by trace ID — presumably so that all spans of a trace are routed
	// to the same partition; verify against the producer's partitioner.
	msg := &sarama.ProducerMessage{
		Topic: w.topic,
		Key:   sarama.StringEncoder(span.TraceID.String()),
		Value: sarama.ByteEncoder(payload),
	}

	// The AsyncProducer takes messages on a channel and produces them in the
	// background.
	w.producer.Input() <- msg
	return nil
}
// Close shuts the SpanWriter down by closing its underlying producer and
// returns whatever error the producer's Close reports.
func (w *SpanWriter) Close() error {
	closeErr := w.producer.Close()
	return closeErr
}