Skip to content

Commit

Permalink
Logs transform processor (open-telemetry#9335)
Browse files Browse the repository at this point in the history
* First pass frompdataconverter for stanza

* Create fromPdataConverter

* Current iteration of logs transform processor

* Commit updated factorytest

* Almost ready

* Continue iterating to try to resolve test issues with logs transform

* Update gomod

* Fix issues with failing to start pipeline

* Fix issue with trace flags conversion

* Fix tests

* Fix linter concern

* Add changelog & codeowner

* Add logs transform to versions

* Add link to logs processor readme

* Tiny update to readme

* update go sum files

* Update gomod

* Update processor test based on updates to opentelemetry-log-collection

* Update internal/stanza/frompdataconverter.go

Co-authored-by: Daniel Jaglowski <[email protected]>

* Address some PR feedback

* Fix missing default return

* Fix issue with fromconverter batch

* Re-fix issue with fromconverter batch

* Implement round trip test

* Fix round trip test

* Validate scope name where possible

* Mark processor as experimental

* Set observed time for pdata converter test

* Fix test's use of observedTime

* Address linter concerns

* Update internal/stanza/go.mod

Co-authored-by: Daniel Jaglowski <[email protected]>

* Revert "Update internal/stanza/go.mod"

This reverts commit fb1de19.

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
Sam DeHaan and djaglowski committed May 10, 2022
1 parent b5ce63e commit dc97633
Show file tree
Hide file tree
Showing 35 changed files with 1,837 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ processor/filterprocessor/ @open-telemetry/collector-c
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @pmm-sumo
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @owais @dmitryax @pmm-sumo
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricstransformprocessor/ @open-telemetry/collector-contrib-approvers @punya
processor/probabilisticsamplerprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/redactionprocessor/ @open-telemetry/collector-contrib-approvers @leonsp-ai @dmitryax @mx-psi
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

- `schemaprocessor`: Starting the initial work to allow from translating from semantic convention to another (#8371)
- `saphanareceiver`: Added implementation of SAP HANA Metric Receiver (#8827)
- `logstransformprocessor`: Add implementation of Logs Transform Processor (#9335)

### 💡 Enhancements 💡

Expand Down
2 changes: 1 addition & 1 deletion internal/stanza/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type InputConfig map[string]interface{}

// decodeOperatorConfigs is an unmarshaling workaround for stanza operators
// This is needed only until stanza operators are migrated to mapstructure
func (cfg BaseConfig) decodeOperatorConfigs() ([]operator.Config, error) {
func (cfg BaseConfig) DecodeOperatorConfigs() ([]operator.Config, error) {
if len(cfg.Operators) == 0 {
return []operator.Config{}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/stanza/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func insertToAttributeVal(value interface{}, dest pcommon.Value) {
case string:
dest.SetStringVal(t)
case []byte:
dest.SetStringVal(string(t))
dest.SetBytesVal(t)
case int64:
dest.SetIntVal(t)
case int32:
Expand Down Expand Up @@ -421,7 +421,7 @@ func insertToAttributeMap(obsMap map[string]interface{}, dest pcommon.Map) {
case string:
dest.InsertString(k, t)
case []byte:
dest.InsertString(k, string(t))
dest.InsertBytes(k, t)
case int64:
dest.InsertInt(k, t)
case int32:
Expand Down
12 changes: 8 additions & 4 deletions internal/stanza/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestConvert(t *testing.T) {
m.InsertInt("int", 123)
m.InsertDouble("double", 12.34)
m.InsertString("string", "hello")
m.InsertString("bytes", "asdf")
m.InsertBytes("bytes", []byte("asdf"))
assert.EqualValues(t, m.Sort(), lr.Body().MapVal().Sort())
}
}
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestConvertSimpleBody(t *testing.T) {
require.False(t, anyToBody(false).BoolVal())

require.Equal(t, "string", anyToBody("string").StringVal())
require.Equal(t, "bytes", anyToBody([]byte("bytes")).StringVal())
require.Equal(t, []byte("bytes"), anyToBody([]byte("bytes")).BytesVal())

require.Equal(t, int64(1), anyToBody(1).IntVal())
require.Equal(t, int64(1), anyToBody(int8(1)).IntVal())
Expand Down Expand Up @@ -637,10 +637,14 @@ func TestConvertMapBody(t *testing.T) {
v, _ = result.Get("false")
require.False(t, v.BoolVal())

for _, k := range []string{"string", "bytes"} {
for _, k := range []string{"string"} {
v, _ = result.Get(k)
require.Equal(t, k, v.StringVal())
}
for _, k := range []string{"bytes"} {
v, _ = result.Get(k)
require.Equal(t, []byte(k), v.BytesVal())
}
for _, k := range []string{"int", "int8", "int16", "int32", "int64", "uint", "uint8", "uint16", "uint32", "uint64"} {
v, _ = result.Get(k)
require.Equal(t, int64(1), v.IntVal())
Expand Down Expand Up @@ -678,7 +682,7 @@ func TestConvertArrayBody(t *testing.T) {
require.True(t, result.At(0).BoolVal())
require.False(t, result.At(1).BoolVal())
require.Equal(t, "string", result.At(2).StringVal())
require.Equal(t, "bytes", result.At(3).StringVal())
require.Equal(t, []byte("bytes"), result.At(3).BytesVal())

require.Equal(t, int64(1), result.At(4).IntVal()) // int
require.Equal(t, int64(1), result.At(5).IntVal()) // int8
Expand Down
5 changes: 5 additions & 0 deletions internal/stanza/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (e *LogEmitter) Stop() error {
return nil
}

// OutChannel returns the channel on which entries will be sent to.
func (e *LogEmitter) OutChannel() <-chan []*entry.Entry {
return e.logChan
}

// Process will emit an entry to the output channel
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/stanza/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
}

baseCfg := logReceiverType.BaseConfig(cfg)
operatorCfgs, err := baseCfg.decodeOperatorConfigs()
operatorCfgs, err := baseCfg.DecodeOperatorConfigs()
if err != nil {
return nil, err
}
Expand Down
288 changes: 288 additions & 0 deletions internal/stanza/frompdataconverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stanza // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"

import (
"encoding/binary"
"math"
"runtime"
"sync"
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// FromPdataConverter converts a set of entry.Entry into pdata.Logs
//
// The diagram below illustrates the internal communication inside the FromPdataConverter:
//
// ┌─────────────────────────────────┐
// │ Batch() │
// ┌─────────┤ Ingests pdata.Logs, splits up │
// │ │ and places them on workerChan │
// │ └─────────────────────────────────┘
// │
// │ ┌───────────────────────────────────────────────────┐
// ├─► workerLoop() │
// │ │ ┌─────────────────────────────────────────────────┴─┐
// ├─┼─► workerLoop() │
// │ │ │ ┌─────────────────────────────────────────────────┴─┐
// └─┼─┼─► workerLoop() │
// └─┤ │ consumes sent log entries from workerChan, │
// │ │ translates received logs to entry.Entry, │
// └─┤ and sends them along entriesChan │
// └───────────────────────────────────────────────────┘
//
type FromPdataConverter struct {
// entriesChan is a channel on which converted logs will be sent out of the converter.
entriesChan chan []*entry.Entry

stopOnce sync.Once
stopChan chan struct{}

// workerChan is an internal communication channel that gets the log
// entries from Batch() calls and it receives the data in workerLoop().
workerChan chan fromConverterWorkerItem

// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

logger *zap.Logger
}

func NewFromPdataConverter(workerCount int, logger *zap.Logger) *FromPdataConverter {
if logger == nil {
logger = zap.NewNop()
}
if workerCount <= 0 {
workerCount = int(math.Max(1, float64(runtime.NumCPU())))
}

return &FromPdataConverter{
workerChan: make(chan fromConverterWorkerItem, workerCount),
entriesChan: make(chan []*entry.Entry),
stopChan: make(chan struct{}),
logger: logger,
}
}

func (c *FromPdataConverter) Start() {
c.logger.Debug("Starting log converter from pdata", zap.Int("worker_count", cap(c.workerChan)))

for i := 0; i < cap(c.workerChan); i++ {
c.wg.Add(1)
go c.workerLoop()
}
}

func (c *FromPdataConverter) Stop() {
c.stopOnce.Do(func() {
close(c.stopChan)
c.wg.Wait()
close(c.entriesChan)
close(c.workerChan)
})
}

// OutChannel returns the channel on which converted entries will be sent to.
func (c *FromPdataConverter) OutChannel() <-chan []*entry.Entry {
return c.entriesChan
}

type fromConverterWorkerItem struct {
Resource pdata.Resource
LogRecordSlice pdata.LogRecordSlice
Scope pdata.ScopeLogs
}

// workerLoop is responsible for obtaining pdata logs from Batch() calls,
// converting them to []*entry.Entry and sending them out
func (c *FromPdataConverter) workerLoop() {
defer c.wg.Done()

for {
select {
case <-c.stopChan:
return

case workerItem, ok := <-c.workerChan:
if !ok {
return
}

select {
case c.entriesChan <- convertFromLogs(workerItem):
case <-c.stopChan:
return
}
}
}
}

// Batch takes in an set of pdata.Logs and sends it to an available worker for processing.
func (c *FromPdataConverter) Batch(pLogs pdata.Logs) error {
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
for j := 0; j < rls.ScopeLogs().Len(); j++ {
scope := rls.ScopeLogs().At(j)
item := fromConverterWorkerItem{
Resource: rls.Resource(),
Scope: scope,
LogRecordSlice: scope.LogRecords(),
}
select {
case c.workerChan <- item:
continue
case <-c.stopChan:
return nil
}
}
}

return nil
}

// convertFromLogs converts the contents of a fromConverterWorkerItem into a slice of entry.Entry
func convertFromLogs(workerItem fromConverterWorkerItem) []*entry.Entry {
result := make([]*entry.Entry, 0, workerItem.LogRecordSlice.Len())
for i := 0; i < workerItem.LogRecordSlice.Len(); i++ {
record := workerItem.LogRecordSlice.At(i)
entry := entry.Entry{}

entry.ScopeName = workerItem.Scope.Scope().Name()
entry.Resource = valueToMap(workerItem.Resource.Attributes())
convertFrom(record, &entry)
result = append(result, &entry)
}
return result
}

// ConvertFrom converts pdata.Logs into a slice of entry.Entry
// To be used in a stateless setting like tests where ease of use is more
// important than performance or throughput.
func ConvertFrom(pLogs pdata.Logs) []*entry.Entry {
result := make([]*entry.Entry, 0, pLogs.LogRecordCount())
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
for j := 0; j < rls.ScopeLogs().Len(); j++ {
scope := rls.ScopeLogs().At(j)
result = append(result, convertFromLogs(fromConverterWorkerItem{Resource: rls.Resource(), Scope: scope, LogRecordSlice: scope.LogRecords()})...)
}
}
return result
}

// convertFrom converts pdata.LogRecord into provided entry.Entry.
func convertFrom(src pdata.LogRecord, ent *entry.Entry) {
// if src.Timestamp == 0, then leave ent.Timestamp as nil
if src.Timestamp() != 0 {
ent.Timestamp = src.Timestamp().AsTime()
}

if src.ObservedTimestamp() == 0 {
ent.ObservedTimestamp = time.Now()
} else {
ent.ObservedTimestamp = src.ObservedTimestamp().AsTime()
}

ent.Severity = fromPdataSevMap[src.SeverityNumber()]
ent.SeverityText = src.SeverityText()

ent.Attributes = valueToMap(src.Attributes())
ent.Body = valueToInterface(src.Body())

if !src.TraceID().IsEmpty() {
buffer := src.TraceID().Bytes()
ent.TraceId = buffer[:]
}
if !src.SpanID().IsEmpty() {
buffer := src.SpanID().Bytes()
ent.SpanId = buffer[:]
}
if src.Flags() != 0 {
a := make([]byte, 4)
binary.LittleEndian.PutUint32(a, src.Flags())
ent.TraceFlags = []byte{a[0]}
}
}

func valueToMap(value pdata.Map) map[string]interface{} {
rawMap := map[string]interface{}{}
value.Range(func(k string, v pdata.Value) bool {
rawMap[k] = valueToInterface(v)
return true
})
return rawMap
}

func valueToInterface(value pdata.Value) interface{} {
switch value.Type() {
case pcommon.ValueTypeEmpty:
return nil
case pcommon.ValueTypeString:
return value.StringVal()
case pcommon.ValueTypeBool:
return value.BoolVal()
case pcommon.ValueTypeDouble:
return value.DoubleVal()
case pcommon.ValueTypeInt:
return value.IntVal()
case pcommon.ValueTypeBytes:
return value.BytesVal()
case pcommon.ValueTypeMap:
return value.MapVal().AsRaw()
case pcommon.ValueTypeSlice:
arr := make([]interface{}, 0, value.SliceVal().Len())
for i := 0; i < value.SliceVal().Len(); i++ {
arr = append(arr, valueToInterface(value.SliceVal().At(i)))
}
return arr
default:
return value.AsString()
}
}

var fromPdataSevMap = map[pdata.SeverityNumber]entry.Severity{
plog.SeverityNumberUNDEFINED: entry.Default,
plog.SeverityNumberTRACE: entry.Trace,
plog.SeverityNumberTRACE2: entry.Trace2,
plog.SeverityNumberTRACE3: entry.Trace3,
plog.SeverityNumberTRACE4: entry.Trace4,
plog.SeverityNumberDEBUG: entry.Debug,
plog.SeverityNumberDEBUG2: entry.Debug2,
plog.SeverityNumberDEBUG3: entry.Debug3,
plog.SeverityNumberDEBUG4: entry.Debug4,
plog.SeverityNumberINFO: entry.Info,
plog.SeverityNumberINFO2: entry.Info2,
plog.SeverityNumberINFO3: entry.Info3,
plog.SeverityNumberINFO4: entry.Info4,
plog.SeverityNumberWARN: entry.Warn,
plog.SeverityNumberWARN2: entry.Warn2,
plog.SeverityNumberWARN3: entry.Warn3,
plog.SeverityNumberWARN4: entry.Warn4,
plog.SeverityNumberERROR: entry.Error,
plog.SeverityNumberERROR2: entry.Error2,
plog.SeverityNumberERROR3: entry.Error3,
plog.SeverityNumberERROR4: entry.Error4,
plog.SeverityNumberFATAL: entry.Fatal,
plog.SeverityNumberFATAL2: entry.Fatal2,
plog.SeverityNumberFATAL3: entry.Fatal3,
plog.SeverityNumberFATAL4: entry.Fatal4,
}
Loading

0 comments on commit dc97633

Please sign in to comment.