Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove usage of deprecated pdata package #9971

Merged
merged 1 commit into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions internal/stanza/frompdataconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// FromPdataConverter converts a set of entry.Entry into pdata.Logs
// FromPdataConverter converts a set of entry.Entry into plog.Logs
//
// The diagram below illustrates the internal communication inside the FromPdataConverter:
//
// ┌─────────────────────────────────┐
// │ Batch() │
// ┌─────────┤ Ingests pdata.Logs, splits up │
// ┌─────────┤ Ingests plog.Logs, splits up
// │ │ and places them on workerChan │
// │ └─────────────────────────────────┘
// │
Expand Down Expand Up @@ -107,9 +106,9 @@ func (c *FromPdataConverter) OutChannel() <-chan []*entry.Entry {
}

type fromConverterWorkerItem struct {
Resource pdata.Resource
LogRecordSlice pdata.LogRecordSlice
Scope pdata.ScopeLogs
Resource pcommon.Resource
LogRecordSlice plog.LogRecordSlice
Scope plog.ScopeLogs
}

// workerLoop is responsible for obtaining pdata logs from Batch() calls,
Expand All @@ -136,8 +135,8 @@ func (c *FromPdataConverter) workerLoop() {
}
}

// Batch takes in an set of pdata.Logs and sends it to an available worker for processing.
func (c *FromPdataConverter) Batch(pLogs pdata.Logs) error {
// Batch takes in an set of plog.Logs and sends it to an available worker for processing.
func (c *FromPdataConverter) Batch(pLogs plog.Logs) error {
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
for j := 0; j < rls.ScopeLogs().Len(); j++ {
Expand Down Expand Up @@ -174,10 +173,10 @@ func convertFromLogs(workerItem fromConverterWorkerItem) []*entry.Entry {
return result
}

// ConvertFrom converts pdata.Logs into a slice of entry.Entry
// ConvertFrom converts plog.Logs into a slice of entry.Entry
// To be used in a stateless setting like tests where ease of use is more
// important than performance or throughput.
func ConvertFrom(pLogs pdata.Logs) []*entry.Entry {
func ConvertFrom(pLogs plog.Logs) []*entry.Entry {
result := make([]*entry.Entry, 0, pLogs.LogRecordCount())
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
Expand All @@ -189,8 +188,8 @@ func ConvertFrom(pLogs pdata.Logs) []*entry.Entry {
return result
}

// convertFrom converts pdata.LogRecord into provided entry.Entry.
func convertFrom(src pdata.LogRecord, ent *entry.Entry) {
// convertFrom converts plog.LogRecord into provided entry.Entry.
func convertFrom(src plog.LogRecord, ent *entry.Entry) {
// if src.Timestamp == 0, then leave ent.Timestamp as nil
if src.Timestamp() != 0 {
ent.Timestamp = src.Timestamp().AsTime()
Expand Down Expand Up @@ -223,16 +222,16 @@ func convertFrom(src pdata.LogRecord, ent *entry.Entry) {
}
}

func valueToMap(value pdata.Map) map[string]interface{} {
func valueToMap(value pcommon.Map) map[string]interface{} {
rawMap := map[string]interface{}{}
value.Range(func(k string, v pdata.Value) bool {
value.Range(func(k string, v pcommon.Value) bool {
rawMap[k] = valueToInterface(v)
return true
})
return rawMap
}

func valueToInterface(value pdata.Value) interface{} {
func valueToInterface(value pcommon.Value) interface{} {
switch value.Type() {
case pcommon.ValueTypeEmpty:
return nil
Expand All @@ -259,7 +258,7 @@ func valueToInterface(value pdata.Value) interface{} {
}
}

var fromPdataSevMap = map[pdata.SeverityNumber]entry.Severity{
var fromPdataSevMap = map[plog.SeverityNumber]entry.Severity{
plog.SeverityNumberUNDEFINED: entry.Default,
plog.SeverityNumberTRACE: entry.Trace,
plog.SeverityNumberTRACE2: entry.Trace2,
Expand Down
17 changes: 8 additions & 9 deletions internal/stanza/frompdataconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

func BenchmarkConvertFromPdataSimple(b *testing.B) {
b.StopTimer()
pLogs := pdata.NewLogs()
pLogs := plog.NewLogs()
b.StartTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -47,7 +46,7 @@ func BenchmarkConvertFromPdataComplex(b *testing.B) {
}
}

func baseMap() pdata.Map {
func baseMap() pcommon.Map {
obj := pcommon.NewMap()
arr := pcommon.NewValueSlice()
arr.SliceVal().AppendEmpty().SetStringVal("666")
Expand All @@ -61,15 +60,15 @@ func baseMap() pdata.Map {
return obj
}

func baseMapValue() pdata.Value {
func baseMapValue() pcommon.Value {
v := pcommon.NewValueMap()
baseMap := baseMap()
baseMap.CopyTo(v.MapVal())
return v
}

func complexPdataForNDifferentHosts(count int, n int) pdata.Logs {
pLogs := pdata.NewLogs()
func complexPdataForNDifferentHosts(count int, n int) plog.Logs {
pLogs := plog.NewLogs()
logs := pLogs.ResourceLogs()

for i := 0; i < count; i++ {
Expand Down Expand Up @@ -125,7 +124,7 @@ func TestRoundTrip(t *testing.T) {
require.Equal(t, initialLogs, pLogs)
}

func sortComplexData(pLogs pdata.Logs) {
func sortComplexData(pLogs plog.Logs) {
pLogs.ResourceLogs().At(0).Resource().Attributes().Sort()
attrObject, _ := pLogs.ResourceLogs().At(0).Resource().Attributes().Get("object")
attrObject.MapVal().Sort()
Expand Down Expand Up @@ -222,7 +221,7 @@ func TestConvertFrom(t *testing.T) {
func TestConvertFromSeverity(t *testing.T) {
cases := []struct {
expectedSeverity entry.Severity
severityNumber pdata.SeverityNumber
severityNumber plog.SeverityNumber
}{
{entry.Default, plog.SeverityNumberUNDEFINED},
{entry.Trace, plog.SeverityNumberTRACE},
Expand Down Expand Up @@ -254,7 +253,7 @@ func TestConvertFromSeverity(t *testing.T) {
for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc.severityNumber), func(t *testing.T) {
entry := entry.New()
logRecord := pdata.NewLogRecord()
logRecord := plog.NewLogRecord()
logRecord.SetSeverityNumber(tc.severityNumber)
convertFrom(logRecord, entry)
require.Equal(t, tc.expectedSeverity, entry.Severity)
Expand Down
1 change: 0 additions & 1 deletion internal/stanza/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/open-telemetry/opentelemetry-log-collection v0.29.1
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/trace v1.7.0
Expand Down
2 changes: 0 additions & 2 deletions internal/stanza/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion processor/logstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/open-telemetry/opentelemetry-log-collection v0.29.1
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/zap v1.21.0
gonum.org/v1/gonum v0.11.0
Expand Down
2 changes: 0 additions & 2 deletions processor/logstransformprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph/topo"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"
)

type outputType struct {
logs pdata.Logs
logs plog.Logs
err error
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
return nil
}

func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
// Add the logs to the chain
err := ltp.fromConverter.Batch(ld)
if err != nil {
Expand Down
58 changes: 25 additions & 33 deletions processor/logstransformprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

Expand Down Expand Up @@ -63,15 +62,15 @@ func parseTime(format, input string) *time.Time {
}

type testLogMessage struct {
body *pdata.Value
body pcommon.Value
time *time.Time
observedTime *time.Time
severity pdata.SeverityNumber
severity plog.SeverityNumber
severityText *string
spanID *pdata.SpanID
traceID *pdata.TraceID
spanID pcommon.SpanID
traceID pcommon.TraceID
flags uint32
attributes *map[string]pdata.Value
attributes *map[string]pcommon.Value
}

func TestLogsTransformProcessor(t *testing.T) {
Expand All @@ -91,47 +90,47 @@ func TestLogsTransformProcessor(t *testing.T) {
config: cfg,
sourceMessages: []testLogMessage{
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
body: baseMessage,
spanID: spanID,
traceID: traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
},
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
body: baseMessage,
spanID: spanID,
traceID: traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
},
},
parsedMessages: []testLogMessage{
{
body: &baseMessage,
body: baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
attributes: &map[string]pcommon.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
spanID: spanID,
traceID: traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
{
body: &baseMessage,
body: baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
attributes: &map[string]pcommon.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
spanID: spanID,
traceID: traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
Expand Down Expand Up @@ -166,19 +165,17 @@ func TestLogsTransformProcessor(t *testing.T) {
}
}

func generateLogData(messages []testLogMessage) pdata.Logs {
func generateLogData(messages []testLogMessage) plog.Logs {
ld := testdata.GenerateLogsOneEmptyResourceLogs()
scope := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
for _, content := range messages {
log := scope.LogRecords().AppendEmpty()
if content.body != nil {
content.body.CopyTo(log.Body())
}
content.body.CopyTo(log.Body())
if content.time != nil {
log.SetTimestamp(pdata.NewTimestampFromTime(*content.time))
log.SetTimestamp(pcommon.NewTimestampFromTime(*content.time))
}
if content.observedTime != nil {
log.SetObservedTimestamp(pdata.NewTimestampFromTime(*content.observedTime))
log.SetObservedTimestamp(pcommon.NewTimestampFromTime(*content.observedTime))
}
if content.severity != 0 {
log.SetSeverityNumber(content.severity)
Expand All @@ -193,13 +190,8 @@ func generateLogData(messages []testLogMessage) pdata.Logs {
log.Attributes().Sort()
}

if content.spanID != nil {
log.SetSpanID(*content.spanID)
}

if content.traceID != nil {
log.SetTraceID(*content.traceID)
}
log.SetSpanID(content.spanID)
log.SetTraceID(content.traceID)

if content.flags != uint32(0x00) {
log.SetFlags(content.flags)
Expand Down
1 change: 0 additions & 1 deletion receiver/filelogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.50.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions receiver/filelogreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion receiver/journaldreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ require (
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.50.0 // indirect
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions receiver/journaldreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading