Skip to content

Commit

Permalink
Chore: signozkafkaexporter: ensure log body is always a string (#302)
Browse files Browse the repository at this point in the history
* chore: make signozkafkareceiver.NewPdataLogsUnmarshaler public for testing signozkafkaexporter

* chore: add test validating signozkafkaexporter converts body bytes to text string

* feat: signozkafkaexporter: convert bytes sent in log bodies to text string

* chore: clean things up
  • Loading branch information
raj-k-singh authored Mar 16, 2024
1 parent 053e7f5 commit d70589d
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 7 deletions.
31 changes: 31 additions & 0 deletions exporter/signozkafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -108,6 +109,8 @@ type kafkaLogsProducer struct {
}

func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
e.normalizeLogData(&ld)

kafkaTopicPrefix, err := getKafkaTopicFromClientMetadata(client.FromContext(ctx).Metadata)
if err != nil {
return consumererror.NewPermanent(err)
Expand All @@ -131,6 +134,34 @@ func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) er
return nil
}

func (e *kafkaLogsProducer) normalizeLogData(ld *plog.Logs) {
for rlIdx := 0; rlIdx < ld.ResourceLogs().Len(); rlIdx++ {
rl := ld.ResourceLogs().At(rlIdx)

for slIdx := 0; slIdx < rl.ScopeLogs().Len(); slIdx++ {
sl := rl.ScopeLogs().At(slIdx)

for lrIdx := 0; lrIdx < sl.LogRecords().Len(); lrIdx++ {
lr := sl.LogRecords().At(lrIdx)

// log body is always expected to be string in SigNoz
if lr.Body().Type() != pcommon.ValueTypeStr {
var strBody string
if lr.Body().Type() == pcommon.ValueTypeBytes {
strBody = string(lr.Body().Bytes().AsRaw())
} else {
strBody = lr.Body().AsString()
}

lr.Body().SetStr(strBody)

}

}
}
}
}

func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}
Expand Down
51 changes: 51 additions & 0 deletions exporter/signozkafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package signozkafkaexporter
import (
"context"
"fmt"
"reflect"
"testing"

"github.com/Shopify/sarama"
Expand All @@ -21,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/SigNoz/signoz-otel-collector/internal/coreinternal/testdata"
"github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver"
)

func TestNewExporter_err_version(t *testing.T) {
Expand Down Expand Up @@ -242,6 +244,55 @@ func TestLogsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestLogBodyBytesGetConvertedToTextString(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
p := kafkaLogsProducer{
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})

testLogCount := 5
testBody := []byte("test log")

logs := testdata.GenerateLogsManyLogRecordsSameResource(testLogCount)
for i := 0; i < testLogCount; i++ {
lr := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i)
logBodyBytes := lr.Body().SetEmptyBytes()
logBodyBytes.Append(testBody...)
}

producer.ExpectSendMessageWithCheckerFunctionAndSucceed(func(val []byte) error {
unmarshaler := signozkafkareceiver.NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding)
producedLogs, err := unmarshaler.Unmarshal(val)
if err != nil {
return err
}

for i := 0; i < testLogCount; i++ {
lr := producedLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i)
producedBody := lr.Body().AsRaw()
producedBodyStr, ok := producedBody.(string)
if !ok || producedBodyStr != string(testBody) {
return fmt.Errorf(
"unexpected log body produced: testLogIdx: %d, type %s, value: %v",
i, reflect.TypeOf(producedBody).String(), producedBody,
)
}
}

return nil
})

ctx := client.NewContext(context.Background(), client.Info{Metadata: client.NewMetadata(map[string][]string{"signoz_tenant_id": {"test_tenant_id"}})})
err := p.logsDataPusher(ctx, logs)

require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down
8 changes: 4 additions & 4 deletions receiver/signozkafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand Down Expand Up @@ -701,7 +701,7 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand Down Expand Up @@ -746,7 +746,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand All @@ -773,7 +773,7 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
Expand Down
2 changes: 1 addition & 1 deletion receiver/signozkafkareceiver/pdata_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (p pdataLogsUnmarshaler) Encoding() string {
return p.encoding
}

func newPdataLogsUnmarshaler(unmarshaler plog.Unmarshaler, encoding string) LogsUnmarshaler {
func NewPdataLogsUnmarshaler(unmarshaler plog.Unmarshaler, encoding string) LogsUnmarshaler {
return pdataLogsUnmarshaler{
Unmarshaler: unmarshaler,
encoding: encoding,
Expand Down
2 changes: 1 addition & 1 deletion receiver/signozkafkareceiver/pdata_unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ func TestNewPdataMetricsUnmarshaler(t *testing.T) {
}

func TestNewPdataLogsUnmarshaler(t *testing.T) {
um := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, "test")
um := NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, "test")
assert.Equal(t, "test", um.Encoding())
}
2 changes: 1 addition & 1 deletion receiver/signozkafkareceiver/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler {
}

func defaultLogsUnmarshalers() map[string]LogsUnmarshaler {
otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding)
otlpPb := NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding)
raw := newRawLogsUnmarshaler()
text := newTextLogsUnmarshaler()
return map[string]LogsUnmarshaler{
Expand Down

0 comments on commit d70589d

Please sign in to comment.